Multiprocessing
This module contains a multiprocessing implementation of Environment, called MultiEnvironment.
A MultiEnvironment holds several Environment slaves, each spawned in its own process, and uses managers to obtain much of the same functionality as the single-process environment. See EnvManager and MultiEnvManager for details.
Warning
This functionality is currently largely untested. However, it seems to work as intended and may be used in Simulation.
-
class creamas.mp.EnvManager(environment)
A manager for Environment, which is a subclass of aiomas.subproc.Manager.
Managers are used in environments that need to execute commands originating from outside sources, e.g. in slave environments inside a multiprocessing environment.
A manager can spawn other agents into its environment and can execute other tasks relevant to the environment. The manager should always be the first agent created in the environment.
Note
You should not need to create managers directly; instead, pass the desired manager class to an instance of MultiEnvironment at its initialization time.
-
create_connections(connection_map)
Create connections for agents in the environment.
This is a managing function for create_connections().
-
get_agents(addr=True, agent_cls=None, as_coro=False)
Get agents from the managed environment.
This is a managing function for get_agents(). The returned agent list excludes the environment’s manager agent (i.e. this agent) by design.
-
async get_artifacts()
Get all artifacts from the host environment.
- Returns
All the artifacts in the environment.
-
get_connections(data=False)
Get connections from the agents in the environment.
This is a managing function for get_connections().
-
async is_ready()
Check if the managed environment is ready.
This is a managing function for is_ready().
-
set_host_manager(addr)
Set host (or master) manager for this manager.
- Parameters
addr – Address for the host manager.
-
async spawn_n(agent_cls, n, *args, **kwargs)
Spawn n agents to the managed environment.
This is a convenience function so that one does not have to repeatedly make connections to the environment to spawn multiple agents with the same parameters.
See spawn() for details.
-
async trigger_all(*args, **kwargs)
Trigger all agents in the managed environment to act once.
This is a managing function for trigger_all().
-
-
class creamas.mp.MultiEnvManager(environment)
A manager for MultiEnvironment, which is a subclass of aiomas.subproc.Manager.
A manager can spawn other agents into its slave environments and can execute other tasks relevant to the whole environment. The manager should always be the first (and usually only) agent created for the multi-environment’s managing environment. The actual simulation agents should be created in the slave environments, typically using the multi-environment’s or its manager’s functionality.
Note
You should not need to create managers directly; instead, pass the desired manager class to an instance of MultiEnvironment at its initialization time.
-
async create_connections(connection_map)
Create connections for agents in the multi-environment.
This is a managing function for create_connections().
-
async get_agents(addr=True, agent_cls=None)
Get addresses of all agents in all the slave environments.
This is a managing function for get_agents().
Note
Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, the addr and agent_cls parameters are omitted from the call to the underlying multi-environment’s get_agents().
If aiomas.rpc.Proxy objects from all the agents are needed, call each slave environment manager’s get_agents() directly.
-
async get_connections(data=True)
Return connections for all the agents in the slave environments.
This is a managing function for get_connections().
-
async get_slave_managers()
Get addresses of the slave environment managers in this multi-environment.
-
handle(msg)
Handle message from a slave manager.
This is a dummy method which should be overridden in a subclass.
-
async is_ready()
A managing function for is_ready().
-
async set_as_host_manager(addr, timeout=5)
Set this manager as the host manager for the manager in addr.
This is a managing function for set_host_manager().
-
async spawn(agent_cls, *args, addr=None, **kwargs)
Spawn an agent to the environment.
This is a managing function for spawn().
Note
Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, only the address of the spawned agent is returned.
-
async spawn_n(agent_cls, n, *args, addr=None, **kwargs)
Same as spawn(), but spawns n agents with the same initialization parameters.
This is a managing function for spawn_n().
Note
Since aiomas.rpc.Proxy objects do not seem to handle (re)serialization, only the addresses of the spawned agents are returned.
-
async trigger_all(*args, **kwargs)
Trigger all agents in the managed multi-environment to act.
This is a managing function for trigger_all().
-
-
class creamas.mp.MultiEnvironment(addr, env_cls, mgr_cls=None, name=None, logger=None, **env_kwargs)
Environment for utilizing multiple processes (and cores) on a single machine.
MultiEnvironment is not a subclass of creamas.core.environment.Environment.
MultiEnvironment has a master environment, typically containing only a single manager, and a set of slave environments, each having its own manager and (once spawned) the actual agents.
Order of usage is:

    import aiomas
    from creamas import Environment
    from creamas.mp import EnvManager, MultiEnvironment
    from creamas.util import run

    # Create the multi-environment and the environment used to connect to
    # slave environments
    addr = ('localhost', 5555)
    env_cls = Environment
    env_kwargs = {'codec': aiomas.MsgPack}
    menv = MultiEnvironment(addr, env_cls, **env_kwargs)

    # Define slave environments and their arguments
    slave_addrs = [('localhost', 5556), ('localhost', 5557)]
    slave_env_cls = Environment
    slave_mgr_cls = EnvManager
    n_slaves = 2
    slave_kwargs = [{'codec': aiomas.MsgPack} for _ in range(n_slaves)]

    # Spawn the actual slave environments (spawn_slaves is a coroutine)
    run(menv.spawn_slaves(slave_addrs, slave_env_cls,
                          slave_mgr_cls, slave_kwargs))

    # Wait until all the slaves are ready. If you need to do other
    # preparation before the environments' is_ready returns True,
    # set check_ready=False
    run(menv.wait_slaves(10, check_ready=True))

    # Do any additional preparations here, like spawning the agents to
    # slave environments.

    # Trigger all agents to act
    run(menv.trigger_all())

    # Close the environment to free the resources
    # (close() replaces the deprecated destroy())
    menv.close(as_coro=False)

- Parameters
addr – (HOST, PORT) address for the master environment.
env_cls – Class for the master environment, used to make connections to the slave environments. Must be a subclass of Environment.
mgr_cls – Class for the multi-environment’s manager.
name (str) – Name of the environment. Will be shown in logs.
logger – Optional. Logger for the master environment.
-
add_artifact(artifact)
Add an artifact to the environment.
- Parameters
artifact (object) – Artifact to be added.
-
add_artifacts(artifacts)
Add artifacts to artifacts.
- Parameters
artifacts – list of Artifact objects
-
check_ready()
Check if this multi-environment itself is ready.
Override in a subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments.
- Return type
bool
- Returns
This basic implementation always returns True.
-
close(folder=None, as_coro=False)
Close the multiprocessing environment and its slave environments.
-
create_connections(connection_map, as_coro=False)
Create agent connections from the given connection map.
- Parameters
connection_map (dict) – A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for add_connections().
as_coro (bool) – If True, returns a coroutine; otherwise runs the asynchronous calls to the slave environment managers in the event loop.
Only the connections for the agents that are in the slave environments are created.
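To make the expected shape of connection_map concrete, the following sketch builds a ring topology in plain Python. The helper name ring_connection_map, the example addresses and the 'weight' data payload are assumptions made for this illustration; they are not part of creamas.

```python
def ring_connection_map(addrs, data=None):
    """Build a connection map where each agent connects to the next one.

    Hypothetical helper: keys are agent addresses and values are lists of
    (addr, data) tuples, the shape described for ``connection_map``.
    """
    conn_map = {}
    n = len(addrs)
    for i, addr in enumerate(addrs):
        neighbor = addrs[(i + 1) % n]
        # Copy the payload so that connections do not share one dict.
        conn_map[addr] = [(neighbor, dict(data or {}))]
    return conn_map


# Made-up agent addresses living in two slave environments.
agent_addrs = [
    "tcp://localhost:5556/1",
    "tcp://localhost:5556/2",
    "tcp://localhost:5557/1",
]
conn_map = ring_connection_map(agent_addrs, data={"weight": 1.0})
```

A dictionary of this shape is what create_connections() is documented to accept; whether the data payload should be a dictionary depends on the agents in use.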
-
destroy(folder=None, as_coro=False)
Close the multiprocessing environment and its slave environments.
Deprecated since version 0.4.0: Use close() instead.
-
get_agents(addr=True, agent_cls=None, as_coro=False)
Get agents from the slave environments.
- Parameters
- Returns
A coroutine or list of Proxy objects or addresses as specified by the input parameters.
Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager’s get_agents() asynchronously.
Tip
Calling each slave environment’s manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not bound to change.
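The tip above can be sketched as a small cache around the query. CachedAgentList and fake_get_agents are hypothetical names used only for this illustration; in real code the fetch callable would wrap a call to get_agents().

```python
class CachedAgentList:
    """Cache the result of an expensive agent-list query (hypothetical)."""

    def __init__(self, fetch):
        self._fetch = fetch   # callable returning the agent list
        self._agents = None   # filled on first use

    def get(self, refresh=False):
        # Query again only on first use or when explicitly requested.
        if self._agents is None or refresh:
            self._agents = list(self._fetch())
        return self._agents


calls = []


def fake_get_agents():
    # Stand-in for the costly query; records every call it receives.
    calls.append(1)
    return ["tcp://localhost:5556/1", "tcp://localhost:5557/1"]


cache = CachedAgentList(fake_get_agents)
first = cache.get()
second = cache.get()  # served from the cache, no second query
```

Passing refresh=True forces a new query, which would be needed after agents have been spawned or removed.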
-
get_artifacts(agent_name=None)
Get all artifacts or all artifacts published by a specific agent.
-
get_connections(data=True, as_coro=False)
Return connections from all the agents in the slave environments.
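Several methods of this class take an as_coro flag. The sketch below shows one way such a flag could be handled, using only asyncio; it is not the creamas implementation, and _collect_connections is a made-up stand-in for the asynchronous calls to the slave managers.

```python
import asyncio


async def _collect_connections(data=True):
    # Stand-in for asynchronous calls to the slave environment managers.
    await asyncio.sleep(0)
    if data:
        return [("tcp://localhost:5556/1", {"tcp://localhost:5557/1": {}})]
    return [("tcp://localhost:5556/1", ["tcp://localhost:5557/1"])]


def get_connections(data=True, as_coro=False):
    """Return a coroutine if as_coro is True, otherwise run it to completion."""
    coro = _collect_connections(data=data)
    if as_coro:
        return coro           # the caller awaits it inside its own coroutine
    return asyncio.run(coro)  # blocking call for synchronous callers
```

Note that asyncio.run() cannot be called from within an already running event loop, so code that is itself a coroutine would pass as_coro=True and await the result.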
-
async is_ready()
Check if the multi-environment has been fully initialized.
This calls each slave environment manager’s is_ready() and checks if the multi-environment itself is ready by calling check_ready().
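The readiness logic described for is_ready() and check_ready() can be modelled roughly as follows. ReadinessSketch is a toy class written for this illustration under the assumption that each slave manager simply reports a boolean; it does not use creamas.

```python
import asyncio


class ReadinessSketch:
    """Toy model of the readiness check; not the creamas implementation."""

    def __init__(self, slave_states):
        self._slave_states = slave_states  # one bool per slave manager

    async def _slave_is_ready(self, state):
        await asyncio.sleep(0)  # stands in for an RPC call to a slave manager
        return state

    def check_ready(self):
        # Override in a subclass for additional initialization checks.
        return True

    async def is_ready(self):
        # Ask all slaves concurrently, then check the multi-environment itself.
        results = await asyncio.gather(
            *(self._slave_is_ready(s) for s in self._slave_states)
        )
        return all(results) and self.check_ready()
```

A subclass that needs extra asynchronous set-up would override check_ready() to return False until that set-up has finished.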
-
save_info(folder, *args, **kwargs)
Save information accumulated during the environment’s lifetime.
Called from destroy(). Override in subclass.
- Parameters
folder (str) – root folder to save information
-
async set_host_manager(addr, timeout=5)
Set this multi-environment’s manager as the host manager for a manager agent in addr.
-
async set_host_managers(timeout=5)
Set the master environment’s manager as host manager for the slave environment managers.
- Parameters
timeout (int) – Timeout for connecting to the slave managers.
This enables the slave environment managers to communicate back to the master environment. The master environment manager, manager, must be an instance of MultiEnvManager or its subclass if this method is called.
-
async spawn(agent_cls, *args, addr=None, **kwargs)
Spawn a new agent in a slave environment.
- Parameters
- Returns
aiomas.rpc.Proxy and address for the created agent.
The *args and **kwargs are passed down to the agent’s __init__.
Tip
Use spawn_n() to spawn a large number of agents with identical initialization parameters.
-
async spawn_n(agent_cls, n, *args, addr=None, **kwargs)
Same as spawn(), but allows spawning multiple agents with the same initialization parameters simultaneously into one slave environment.
- Parameters
agent_cls (str) – qualname of the agent class. That is, the name should be in the form of pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.
n (int) – Number of agents to spawn.
addr (str) – Optional. Address for the slave environment’s manager. If None, spawns the agents in the slave environment with the currently smallest number of agents.
- Returns
A list of (aiomas.rpc.Proxy, address)-tuples for the spawned agents.
The *args and **kwargs are passed down to each agent’s __init__.
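Two details of the parameters above lend themselves to a short illustration: the pkg.mod:cls string format and the choice of the slave environment with the fewest agents. The functions resolve_qualname and least_loaded are hypothetical helpers written for this sketch, and the agent_counts mapping is an assumed data structure; creamas and aiomas may implement both steps differently.

```python
import importlib


def resolve_qualname(qualname):
    """Import and return the object named by a 'pkg.mod:cls' string."""
    module_name, _, attr_path = qualname.partition(":")
    if not module_name or not attr_path:
        raise ValueError(f"expected 'pkg.mod:cls', got {qualname!r}")
    obj = importlib.import_module(module_name)
    # Walk nested attributes so that 'pkg.mod:Outer.Inner' also works.
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


def least_loaded(agent_counts):
    """Return the manager address with the fewest agents.

    ``agent_counts`` is a hypothetical mapping {manager_addr: n_agents}.
    """
    return min(agent_counts, key=agent_counts.get)
```

For example, resolve_qualname("collections:OrderedDict") returns the OrderedDict class from the standard library, while a string without a colon raises ValueError.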
-
async spawn_slaves(slave_addrs, slave_env_cls, slave_mgr_cls, slave_kwargs=None)
Spawn slave environments.
- Parameters
slave_addrs – List of (HOST, PORT) addresses for the slave environments.
slave_env_cls – Class for the slave environments.
slave_kwargs – If not None, must be a list of the same size as slave_addrs. Each item in the list contains parameter values for one slave environment.
slave_mgr_cls – Class of the slave environment managers.
-
async stop_slaves(timeout=1)
Stop all the slaves by sending a stop-message to their managers.
- Parameters
timeout (int) – Timeout for connecting to each manager. If a connection can not be made before the timeout expires, the resulting error for that particular manager is logged, but the stopping of other managers is not halted.
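The behaviour described for timeout, where a failure for one manager is logged without halting the others, could look roughly like the sketch below. stop_manager and stop_all are made-up names, and the sleep merely simulates connecting to a manager; this is not the creamas code.

```python
import asyncio
import logging

logger = logging.getLogger("stop_slaves_sketch")


async def stop_manager(addr, delay):
    # Stand-in for connecting to a slave manager and sending a stop message.
    await asyncio.sleep(delay)
    return addr


async def stop_all(managers, timeout=1):
    """Try to stop every manager; log failures instead of aborting."""
    stopped, failed = [], []
    for addr, delay in managers.items():
        try:
            await asyncio.wait_for(stop_manager(addr, delay), timeout)
            stopped.append(addr)
        except asyncio.TimeoutError as exc:
            # Log the error for this manager and continue with the rest.
            logger.warning("Could not stop %s: %r", addr, exc)
            failed.append(addr)
    return stopped, failed
```

Here managers maps a made-up address to a simulated connection delay, so an entry whose delay exceeds timeout ends up in the failed list while the remaining managers are still stopped.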
-
async trigger_act(addr)
Trigger agent in addr to act.
This method is quite inefficient if used repeatedly for a large number of agents.
-
async trigger_all(*args, **kwargs)
Trigger all agents in all the slave environments to act() asynchronously.
Given arguments and keyword arguments are passed down to each agent’s act().
Note
By design, the manager agents in each slave environment, i.e. manager, are excluded from acting.
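The triggering behaviour described above can be sketched with plain asyncio. ToyAgent and this stand-alone trigger_all function are assumptions made for the example and only mimic the documented semantics: arguments are forwarded to act(), and managers do not act.

```python
import asyncio


class ToyAgent:
    """Minimal stand-in for an agent with an asynchronous act()."""

    def __init__(self, name, is_manager=False):
        self.name = name
        self.is_manager = is_manager
        self.acted_with = None

    async def act(self, *args, **kwargs):
        await asyncio.sleep(0)
        self.acted_with = (args, kwargs)
        return self.name


async def trigger_all(agents, *args, **kwargs):
    """Trigger act() on every non-manager agent concurrently."""
    workers = [a for a in agents if not a.is_manager]
    # gather() returns the results in the order the awaitables were given.
    return await asyncio.gather(*(a.act(*args, **kwargs) for a in workers))
```

Running asyncio.run(trigger_all(agents, 1, step=2)) on a list containing one manager and two regular agents triggers only the two regular agents, each receiving the same arguments.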
-
async wait_slaves(timeout, check_ready=False)
Wait until all slaves are online (their managers accept connections) or the timeout expires.
- Parameters
timeout (int) – Timeout (in seconds) after which the method will return even if not all the slaves are online yet.
check_ready (bool) – If True, also checks if all slave environments are ready. A slave environment is assumed to be ready when its manager’s is_ready() method returns True.
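Waiting with a timeout, as described above, is commonly implemented as a polling loop. The following sketch assumes a hypothetical probe coroutine that reports whether all slaves are online; the boolean return value and the poll_interval parameter are assumptions of this example, not documented behaviour of wait_slaves().

```python
import asyncio
import time


async def wait_until_online(probe, timeout, poll_interval=0.01):
    """Poll ``probe`` until it returns True or ``timeout`` seconds pass.

    ``probe`` is a hypothetical coroutine function reporting whether all
    slave managers accept connections (and, optionally, are ready).
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await probe():
            return True
        await asyncio.sleep(poll_interval)
    # Timed out: return anyway, as the documented method does.
    return False
```

Using a monotonic clock keeps the deadline unaffected by changes to the system time while the loop is running.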
-
property addrs
Addresses of the slave environment managers.
-
property artifacts
Published artifacts for all agents.
-
property env
The environment hosting the manager of this multi-environment.
This environment is also used without the manager to connect to the slave environment managers and communicate with them.
-
property logger
Logger for the environment.
Logger should have at least a log() method which takes two arguments: a log level and a message.
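An object meeting this minimal interface can be very small. ListLogger below is a hypothetical example that stores records in a list, which can be handy in tests; instances of the standard library's logging.Logger also provide a compatible log(level, msg) method.

```python
import logging


class ListLogger:
    """Minimal logger exposing log(level, msg); a hypothetical example."""

    def __init__(self):
        self.records = []

    def log(self, level, msg):
        # Keep the records in memory instead of writing them anywhere.
        self.records.append((level, msg))


logger = ListLogger()
logger.log(logging.INFO, "multi-environment created")
```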
-
property manager
This multi-environment’s master manager.
-
property name
The name of the environment.
-
creamas.mp.set_base_timeout(timeout)
Set base timeout (in seconds) for the RPC calls originating from the instances in this module.
-
creamas.mp.spawn_container(addr, env_cls=<class 'creamas.core.environment.Environment'>, mgr_cls=<class 'creamas.mp.EnvManager'>, set_seed=True, *args, **kwargs)
Spawn a new environment in a given address as a coroutine.
Arguments and keyword arguments are passed down to the created environment at initialization time.
If setproctitle is installed, this function renames the title of the process to start with ‘creamas’ so that the process is easily identifiable, e.g. with ps -x | grep creamas.
-
creamas.mp.spawn_containers(addrs, env_cls=<class 'creamas.core.environment.Environment'>, env_params=None, mgr_cls=<class 'creamas.mp.EnvManager'>, *args, **kwargs)
Spawn environments in a multiprocessing.Pool.
Arguments and keyword arguments are passed down to the created environments at initialization time if env_params is None. If env_params is not None, then it is assumed to contain individual initialization parameters for each environment in addrs.
- Parameters
addrs – List of (HOST, PORT) addresses for the environments.
env_cls – Callable for the environments. Must be a subclass of Environment.
env_params (Iterable of same length as addrs or None.) – Initialization parameters for the environments.
mgr_cls – Callable for the managers. Must be a subclass of EnvManager.
- Returns
The created process pool and the ApplyAsync results for the spawned environments.
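The rule for choosing between shared and per-environment parameters can be sketched as below. resolve_env_params is a hypothetical helper, and treating each env_params item as a dictionary of keyword arguments is an assumption of this example; the real function may expect a different layout.

```python
def resolve_env_params(addrs, env_params=None, **shared_kwargs):
    """Return one (addr, kwargs) pair per environment.

    Mirrors the documented rule only: shared keyword arguments are used when
    ``env_params`` is None, otherwise one entry per address is expected.
    """
    if env_params is None:
        # Every environment gets its own copy of the shared arguments.
        return [(addr, dict(shared_kwargs)) for addr in addrs]
    env_params = list(env_params)
    if len(env_params) != len(addrs):
        raise ValueError("env_params must have one entry per address")
    return [(addr, dict(params)) for addr, params in zip(addrs, env_params)]
```

Each resulting pair could then be handed to one worker of a process pool, for example through multiprocessing.Pool.apply_async().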
-
async creamas.mp.start(addr, env_cls, mgr_cls, *env_args, **env_kwargs)
Coroutine that starts an environment with a mgr_cls manager agent.
The agent will connect to addr ('host', port) and wait for commands to spawn new agents within its environment.
The env_args and env_kwargs will be passed to the env_cls.create() factory function.
This coroutine finishes after the manager’s stop was called or when a KeyboardInterrupt is raised, and calls env_cls.close() before it finishes.
- Parameters
addr – (HOST, PORT) for the new environment
env_cls – Class of the environment, subclass of Environment.
mgr_cls – Class of the manager agent, subclass of EnvManager.
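The lifecycle described for this coroutine, running until a stop request arrives and always closing the environment afterwards, can be modelled with a future and a try/finally block. ToyEnv, run_until_stopped and demo are made-up names for this sketch, which uses only asyncio and is not the creamas implementation.

```python
import asyncio


class ToyEnv:
    """Stand-in for an environment that only tracks whether it was closed."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


async def run_until_stopped(env, stop_received):
    """Wait for a stop signal, then always close the environment."""
    try:
        await stop_received
    finally:
        # Runs on a normal stop as well as on cancellation or an error.
        env.close()


async def demo():
    env = ToyEnv()
    loop = asyncio.get_running_loop()
    stop_received = loop.create_future()
    # Simulate the manager's stop being called shortly after start-up.
    loop.call_later(0.01, stop_received.set_result, True)
    await run_until_stopped(env, stop_received)
    return env
```

Putting the close() call in a finally block is what guarantees the clean-up regardless of how the wait ends.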