Multiprocessing

This module contains multiprocessing implementation for Environment, MultiEnvironment.

A MultiEnvironment holds several Environment slaves, which are spawned on their own processes, and uses managers to obtain much of the same functionality as the single processor environment. See EnvManager and MultiEnvManager for details.

Warning

This functionality is currently largely untested. However, it seems to work as intended and may be used in Simulation.

class creamas.mp.EnvManager(environment)[source]

A manager for Environment, which is a subclass of aiomas.subproc.Manager.

Managers are used in environments which need to be able to execute commands originating from outside sources, e.g. in slave environments inside a multiprocessing environment.

A manager can spawn other agents into its environment, and can execute other tasks relevant to the environment. The manager should always be the first agent created to the environment.

Note

You should not need to create managers directly, instead pass the desired manager class to an instance of MultiEnvironment at its initialization time.

artifacts()[source]

Return artifacts from the managed environment.

close(folder=None)[source]

Implemented for consistency. This basic implementation does nothing.

create_connections(connection_map)[source]

Create connections for agents in the environment.

This is a managing function for create_connections().

get_agents(addr=True, agent_cls=None, as_coro=False)[source]

Get agents from the managed environment.

This is a managing function for the get_agents(). The returned agent list excludes the environment’s manager agent (i.e. this agent) by design.

async get_artifacts()[source]

Get all artifacts from the host environment.

Returns

All the artifacts in the environment.

get_connections(data=False)[source]

Get connections from the agents in the environment.

This is a managing function for get_connections().

handle(msg)[source]

Handle message, override in subclass if needed.

host_manager()[source]

Get address of the host manager.

async is_ready()[source]

Check if the managed environment is ready.

This is a managing function for is_ready().

async report(msg, timeout=5)[source]

Report message to the host manager.

set_host_manager(addr)[source]

Set host (or master) manager for this manager.

Parameters

addr – Address for the host manager.

async spawn_n(agent_cls, n, *args, **kwargs)[source]

Spawn n agents to the managed environment.

This is a convenience function so that one does not have to repeatedly make connections to the environment to spawn multiple agents with the same parameters.

See spawn() for details.

async trigger_all(*args, **kwargs)[source]

Trigger all agents in the managed environment to act once.

This is a managing function for trigger_all().

class creamas.mp.MultiEnvManager(environment)[source]

A manager for MultiEnvironment, which is a subclass of aiomas.subproc.Manager.

A Manager can spawn other agents into its slave environments, and can execute other tasks relevant to the whole environment. The manager should always be the first (and usually only) agent created for the multi-environment’s managing environment. The actual simulation agents should be created to the slave environments, typically using multi-environment’s or its manager’s functionality.

Note

You should not need to create managers directly, instead pass the desired manager class to an instance of MultiEnvironment at its initialization time.

close(folder=None)[source]

Implemented for consistency. This basic implementation does nothing.

async create_connections(connection_map)[source]

Create connections for agents in the multi-environment.

This is a managing function for create_connections().

async get_agents(addr=True, agent_cls=None)[source]

Get addresses of all agents in all the slave environments.

This is a managing function for get_agents().

Note

Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, addr and agent_cls parameters are omitted from the call to underlying multi-environment’s get_agents().

If aiomas.rpc.Proxy objects from all the agents are needed, call each slave environment manager’s get_agents() directly.

async get_artifacts()[source]

Get all the artifacts from the multi-environment.

async get_connections(data=True)[source]

Return connections for all the agents in the slave environments.

This is a managing function for get_connections().

async get_slave_managers()[source]

Get addresses of the slave environment managers in this multi-environment.

handle(msg)[source]

Handle message from a slave manager.

This is a dummy method which should be overridden in a subclass.

async is_ready()[source]

A managing function for is_ready().

async set_as_host_manager(addr, timeout=5)[source]

Set the this manager as a host manager for the manager in addr.

This is a managing function for set_host_manager().

async spawn(agent_cls, *args, addr=None, **kwargs)[source]

Spawn an agent to the environment.

This is a managing function for spawn().

Note

Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, only the address of the spawned agent is returned.

async spawn_n(agent_cls, n, *args, addr=None, **kwargs)[source]

Same as spawn(), but spawn n agents with same initialization parameters.

This is a managing function for spawn_n().

Note

Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, only the addresses of the spawned agents are returned.

async trigger_all(*args, **kwargs)[source]

Trigger all agents in the managed multi-environment to act.

This is a managing function for trigger_all().

class creamas.mp.MultiEnvironment(addr, env_cls, mgr_cls=None, name=None, logger=None, **env_kwargs)[source]

Environment for utilizing multiple processes (and cores) on a single machine. MultiEnvironment is not a subclass of creamas.core.environment.Environment.

MultiEnvironment has a master environment, typically containing only a single manager, and a set of slave environments each having their own manager and (once spawned) the actual agents.

Order of usage is:

import aiomas

from creamas Environment
from creamas.mp import EnvManager, MultiEnvironment
from creamas.util import run

# Create the multi-environment and the environment used to connect to
# slave environments
addr = ('localhost', 5555)
env_cls = Environment
env_kwargs = {'codec': aiomas.MsgPack}
menv = MultiEnvironment(addr, env_cls, **env_kwargs)

# Define slave environments and their arguments
slave_addrs = [('localhost', 5556), ('localhost', 5557)]
slave_env_cls = Environment
slave_mgr_cls = EnvManager
n_slaves = 2
slave_kwargs = [{'codec': aiomas.MsgPack} for _ in range(n_slaves)]

# Spawn the actual slave environments
menv.spawn_slaves(slave_addrs, slave_env_cls,
                  slave_mgr_cls, slave_kwargs)

# Wait that all the slaves are ready, if you need to do some other
# preparation before environments' return True for is_ready, then
# change check_ready=False
run(menv.wait_slaves(10, check_ready=True))

# Do any additional preparations here, like spawning the agents to
# slave environments.

# Trigger all agents to act
run(menv.trigger_all())

# Destroy the environment to free the resources
menv.destroy(as_coro=False)
Parameters
  • addr(HOST, PORT) address for the master environment.

  • env_cls – Class for the master environment, used to make connections to the slave environments. Must be a subclass of Environment.

  • mgr_cls – Class for the multi-environment’s manager.

  • name (str) – Name of the environment. Will be shown in logs.

  • logger – Optional. Logger for the master environment.

add_artifact(artifact)[source]

Add an artifact to the environment.

Parameters

artifact (object) – Artifact to be added.

add_artifacts(artifacts)[source]

Add artifacts to artifacts.

Parameters

artifacts – list of Artifact objects

check_ready()[source]

Check if this multi-environment itself is ready.

Override in subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments.

Return type

bool

Returns

This basic implementation returns always True.

close(folder=None, as_coro=False)[source]

Close the multiprocessing environment and its slave environments.

async connect(*args, **kwargs)[source]

A shortcut to environment’s connect().

create_connections(connection_map, as_coro=False)[source]

Create agent connections from the given connection map.

Parameters
  • connection_map (dict) – A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for add_connections().

  • as_coro (bool) – If True returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop.

Only the connections for the agents that are in the slave environments are created.

destroy(folder=None, as_coro=False)[source]

Close the multiprocessing environment and its slave environments.

Deprecated since version 0.4.0: Use close() instead

get_agents(addr=True, agent_cls=None, as_coro=False)[source]

Get agents from the slave environments.

Parameters
  • addr (bool) – If True, returns only addresses of the agents, otherwise returns a Proxy object for each agent.

  • agent_cls – If specified, returns only agents that are members of that particular class.

  • as_coro (bool) – If True, returns a coroutine, otherwise runs the method in an event loop.

Returns

A coroutine or list of Proxy objects or addresses as specified by the input parameters.

Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager’s get_agents() asynchronously.

Tip

Calling each slave environment’s manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not bound to change.

get_artifacts(agent_name=None)[source]

Get all artifacts or all artifacts published by a specific agent.

Parameters

agent_name (str) – Optional. Name of the agent which artifacts are returned.

Returns

All artifacts or all artifacts published by the agent.

Return type

list

get_connections(data=True, as_coro=False)[source]

Return connections from all the agents in the slave environments.

Parameters
  • data (bool) – If True, returns also the data stored for each connection.

  • as_coro (bool) – If True returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop.

get_slave_managers()[source]

Get addresses of all slave environment managers.

async is_ready()[source]

Check if the multi-environment has been fully initialized.

This calls each slave environment managers’ is_ready() and checks if the multi-environment itself is ready by calling check_ready().

See also

is_ready()

save_info(folder, *args, **kwargs)[source]

Save information accumulated during the environment’s lifetime.

Called from destroy(). Override in subclass.

Parameters

folder (str) – root folder to save information

async set_host_manager(addr, timeout=5)[source]

Set this multi-environment’s manager as the host manager for a manager agent in addr

async set_host_managers(timeout=5)[source]

Set the master environment’s manager as host manager for the slave environment managers.

Parameters

timeout (int) – Timeout for connecting to the slave managers.

This enables the slave environment managers to communicate back to the master environment. The master environment manager, manager, must be an instance of MultiEnvManager or its subclass if this method is called.

async spawn(agent_cls, *args, addr=None, **kwargs)[source]

Spawn a new agent in a slave environment.

Parameters
  • agent_cls (str) – qualname of the agent class. That is, the name should be in the form pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.

  • addr (str) – Optional. Address for the slave enviroment’s manager. If None, spawns the agent in the slave environment with currently smallest number of agents.

Returns

aiomas.rpc.Proxy and address for the created agent.

The *args and **kwargs are passed down to the agent’s __init__.

Tip

Use spawn_n() to spawn large number of agents with identical initialization parameters.

async spawn_n(agent_cls, n, *args, addr=None, **kwargs)[source]

Same as spawn(), but allows spawning multiple agents with the same initialization parameters simultaneously into one slave environment.

Parameters
  • agent_cls (str) – qualname of the agent class. That is, the name should be in the form of pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.

  • n (int) – Number of agents to spawn

  • addr (str) – Optional. Address for the slave enviroment’s manager. If None, spawns the agents in the slave environment with currently smallest number of agents.

Returns

A list of (aiomas.rpc.Proxy, address)-tuples for the spawned agents.

The *args and **kwargs are passed down to each agent’s __init__.

async spawn_slaves(slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs=None)[source]

Spawn slave environments.

Parameters
  • slave_addrs – List of (HOST, PORT) addresses for the slave-environments.

  • slave_env_cls – Class for the slave environments.

  • slave_kwargs – If not None, must be a list of the same size as addrs. Each item in the list containing parameter values for one slave environment.

  • slave_mgr_cls – Class of the slave environment managers.

async stop_slaves(timeout=1)[source]

Stop all the slaves by sending a stop-message to their managers.

Parameters

timeout (int) – Timeout for connecting to each manager. If a connection can not be made before the timeout expires, the resulting error for that particular manager is logged, but the stopping of other managers is not halted.

async trigger_act(addr)[source]

Trigger agent in addr to act.

This method is quite inefficient if used repeatedly for a large number of agents.

async trigger_all(*args, **kwargs)[source]

Trigger all agents in all the slave environments to act() asynchronously.

Given arguments and keyword arguments are passed down to each agent’s act().

Note

By design, the manager agents in each slave environment, i.e. manager, are excluded from acting.

async wait_slaves(timeout, check_ready=False)[source]

Wait until all slaves are online (their managers accept connections) or timeout expires.

Parameters
  • timeout (int) – Timeout (in seconds) after which the method will return even though all the slaves are not online yet.

  • check_ready (bool) – If True also checks if all slave environment’s are ready. A slave environment is assumed to be ready when its manager’s is_ready()-method returns True.

property addrs

Addresses of the slave environment managers.

property artifacts

Published artifacts for all agents.

property env

The environment hosting the manager of this multi-environment.

This environment is also used without the manager to connect to the slave environment managers and communicate with them.

property logger

Logger for the environment.

Logger should have at least log() method which takes two arguments: a log level and a message.

property manager

This multi-environment’s master manager.

property name

The name of the environment.

creamas.mp.set_base_timeout(timeout)[source]

Set base timeout (in seconds) for the rpc calls originating from the instances in this module.

creamas.mp.spawn_container(addr, env_cls=<class 'creamas.core.environment.Environment'>, mgr_cls=<class 'creamas.mp.EnvManager'>, set_seed=True, *args, **kwargs)[source]

Spawn a new environment in a given address as a coroutine.

Arguments and keyword arguments are passed down to the created environment at initialization time.

If setproctitle is installed, this function renames the title of the process to start with ‘creamas’ so that the process is easily identifiable, e.g. with ps -x | grep creamas.

creamas.mp.spawn_containers(addrs, env_cls=<class 'creamas.core.environment.Environment'>, env_params=None, mgr_cls=<class 'creamas.mp.EnvManager'>, *args, **kwargs)[source]

Spawn environments in a multiprocessing multiprocessing.Pool.

Arguments and keyword arguments are passed down to the created environments at initialization time if env_params is None. If env_params is not None, then it is assumed to contain individual initialization parameters for each environment in addrs.

Parameters
  • addrs – List of (HOST, PORT) addresses for the environments.

  • env_cls – Callable for the environments. Must be a subclass of Environment.

  • env_params (Iterable of same length as addrs or None.) – Initialization parameters for the environments.

  • mgr_cls – Callable for the managers. Must be a subclass of EnvManager.s

Returns

The created process pool and the ApplyAsync results for the spawned environments.

async creamas.mp.start(addr, env_cls, mgr_cls, *env_args, **env_kwargs)[source]

Coroutine that starts an environment with mgr_cls manager agent.

The agent will connect to addr ('host', port) and wait for commands to spawn new agents within its environment.

The env_args and env_kwargs will be passed to env_cls.create() factory function.

This coroutine finishes after manager’s stop was called or when a KeyboardInterrupt is raised and calls env_cls.close() before it finishes.

Parameters
  • addr – (HOST, PORT) for the new environment

  • env_cls – Class of the environment, subclass of Environment.

  • mgr_cls – Class of the manager agent, subclass of EnvManager.