Source code for creamas.mp

"""
.. py:module:: mp
    :platform: Unix

This module contains multiprocessing implementation for :class:`~creamas.core.environment.Environment`,
:class:`~creamas.mp.MultiEnvironment`.

A :class:`~creamas.mp.MultiEnvironment` holds several :class:`~creamas.core.environment.Environment` slaves, which are
spawned on their own processes, and uses managers to obtain much of the same functionality as the single processor
environment. See :class:`~creamas.mp.EnvManager` and :class:`~creamas.mp.MultiEnvManager` for details.

.. warning::
    This functionality is currently largely untested. However, it *seems* to work as intended and may be used in
    :class:`~creamas.core.simulation.Simulation`.
"""
import asyncio
import logging
import multiprocessing
import time

from aiomas.subproc import Manager
from aiomas.agent import _get_base_url

from creamas.core.environment import Environment
from creamas.util import run_or_coro, create_tasks, expose


logger = logging.getLogger(__name__)
TIMEOUT = 5


[docs]def set_base_timeout(timeout): """Set base timeout (in seconds) for the rpc calls originating from the instances in this module. """ global TIMEOUT TIMEOUT = timeout
[docs]class EnvManager(Manager): """A manager for :class:`~creamas.core.environment.Environment`, which is a subclass of :class:`aiomas.subproc.Manager`. Managers are used in environments which need to be able to execute commands originating from outside sources, e.g. in slave environments inside a multiprocessing environment. A manager can spawn other agents into its environment, and can execute other tasks relevant to the environment. The manager should always be the first agent created to the environment. .. note:: You should not need to create managers directly, instead pass the desired manager class to an instance of :class:`~creamas.mp.MultiEnvironment` at its initialization time. """ def __init__(self, environment): super().__init__(environment) self._host_manager = None @property def env(self): return self.container
[docs] @expose def set_host_manager(self, addr): """Set host (or master) manager for this manager. :param addr: Address for the host manager. """ self._host_manager = addr
[docs] @expose def host_manager(self): """Get address of the host manager. """ return self._host_manager
[docs] @expose async def report(self, msg, timeout=TIMEOUT): """Report message to the host manager. """ try: host_manager = await self.env.connect(self.host_manager, timeout=timeout) except: raise ConnectionError("Could not reach host manager ({}).".format(self.host_manager)) ret = await host_manager.handle(msg) return ret
[docs] @expose def handle(self, msg): """Handle message, override in subclass if needed. """ pass
[docs] @expose def get_agents(self, addr=True, agent_cls=None, as_coro=False): """Get agents from the managed environment. This is a managing function for the :py:meth:`~creamas.environment.Environment.get_agents`. The returned agent list excludes the environment's manager agent (i.e. this agent) by design. """ return self.env.get_agents(addr=addr, agent_cls=agent_cls)
@expose def set_log_folder(self, log_folder): self.env.log_folder = log_folder
[docs] @expose def artifacts(self): """Return artifacts from the managed environment. """ return self.env.artifacts
[docs] @expose def create_connections(self, connection_map): """Create connections for agents in the environment. This is a managing function for :meth:`~creamas.core.environment.Environment.create_connections`. """ return self.env.create_connections(connection_map)
[docs] @expose def get_connections(self, data=False): """Get connections from the agents in the environment. This is a managing function for :meth:`~creamas.core.environment.Environment.get_connections`. """ return self.env.get_connections(data=data)
[docs] @expose async def get_artifacts(self): """Get all artifacts from the host environment. :returns: All the artifacts in the environment. """ host_manager = await self.env.connect(self._host_manager, timeout=TIMEOUT) artifacts = await host_manager.get_artifacts() return artifacts
[docs] @expose def close(self, folder=None): """Implemented for consistency. This basic implementation does nothing. """ pass
[docs] @expose async def trigger_all(self, *args, **kwargs): """Trigger all agents in the managed environment to act once. This is a managing function for :meth:`~creamas.core.environment.Environment.trigger_all`. """ return await self.env.trigger_all(*args, **kwargs)
[docs] @expose async def is_ready(self): """Check if the managed environment is ready. This is a managing function for :py:meth:`~creamas.environment.Environment.is_ready`. """ return self.env.is_ready()
[docs] @expose async def spawn_n(self, agent_cls, n, *args, **kwargs): """Spawn :attr:`n` agents to the managed environment. This is a convenience function so that one does not have to repeatedly make connections to the environment to spawn multiple agents with the same parameters. See :meth:`~aiomas.subproc.Manager.spawn` for details. """ rets = [] for _ in range(n): ret = await self.spawn(agent_cls, *args, **kwargs) rets.append(ret) return rets
[docs]class MultiEnvManager(Manager): """A manager for :class:`~creamas.mp.MultiEnvironment`, which is a subclass of :class:`aiomas.subproc.Manager`. A Manager can spawn other agents into its slave environments, and can execute other tasks relevant to the whole environment. The manager should always be the first (and usually only) agent created for the multi-environment's managing environment. The actual simulation agents should be created to the slave environments, typically using multi-environment's or its manager's functionality. .. note:: You should not need to create managers directly, instead pass the desired manager class to an instance of :class:`~creamas.mp.MultiEnvironment` at its initialization time. """ def __init__(self, environment): super().__init__(environment) @property def env(self): return self.container
[docs] @expose def handle(self, msg): """Handle message from a slave manager. **This is a dummy method which should be overridden in a subclass.** """ pass
[docs] @expose async def spawn(self, agent_cls, *args, addr=None, **kwargs): """Spawn an agent to the environment. This is a managing function for :meth:`~creamas.mp.MultiEnvironment.spawn`. .. note:: Since :class:`aiomas.rpc.Proxy` objects do not seem to handle (re)serialization, only the address of the spawned agent is returned. """ _, addr = await self.menv.spawn(agent_cls, *args, addr=addr, **kwargs) return addr
[docs] @expose async def spawn_n(self, agent_cls, n, *args, addr=None, **kwargs): """Same as :meth:`~creamas.mp.MultiEnvManager.spawn`, but spawn :attr:`n` agents with same initialization parameters. This is a managing function for :meth:`~creamas.mp.MultiEnvironment.spawn_n`. .. note:: Since :class:`aiomas.rpc.Proxy` objects do not seem to handle (re)serialization, only the addresses of the spawned agents are returned. """ ret = await self.menv.spawn_n(agent_cls, n, *args, addr=addr, **kwargs) return [r[1] for r in ret]
[docs] @expose async def get_agents(self, addr=True, agent_cls=None): """Get addresses of all agents in all the slave environments. This is a managing function for :meth:`~creamas.mp.MultiEnvironment.get_agents`. .. note:: Since :class:`aiomas.rpc.Proxy` objects do not seem to handle (re)serialization, ``addr`` and ``agent_cls`` parameters are omitted from the call to underlying multi-environment's :meth:`get_agents`. If :class:`aiomas.rpc.Proxy` objects from all the agents are needed, call each slave environment manager's :meth:`get_agents` directly. """ return await self.menv.get_agents(addr=True, agent_cls=None, as_coro=True)
[docs] @expose async def create_connections(self, connection_map): """Create connections for agents in the multi-environment. This is a managing function for :meth:`~creamas.mp.MultiEnvironment.create_connections`. """ return await self.menv.create_connections(connection_map, as_coro=True)
[docs] @expose async def get_connections(self, data=True): """Return connections for all the agents in the slave environments. This is a managing function for :meth:`~creamas.mp.MultiEnvironment.get_connections`. """ return await self.menv.get_connections(data=data, as_coro=True)
[docs] @expose def close(self, folder=None): """Implemented for consistency. This basic implementation does nothing. """ pass
[docs] @expose async def set_as_host_manager(self, addr, timeout=5): """Set the this manager as a host manager for the manager in *addr*. This is a managing function for :py:meth:`~creamas.mp.MultiEnvironment.set_host_manager`. """ return await self.menv.set_host_manager(addr, timeout=timeout)
[docs] @expose async def trigger_all(self, *args, **kwargs): """Trigger all agents in the managed multi-environment to act. This is a managing function for :py:meth:`~creamas.mp.MultiEnvironment.trigger_all`. """ return await self.menv.trigger_all(*args, **kwargs)
[docs] @expose async def is_ready(self): """A managing function for :py:meth:`~creamas.mp.MultiEnvironment.is_ready`. """ return await self.menv.is_ready()
[docs] @expose async def get_artifacts(self): """Get all the artifacts from the multi-environment. """ return self.menv.artifacts
[docs] @expose async def get_slave_managers(self): """Get addresses of the slave environment managers in this multi-environment. """ return self.menv.get_slave_managers()
[docs]class MultiEnvironment(): """Environment for utilizing multiple processes (and cores) on a single machine. :class:`MultiEnvironment` is not a subclass of :class:`creamas.core.environment.Environment`. :py:class:`MultiEnvironment` has a master environment, typically containing only a single manager, and a set of slave environments each having their own manager and (once spawned) the actual agents. Order of usage is:: import aiomas from creamas Environment from creamas.mp import EnvManager, MultiEnvironment from creamas.util import run # Create the multi-environment and the environment used to connect to # slave environments addr = ('localhost', 5555) env_cls = Environment env_kwargs = {'codec': aiomas.MsgPack} menv = MultiEnvironment(addr, env_cls, **env_kwargs) # Define slave environments and their arguments slave_addrs = [('localhost', 5556), ('localhost', 5557)] slave_env_cls = Environment slave_mgr_cls = EnvManager n_slaves = 2 slave_kwargs = [{'codec': aiomas.MsgPack} for _ in range(n_slaves)] # Spawn the actual slave environments menv.spawn_slaves(slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs) # Wait that all the slaves are ready, if you need to do some other # preparation before environments' return True for is_ready, then # change check_ready=False run(menv.wait_slaves(10, check_ready=True)) # Do any additional preparations here, like spawning the agents to # slave environments. # Trigger all agents to act run(menv.trigger_all()) # Destroy the environment to free the resources menv.destroy(as_coro=False) """ def __init__(self, addr, env_cls, mgr_cls=None, name=None, logger=None, **env_kwargs): """ :param addr: ``(HOST, PORT)`` address for the master environment. :param env_cls: Class for the master environment, used to make connections to the slave environments. Must be a subclass of :py:class:`~creamas.core.environment.Environment`. :param mgr_cls: Class for the multi-environment's manager. :param str name: Name of the environment. Will be shown in logs. :param logger: Optional. Logger for the master environment. """ self._addr = addr self._env = env_cls.create(addr, **env_kwargs) if mgr_cls is not None: self._manager = mgr_cls(self._env) self._manager.menv = self else: self._manager = None self._age = 0 self._artifacts = [] self._candidates = [] self._manager_addrs = [] if type(name) is str: self._name = name else: self._name = "{}:{}".format(addr[0], addr[1]) self._logger = logger self._pool = None self._r = None def __str__(self): return self.__repr__() def __repr__(self): return "{}({})".format(self.__class__.__name__, self.name) @property def name(self): """The name of the environment. """ return self._name @property def env(self): """The environment hosting the manager of this multi-environment. This environment is also used without the manager to connect to the slave environment managers and communicate with them. """ return self._env
[docs] def get_agents(self, addr=True, agent_cls=None, as_coro=False): """Get agents from the slave environments. :param bool addr: If ``True``, returns only addresses of the agents, otherwise returns a :class:`Proxy` object for each agent. :param agent_cls: If specified, returns only agents that are members of that particular class. :param bool as_coro: If ``True``, returns a coroutine, otherwise runs the method in an event loop. :returns: A coroutine or list of :class:`Proxy` objects or addresses as specified by the input parameters. Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager's :meth:`~creamas.mp.EnvManager.get_agents` asynchronously. .. tip:: Calling each slave environment's manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not bound to change. """ async def slave_task(mgr_addr, addr=True, agent_cls=None): r_manager = await self.env.connect(mgr_addr, timeout=TIMEOUT) return await r_manager.get_agents(addr=addr, agent_cls=agent_cls) tasks = create_tasks(slave_task, self.addrs, addr, agent_cls) return run_or_coro(tasks, as_coro)
@property def addrs(self): """Addresses of the slave environment managers. """ return self._manager_addrs @property def manager(self): """This multi-environment's master manager. """ return self._manager @property def logger(self): """Logger for the environment. Logger should have at least :meth:`log` method which takes two arguments: a log level and a message. """ return self._logger @property def artifacts(self): """Published artifacts for all agents. """ return self._artifacts
[docs] async def connect(self, *args, **kwargs): """A shortcut to environment's :meth:`connect`. """ return await self.env.connect(*args, **kwargs)
[docs] def check_ready(self): """Check if this multi-environment itself is ready. Override in subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments. :rtype: bool :returns: This basic implementation returns always True. """ return True
[docs] async def is_ready(self): """Check if the multi-environment has been fully initialized. This calls each slave environment managers' :py:meth:`is_ready` and checks if the multi-environment itself is ready by calling :py:meth:`~creamas.mp.MultiEnvironment.check_ready`. .. seealso:: :py:meth:`~creamas.core.environment.Environment.is_ready` """ async def slave_task(addr, timeout): try: r_manager = await self.env.connect(addr, timeout=timeout) ready = await r_manager.is_ready() if not ready: return False except: return False return True if not self.env.is_ready(): return False if not self.check_ready(): return False rets = await create_tasks(slave_task, self.addrs, 0.5) if not all(rets): return False return True
[docs] async def spawn_slaves(self, slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs=None): """Spawn slave environments. :param slave_addrs: List of (HOST, PORT) addresses for the slave-environments. :param slave_env_cls: Class for the slave environments. :param slave_kwargs: If not None, must be a list of the same size as *addrs*. Each item in the list containing parameter values for one slave environment. :param slave_mgr_cls: Class of the slave environment managers. """ pool, r = spawn_containers(slave_addrs, env_cls=slave_env_cls, env_params=slave_kwargs, mgr_cls=slave_mgr_cls) self._pool = pool self._r = r self._manager_addrs = ["{}{}".format(_get_base_url(a), 0) for a in slave_addrs]
[docs] async def wait_slaves(self, timeout, check_ready=False): """Wait until all slaves are online (their managers accept connections) or timeout expires. :param int timeout: Timeout (in seconds) after which the method will return even though all the slaves are not online yet. :param bool check_ready: If ``True`` also checks if all slave environment's are ready. A slave environment is assumed to be ready when its manager's :meth:`is_ready`-method returns ``True``. .. seealso:: :meth:`creamas.core.environment.Environment.is_ready`, :meth:`creamas.mp.EnvManager.is_ready`, :meth:`creamas.mp.MultiEnvManager.is_ready` """ status = 'ready' if check_ready else 'online' self._log(logging.DEBUG, "Waiting for slaves to become {}...".format(status)) t = time.monotonic() online = [] while len(online) < len(self.addrs): for addr in self.addrs: if time.monotonic() - t > timeout: self._log(logging.DEBUG, "Timeout while waiting for the " "slaves to become {}.".format(status)) return False if addr not in online: try: r_manager = await self.env.connect(addr, timeout) ready = True if check_ready: ready = await r_manager.is_ready() if ready: online.append(addr) self._log(logging.DEBUG, "Slave {}/{} {}: {}" .format(len(online), len(self.addrs), status, addr)) except: pass await asyncio.sleep(0.5) self._log(logging.DEBUG, "All slaves {} in {} seconds!" .format(status, time.monotonic() - t)) return True
def _get_log_folders(self, log_folder, addrs): if type(log_folder) is str: import os folders = [os.path.join(log_folder, '_{}'.format(i)) for i in range(len(addrs))] return folders else: folders = [None for _ in range(len(addrs))] return folders
[docs] async def set_host_manager(self, addr, timeout=TIMEOUT): """Set this multi-environment's manager as the host manager for a manager agent in *addr* """ r_manager = await self.env.connect(addr, timeout=timeout) return await r_manager.set_host_manager(self.manager.addr)
[docs] async def set_host_managers(self, timeout=5): """Set the master environment's manager as host manager for the slave environment managers. :param int timeout: Timeout for connecting to the slave managers. This enables the slave environment managers to communicate back to the master environment. The master environment manager, :attr:`~creamas.mp.MultiEnvironment.manager`, must be an instance of :class:`~creamas.mp.MultiEnvManager` or its subclass if this method is called. """ return await create_tasks(self.set_host_manager, self.addrs, timeout)
[docs] async def trigger_act(self, addr): """Trigger agent in :attr:`addr` to act. This method is quite inefficient if used repeatedly for a large number of agents. .. seealso:: :py:meth:`creamas.mp.MultiEnvironment.trigger_all` """ r_agent = await self.env.connect(addr, timeout=TIMEOUT) return await r_agent.act()
[docs] async def trigger_all(self, *args, **kwargs): """Trigger all agents in all the slave environments to :meth:`act` asynchronously. Given arguments and keyword arguments are passed down to each agent's :meth:`~creamas.core.agent.CreativeAgent.act`. .. note:: By design, the manager agents in each slave environment, i.e. :attr:`manager`, are excluded from acting. """ async def slave_task(addr, *args, **kwargs): r_manager = await self.env.connect(addr, timeout=TIMEOUT) return await r_manager.trigger_all(*args, **kwargs) return await create_tasks(slave_task, self.addrs, *args, **kwargs)
async def _get_smallest_env(self): """Get address of the slave environment manager with the smallest number of agents. """ async def slave_task(mgr_addr): r_manager = await self.env.connect(mgr_addr, timeout=TIMEOUT) ret = await r_manager.get_agents(addr=True) return mgr_addr, len(ret) sizes = await create_tasks(slave_task, self.addrs, flatten=False) return sorted(sizes, key=lambda x: x[1])[0][0]
[docs] async def spawn(self, agent_cls, *args, addr=None, **kwargs): """Spawn a new agent in a slave environment. :param str agent_cls: ``qualname`` of the agent class. That is, the name should be in the form ``pkg.mod:cls``, e.g. ``creamas.core.agent:CreativeAgent``. :param str addr: Optional. Address for the slave enviroment's manager. If ``None``, spawns the agent in the slave environment with currently smallest number of agents. :returns: :class:`aiomas.rpc.Proxy` and address for the created agent. The ``*args`` and ``**kwargs`` are passed down to the agent's ``__init__``. .. tip:: Use :meth:`~creamas.mp.MultiEnvironment.spawn_n` to spawn large number of agents with identical initialization parameters. """ if addr is None: addr = await self._get_smallest_env() r_manager = await self.env.connect(addr) return await r_manager.spawn(agent_cls, *args, **kwargs)
[docs] async def spawn_n(self, agent_cls, n, *args, addr=None, **kwargs): """Same as :meth:`~creamas.mp.MultiEnvironment.spawn`, but allows spawning multiple agents with the same initialization parameters simultaneously into **one** slave environment. :param str agent_cls: ``qualname`` of the agent class. That is, the name should be in the form of ``pkg.mod:cls``, e.g. ``creamas.core.agent:CreativeAgent``. :param int n: Number of agents to spawn :param str addr: Optional. Address for the slave enviroment's manager. If ``None``, spawns the agents in the slave environment with currently smallest number of agents. :returns: A list of (:class:`aiomas.rpc.Proxy`, address)-tuples for the spawned agents. The ``*args`` and ``**kwargs`` are passed down to each agent's ``__init__``. """ if addr is None: addr = await self._get_smallest_env() r_manager = await self.env.connect(addr) return await r_manager.spawn_n(agent_cls, n, *args, **kwargs)
[docs] def create_connections(self, connection_map, as_coro=False): """Create agent connections from the given connection map. :param dict connection_map: A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for :meth:`~creamas.core.agent.CreativeAgent.add_connections`. :param bool as_coro: If ``True`` returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop. Only the connections for the agents that are in the slave environments are created. """ async def slave_task(addr, connection_map): r_manager = await self.env.connect(addr) return await r_manager.create_connections(connection_map) tasks = create_tasks(slave_task, self.addrs, connection_map) return run_or_coro(tasks, as_coro)
[docs] def get_connections(self, data=True, as_coro=False): """Return connections from all the agents in the slave environments. :param bool data: If ``True``, returns also the data stored for each connection. :param bool as_coro: If ``True`` returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop. .. seealso:: :meth:`creamas.core.environment.Environment.get_connections` """ async def slave_task(addr, data): r_manager = await self.env.connect(addr) return await r_manager.get_connections(data) tasks = create_tasks(slave_task, self.addrs, data) return run_or_coro(tasks, as_coro)
[docs] def get_slave_managers(self): """Get addresses of all slave environment managers. """ return self.addrs
[docs] def add_artifact(self, artifact): """Add an artifact to the environment. :param object artifact: Artifact to be added. """ artifact.env_time = self.age self.artifacts.append(artifact) self._log(logging.DEBUG, "ARTIFACTS appended: '{}', length={}" .format(artifact, len(self.artifacts)))
[docs] def add_artifacts(self, artifacts): """Add artifacts to :attr:`artifacts`. :param artifacts: list of :py:class:`~creamas.core.artifact.Artifact` objects """ for artifact in artifacts: self.add_artifact(artifact)
[docs] def get_artifacts(self, agent_name=None): """Get all artifacts or all artifacts published by a specific agent. :param str agent_name: Optional. Name of the agent which artifacts are returned. :returns: All artifacts or all artifacts published by the agent. :rtype: list """ if agent_name is not None: return [a for a in self.artifacts if agent_name == a.creator] return self.artifacts
def _log(self, level, msg): if self.logger is not None: self.logger.log(level, msg)
[docs] def save_info(self, folder, *args, **kwargs): """Save information accumulated during the environment's lifetime. Called from :py:meth:`~creamas.mp.MultiEnvironment.destroy`. Override in subclass. :param str folder: root folder to save information """ pass
[docs] async def stop_slaves(self, timeout=1): """Stop all the slaves by sending a stop-message to their managers. :param int timeout: Timeout for connecting to each manager. If a connection can not be made before the timeout expires, the resulting error for that particular manager is logged, but the stopping of other managers is not halted. """ for addr in self.addrs: try: r_manager = await self.env.connect(addr, timeout=timeout) await r_manager.stop() except: self._log(logging.WARNING, "Could not stop {}".format(addr))
[docs] def destroy(self, folder=None, as_coro=False): """Close the multiprocessing environment and its slave environments. .. deprecated:: 0.4.0 Use :func:`close` instead """ DeprecationWarning("{0}.destroy is deprecated, use {0}.close instead.".format(str(self.__class__.__name__))) return self.close(folder=folder, as_coro=as_coro)
[docs] def close(self, folder=None, as_coro=False): """Close the multiprocessing environment and its slave environments. """ async def _close(folder): ret = self.save_info(folder) await self.stop_slaves() # Terminate and join the process pool when we are destroyed. # Do not wait for unfinished processed with pool.close(), # the slaves should be anyway already stopped. if self._pool is not None: self._pool.terminate() self._pool.join() await self._env.shutdown(as_coro=True) return ret return run_or_coro(_close(folder), as_coro)
[docs]def spawn_container(addr, env_cls=Environment, mgr_cls=EnvManager, set_seed=True, *args, **kwargs): """Spawn a new environment in a given address as a coroutine. Arguments and keyword arguments are passed down to the created environment at initialization time. If `setproctitle <https://pypi.python.org/pypi/setproctitle>`_ is installed, this function renames the title of the process to start with 'creamas' so that the process is easily identifiable, e.g. with ``ps -x | grep creamas``. """ # Try setting the process name to easily recognize the spawned # environments with 'ps -x' or 'top' try: import setproctitle as spt title = 'creamas: {}({})'.format(env_cls.__class__.__name__, _get_base_url(addr)) spt.setproctitle(title) except: pass if set_seed: _set_random_seeds() # kwargs['codec'] = aiomas.MsgPack task = start(addr, env_cls, mgr_cls, *args, **kwargs) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(task)
[docs]def spawn_containers(addrs, env_cls=Environment, env_params=None, mgr_cls=EnvManager, *args, **kwargs): """Spawn environments in a multiprocessing :class:`multiprocessing.Pool`. Arguments and keyword arguments are passed down to the created environments at initialization time if *env_params* is None. If *env_params* is not None, then it is assumed to contain individual initialization parameters for each environment in *addrs*. :param addrs: List of (HOST, PORT) addresses for the environments. :param env_cls: Callable for the environments. Must be a subclass of :py:class:`~creamas.core.environment.Environment`. :param env_params: Initialization parameters for the environments. :type env_params: Iterable of same length as *addrs* or None. :param mgr_cls: Callable for the managers. Must be a subclass of :py:class:`~creamas.mp.EnvManager`.s :returns: The created process pool and the *ApplyAsync* results for the spawned environments. """ pool = multiprocessing.Pool(len(addrs)) kwargs['env_cls'] = env_cls kwargs['mgr_cls'] = mgr_cls r = [] for i, addr in enumerate(addrs): if env_params is not None: k = env_params[i] k['env_cls'] = env_cls k['mgr_cls'] = mgr_cls # Copy kwargs so that we can apply different address to different # containers. else: k = kwargs.copy() k['addr'] = addr ret = pool.apply_async(spawn_container, args=args, kwds=k, error_callback=logger.warning) r.append(ret) return pool, r
[docs]async def start(addr, env_cls, mgr_cls, *env_args, **env_kwargs): """`Coroutine <https://docs.python.org/3/library/asyncio-task.html#coroutine>`_ that starts an environment with :class:`mgr_cls` manager agent. The agent will connect to *addr* ``('host', port)`` and wait for commands to spawn new agents within its environment. The *env_args* and *env_kwargs* will be passed to ``env_cls.create()`` factory function. This coroutine finishes after manager's ``stop`` was called or when a :exc:`KeyboardInterrupt` is raised and calls ``env_cls.close()`` before it finishes. :param addr: (HOST, PORT) for the new environment :param env_cls: Class of the environment, subclass of :class:`~creamas.core.environment.Environment`. :param mgr_cls: Class of the manager agent, subclass of :class:`~creamas.mp.EnvManager`. """ env_kwargs.update(as_coro=True) log_folder = env_kwargs.get('log_folder', None) env = await env_cls.create(addr, *env_args, **env_kwargs) try: manager = mgr_cls(env) env.manager = manager await manager.stop_received except KeyboardInterrupt: logger.info('Execution interrupted by user') finally: await env.close(folder=log_folder, as_coro=True)
def _set_random_seeds(): """Set new random seeds for the process. """ try: import numpy as np np.random.seed() except: pass try: import scipy as sp sp.random.seed() except: pass import random random.seed()