"""
.. py:module:: environment
This module holds ``Environment``-class, a universe where the agents live.
Environment holds methods for inter-agent communication and some utilities that
are usually needed when implementing creative multi-agent systems.
All implementations should subclass ``Environment`` in order to provide basic
functionality for the system to operate.
Environments are used by defining their address at the instantiation time, and
adding agents to their container.
"""
import asyncio
import logging
from random import choice, shuffle
from aiomas import Container
from creamas.logging import ObjectLogger
from creamas.util import run_or_coro
from creamas.serializers import get_serializers
__all__ = ['Environment']
class Environment(Container):
    """Base environment class inherited from :py:class:`aiomas.Container`.

    The environment keeps track of its age, the artifacts published to it
    and an optional logger, and offers helpers for triggering agents and
    for managing the connections between them.
    """

    @classmethod
    def create(cls, *args, **kwargs):
        """Create a new environment, see :py:func:`~aiomas.Container.create`
        for details.

        If ``extra_serializers`` keyword argument is None (or missing),
        serializers returned by :func:`~creamas.serializers.get_serializers`
        are used instead.
        """
        if kwargs.get('extra_serializers') is None:
            kwargs['extra_serializers'] = get_serializers()
        return super().create(*args, **kwargs)

    def __init__(self, base_url, loop, clock, connect_kwargs):
        """Initialize the environment's bookkeeping attributes.

        All arguments are passed unchanged to the parent class initializer;
        ``base_url`` is additionally used as the environment's name.
        """
        super().__init__(base_url, loop, clock, connect_kwargs)
        self._age = 0
        self._logger = None
        self._log_folder = None
        self._artifacts = []
        self._candidates = []
        self._name = base_url

        # Try setting the process name to easily recognize the spawned
        # environments with 'ps -x' or 'top'. This is purely cosmetic, so
        # any failure (e.g. setproctitle not installed) is ignored, but
        # KeyboardInterrupt/SystemExit are no longer swallowed.
        try:
            import setproctitle as spt
            spt.setproctitle('Creamas: {}'.format(str(self)))
        except Exception:
            pass

    @property
    def name(self):
        """Name of the environment."""
        return self._name

    @property
    def artifacts(self):
        """Published artifacts for all agents."""
        return self._artifacts

    @property
    def age(self):
        """Age of the environment."""
        return self._age

    @age.setter
    def age(self, a):
        self._age = a

    @property
    def logger(self):
        """Logger for the environment."""
        return self._logger

    @property
    def log_folder(self):
        """Logging folder for the environment. If set, will create
        :py:class:`creamas.logging.ObjectLogger` for that folder.
        """
        return self._log_folder

    @log_folder.setter
    def log_folder(self, _log_folder):
        assert(type(_log_folder) is str)
        self._log_folder = _log_folder
        self._logger = ObjectLogger(self, _log_folder, add_name=True,
                                    init=True)

    def get_agents(self, addr=True, agent_cls=None, include_manager=False):
        """Get agents in the environment.

        :param bool addr:
            If ``True``, returns only addresses of the agents.

        :param agent_cls:
            Optional, if specified returns only agents whose type is exactly
            that particular class.

        :param bool include_manager:
            If ``True`` includes the environment's manager, i.e. the agent in
            the address ``tcp://environment-host:port/0``, to the returned
            list if the environment has attribute :attr:`manager`. If
            environment does not have :attr:`manager`, then the parameter
            does nothing.

        :returns: A list of agents in the environment.
        :rtype: list

        .. note::

            Manager agents are excluded from the returned lists of agents by
            default.
        """
        agents = list(self.agents.dict.values())
        has_manager = getattr(self, 'manager', None) is not None
        if has_manager and not include_manager:
            # The manager is the agent whose address ends in '/0'.
            agents = [a for a in agents if a.addr.rsplit('/', 1)[1] != '0']
        if agent_cls is not None:
            agents = [a for a in agents if type(a) is agent_cls]
        if addr:
            agents = [agent.addr for agent in agents]
        return agents

    def get_agent(self, addr):
        """Get agent in given address.

        :param str addr: Full address of the agent.
        :returns: The agent registered in this environment at ``addr``.
        :raises ValueError: If given address is not part of this environment
        :raises KeyError: If no such agent in the environment
        """
        base_url, agent_number = addr.rsplit('/', 1)
        base_url += "/"
        if base_url != self._base_url:
            raise ValueError("Given address' base URL ({}) does not match with the environment ({})."
                             .format(base_url, self._base_url))
        return self.agents.dict[agent_number]

    async def trigger_act(self, *args, addr=None, agent=None, **kwargs):
        """Trigger agent to act.

        If *agent* is None, then looks the agent by the address. Positional
        and keyword arguments are passed down to the agent's :meth:`act`.

        :returns: The return value of the agent's :meth:`act`.
        :raises TypeError: if both *agent* and *addr* are None.
        """
        if agent is None and addr is None:
            raise TypeError("Either addr or agent has to be defined.")
        if agent is None:
            agent = self.get_agent(addr)

        self._log(logging.DEBUG, "Triggering agent in {}".format(agent.addr))
        ret = await agent.act(*args, **kwargs)
        return ret

    async def trigger_all(self, *args, **kwargs):
        """Trigger all agents in the environment to act asynchronously.

        :returns: A list of agents' :meth:`act` return values.

        Given arguments and keyword arguments are passed down to each agent's
        :meth:`creamas.core.agent.CreativeAgent.act`.

        .. note::

            The environment's manager agent, i.e. if the environment has
            :attr:`manager`, is excluded from acting.
        """
        tasks = [
            asyncio.ensure_future(self.trigger_act(*args, agent=a, **kwargs))
            for a in self.get_agents(addr=False, include_manager=False)
        ]
        rets = await asyncio.gather(*tasks)
        return rets

    def is_ready(self):
        """Check if the environment is fully initialized.

        The function is mainly used by the multiprocessing environment managers
        and distributed environments to ensure that the environment has been
        correctly initialized before any other preparations are done for the
        environments or the simulation is started.

        Override the function in the subclasses which need more time consuming
        initialization routines. The function should return True when the
        environment is ready be used in a simulation, or when any
        cross-environment initialization routines can be run. That is, the
        environment is inherently in a coherent state, and can execute orders
        from managers or simulations.

        :rtype: bool
        :returns: This basic implementation returns always True.
        """
        return True

    def create_random_connections(self, n=5):
        """Create random connections for all agents in the environment.

        :param int n: the number of connections for each agent

        Existing agent connections that would be created by chance are not
        doubled in the agent's :attr:`connections`, but count towards
        connections created.

        :raises TypeError: If ``n`` is not of type int.
        :raises ValueError: If ``n`` is not greater than zero.
        """
        if type(n) != int:
            raise TypeError("Argument 'n' must be of type int.")
        if n <= 0:
            raise ValueError("Argument 'n' must be greater than zero.")

        # The agent list does not change while connecting, so fetch it once
        # and take a fresh copy for every agent.
        agents = self.get_agents(addr=False)
        for a in agents:
            others = list(agents)
            others.remove(a)
            shuffle(others)
            for r_agent in others[:n]:
                a.add_connection(r_agent)

    def create_connections(self, connection_map):
        """Create agent connections from a given connection map.

        :param dict connection_map:
            A map of connections to be created. Dictionary where keys are
            agent addresses and values are lists of (addr, attitude)-tuples
            suitable for
            :meth:`~creamas.core.agent.CreativeAgent.add_connections`.

        :returns:
            A list of the return values of each agent's
            :meth:`add_connections` call.

        Only connections for agents in this environment are made.
        """
        return [
            a.add_connections(connection_map[a.addr])
            for a in self.get_agents(addr=False)
            if a.addr in connection_map
        ]

    def get_connections(self, data=True):
        """Return connections from all the agents in the environment.

        :param bool data:
            If ``True`` return also the dictionary associated with each
            connection

        :returns:
            A list of ``(addr, connections)``-tuples, where ``connections`` is
            a list of addresses agent in ``addr`` is connected to. If ``data``
            parameter is ``True``, then the ``connections`` list contains
            tuples of ``(nb_addr, data)``-pairs , where ``data`` is a
            dictionary.

        :rtype: list

        .. note::

            Environment's manager agent is excluded from the returned list.
        """
        return [
            (a.addr, a.get_connections(data=data))
            for a in self.get_agents(addr=False)
        ]

    def clear_connections(self):
        """Clear all connections from the agents in the environment."""
        for a in self.get_agents(addr=False):
            a.clear_connections()

    def get_random_agent(self, agent):
        """Return random agent that is not the same as the agent given as a
        parameter.

        :param agent: Agent that is not wanted to return
        :type agent: :py:class:`~creamas.core.agent.CreativeAgent`
        :returns: random agent from the environment, other than ``agent``
        :rtype: :py:class:`~creamas.core.agent.CreativeAgent`
        :raises IndexError:
            If the environment has no agents other than ``agent``.
        """
        agents = self.get_agents(addr=False)
        # Without this guard the rejection loop below would never terminate
        # when the given agent is the only agent in the environment.
        if all(a.addr == agent.addr for a in agents):
            raise IndexError("No other agents in the environment to choose from.")

        r_agent = choice(agents)
        while r_agent.addr == agent.addr:
            r_agent = choice(agents)
        return r_agent

    def add_artifact(self, artifact):
        """Add artifact to the environment.

        The artifact's ``env_time`` is set to the environment's current
        :attr:`age` before it is appended to :attr:`artifacts`.

        :param object artifact: Artifact to be added.
        """
        artifact.env_time = self.age
        self.artifacts.append(artifact)
        self._log(logging.DEBUG, "ARTIFACTS appended: '{}', length={}"
                  .format(artifact, len(self.artifacts)))

    def add_artifacts(self, artifacts):
        """Add artifacts to :attr:`artifacts`.

        :param artifacts:
            list of :py:class:`~creamas.core.artifact.Artifact` objects
        """
        for artifact in artifacts:
            self.add_artifact(artifact)

    async def get_artifacts(self, agent=None):
        """Return artifacts published to the environment.

        :param agent:
            If not ``None``, then returns only artifacts created by the agent.

        :returns: All artifacts published (by the agent).
        :rtype: list

        If environment has a :attr:`manager` agent, e.g. it is a slave
        environment in :class:`~creamas.mp.MultiEnvironment`, then the
        manager's :meth:`~creamas.mp.EnvManager.get_artifacts` is called.
        """
        # TODO: Figure better way for this
        if hasattr(self, 'manager') and self.manager is not None:
            artifacts = await self.manager.get_artifacts()
        else:
            artifacts = self.artifacts
        if agent is not None:
            artifacts = [a for a in artifacts if agent.name == a.creator]
        return artifacts

    def _log(self, level, msg):
        """Log ``msg`` at ``level`` if a logger has been set, else do nothing."""
        if self.logger is not None:
            self.logger.log(level, msg)

    def save_info(self, folder, *args, **kwargs):
        """Save information accumulated during the environments lifetime.

        Called from :py:meth:`~creamas.core.Environment.close`. Override in
        subclass.

        :param str folder: root folder to save information
        """
        pass

    def destroy(self, folder=None, as_coro=False):
        """Close the environment.

        .. deprecated:: 0.4.0
            Use :func:`close` instead
        """
        import warnings

        # Actually emit the deprecation warning; merely instantiating
        # DeprecationWarning has no effect. stacklevel=2 attributes the
        # warning to the caller of destroy().
        warnings.warn(
            "{0}.destroy is deprecated, use {0}.close instead."
            .format(str(self.__class__.__name__)),
            DeprecationWarning,
            stacklevel=2,
        )
        return self.close(folder=folder, as_coro=as_coro)

    def close(self, folder=None, as_coro=False):
        """Close the environment.

        Does the following:

        1. calls :py:meth:`~creamas.core.Environment.save_info`
        2. for each agent: calls :py:meth:`close`
        3. Shuts down its RPC-service.

        :param folder: Passed to :meth:`save_info` and each agent's ``close``.
        :param bool as_coro:
            Passed to :func:`~creamas.util.run_or_coro` together with the
            closing coroutine.
        :returns: Whatever :func:`~creamas.util.run_or_coro` returns.
        """
        async def _close(folder):
            ret = self.save_info(folder)
            for a in self.get_agents(addr=False):
                a.close(folder=folder)
            await self.shutdown(as_coro=True)
            return ret

        return run_or_coro(_close(folder), as_coro)

    def __str__(self):
        return self.__repr__()

    def __repr__(self):
        return "{}({})".format(self.__class__.__name__, self.name)