Distributed Systems
The module holds a base implementation, DistributedEnvironment, for simulations and environments where the resources span multiple nodes on computing clusters or other distributed systems.
Note
The module needs extras, which can be installed using pip install creamas[extras].
class creamas.ds.DistributedEnvironment(addr, env_cls, nodes, mgr_cls=None, name=None, logger=None, **env_kwargs)
Distributed environment, a subclass of MultiEnvironment, which manages several nodes containing multi-environments.
This environment spawns its slave multi-environments on the different servers (nodes) using SSH connections. The spawning process assumes that the user can make an SSH connection to each node without entering login credentials. The environment can then be used to wait until all the nodes are ready (have done their individual initialization) and to do optional additional preparation of the nodes (e.g. adding inter-node connections between agents).
After all the nodes are ready and prepared, the environment can be used to run an iterative (asynchronous) simulation using DistributedEnvironment.trigger_all(), which calls trigger_all() for each node's manager.
Warning
To free the resources on each node, it is crucial to call close() after the simulation is finished. Otherwise, rogue processes are likely to be left running unattended on the external nodes.
The intended order of usage is as follows:
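```python
import asyncio

from creamas.ds import DistributedEnvironment

# args, kwargs and spawn_cmd are placeholders for the constructor
# arguments (see Parameters below) and the node spawn command(s).
ds = DistributedEnvironment(*args, **kwargs)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(ds.spawn_nodes(spawn_cmd))
timeout = 30
task = ds.wait_nodes(timeout, check_ready=True)
nodes_ready = loop.run_until_complete(task)
if nodes_ready:
    # All nodes are ready so we can do additional preparation
    loop.run_until_complete(ds.prepare_nodes())
    # Run the simulation
    for i in range(10):
        loop.run_until_complete(ds.trigger_all())

# Destroy the simulation afterwards to free the resources on each node.
ds.close()
```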
Parameters
addr – (HOST, PORT) address for the env property (this node). The host should not be present in nodes.
env_cls – Class for the master environment, used to make connections to the slave environments. Must be a subclass of Environment.
mgr_cls – Class for the master environment's manager.
name (str) – Name of the environment. Will be shown in logs.
nodes – List of (server, port)-tuples which are used to host the slave multi-environments. The port is the port for the SSH connection; the default SSH port is 22. See also spawn_slaves().
logger – Optional logger for this simulation.
add_artifact(artifact)
Add an artifact to the environment.
Parameters
artifact (object) – Artifact to be added.
add_artifacts(artifacts)
Add artifacts to artifacts.
Parameters
artifacts – List of Artifact objects.
check_ready()
Check if this multi-environment itself is ready.
Override in a subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments.
Return type
bool
Returns
This basic implementation always returns True.
close(folder=None, as_coro=False)
Close the multiprocessing environment and its slave environments.
create_connections(connection_map, as_coro=False)
Create agent connections from the given connection map.
Parameters
connection_map (dict) – A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for add_connections().
as_coro (bool) – If True, returns a coroutine; otherwise runs the asynchronous calls to the slave environment managers in the event loop.
Only the connections for the agents that are in the slave environments are created.
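A connection map is just a dictionary from agent addresses to lists of (addr, data)-tuples. The sketch below builds a simple ring topology in which every agent connects to the next one. It assumes that data is a dictionary of connection attributes; the helper ring_connection_map and the aiomas-style addresses are hypothetical and only illustrate the expected shape.

```python
def ring_connection_map(addrs, data=None):
    """Build a connection map where each agent connects to the next agent
    in addrs, wrapping around at the end (a ring topology).

    Assumption: data is a dict of connection attributes; it is copied so
    that every connection gets its own dictionary.
    """
    connection_map = {}
    n = len(addrs)
    for i, addr in enumerate(addrs):
        neighbor = addrs[(i + 1) % n]
        if neighbor == addr:
            continue  # a single agent would otherwise connect to itself
        # Each value is a list of (addr, data)-tuples, one per connection.
        connection_map[addr] = [(neighbor, dict(data or {}))]
    return connection_map


# Hypothetical agent addresses on two nodes.
agents = [
    "tcp://node1:5555/1",
    "tcp://node1:5555/2",
    "tcp://node2:5555/1",
]
conns = ring_connection_map(agents, data={"weight": 0.5})
```

The resulting dictionary could then be passed to ds.create_connections(conns) on an already prepared distributed environment.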
destroy(folder=None, as_coro=False)
Close the multiprocessing environment and its slave environments.
Deprecated since version 0.4.0: Use close() instead.
get_agents(addr=True, agent_cls=None, as_coro=False)
Get agents from the slave environments.
Parameters
addr (bool) – If True, returns the addresses of the agents; otherwise returns a Proxy object for each agent.
agent_cls – If given, returns only the agents of this class.
as_coro (bool) – If True, returns a coroutine; otherwise runs the calls to the slave environment managers in the event loop.
Returns
A coroutine or a list of Proxy objects or addresses, as specified by the input parameters.
Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager's get_agents() asynchronously.
Tip
Calling each slave environment's manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not going to change.
get_artifacts(agent_name=None)
Get all artifacts or all artifacts published by a specific agent.
get_connections(data=True, as_coro=False)
Return connections from all the agents in the slave environments.
get_slave_managers(as_coro=False)
Return all slave environment manager addresses.
Parameters
as_coro (bool) – If True, returns an awaitable coroutine; otherwise runs the calls to the slave managers asynchronously in the event loop.
This method returns the addresses of the true slave environment managers, i.e. managers derived from EnvManager, not multi-environment managers. For example, if this node environment has two nodes with four slave environments in each, then this method returns 8 addresses.
async is_ready()
Check if the multi-environment has been fully initialized.
This calls each slave environment manager's is_ready() and checks if the multi-environment itself is ready by calling check_ready().
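The readiness rule above combines two conditions: every slave manager must report ready, and the multi-environment's own check_ready() must return True. A minimal stdlib sketch of that logic follows, assuming the slave managers' is_ready() calls can be modeled as plain coroutines that are queried concurrently; all_ready, ready and not_ready are hypothetical stand-ins, not creamas code.

```python
import asyncio


async def all_ready(slave_ready_checks, check_ready):
    """Return True only if every slave check reports ready and the
    multi-environment's own check_ready() returns True."""
    # Query all (stand-in) slave managers concurrently.
    results = await asyncio.gather(*(check() for check in slave_ready_checks))
    return all(results) and check_ready()


async def ready():
    # Hypothetical slave manager that has finished initializing.
    return True


async def not_ready():
    # Hypothetical slave manager that is still initializing.
    return False


ok = asyncio.run(all_ready([ready, ready], lambda: True))
not_ok = asyncio.run(all_ready([ready, not_ready], lambda: True))
```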
async prepare_nodes(*args, **kwargs)
Prepare nodes (and slave environments and agents) so that they are ready. Should be called after wait_nodes().
Note
Override in the subclass for the intended functionality.
save_info(folder, *args, **kwargs)
Save information accumulated during the environment's lifetime.
Called from destroy(). Override in subclass.
Parameters
folder (str) – Root folder in which to save the information.
async set_host_manager(addr, timeout=5)
Set this multi-environment's manager as the host manager for the manager agent in addr.
async set_host_managers(timeout=5)
Set the master environment's manager as the host manager for the slave environment managers.
Parameters
timeout (int) – Timeout for connecting to the slave managers.
This enables the slave environment managers to communicate back to the master environment. If this method is called, the master environment manager, manager, must be an instance of MultiEnvManager or its subclass.
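async spawn(agent_cls, *args, addr=None, **kwargs)
Spawn a new agent in a slave environment.
Parameters
agent_cls (str) – qualname of the agent class, in the form pkg.mod:cls.
addr (str) – Optional. Address for the slave environment's manager. If None, spawns the agent in the slave environment with currently the smallest number of agents.
Returns
aiomas.rpc.Proxy and address for the created agent.
The *args and **kwargs are passed down to the agent's __init__.
Tip
Use spawn_n() to spawn a large number of agents with identical initialization parameters.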
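When no slave manager address is given, the agent is placed in the slave environment that currently has the fewest agents. A minimal sketch of that selection policy, assuming the agent count of each slave environment is already known; the helper pick_slave and the addresses are hypothetical, and creamas may break ties differently.

```python
def pick_slave(agent_counts):
    """Return the manager address whose slave environment currently has
    the fewest agents. Ties go to the first address in iteration order."""
    if not agent_counts:
        raise ValueError("no slave environments available")
    return min(agent_counts, key=agent_counts.get)


# Hypothetical slave manager addresses mapped to their current agent counts.
counts = {
    "tcp://node1:5556/0": 12,
    "tcp://node1:5557/0": 9,
    "tcp://node2:5556/0": 9,
}
target = pick_slave(counts)
```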
async spawn_n(agent_cls, n, *args, addr=None, **kwargs)
Same as spawn(), but allows spawning multiple agents with the same initialization parameters simultaneously into one slave environment.
Parameters
agent_cls (str) – qualname of the agent class. That is, the name should be in the form of pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.
n (int) – Number of agents to spawn.
addr (str) – Optional. Address for the slave environment's manager. If None, spawns the agents in the slave environment with currently the smallest number of agents.
Returns
A list of (aiomas.rpc.Proxy, address)-tuples for the spawned agents.
The *args and **kwargs are passed down to each agent's __init__.
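The pkg.mod:cls form separates the importable module path from the attribute path inside that module. The stdlib sketch below shows how such a string can be resolved to a class with importlib; it is an illustration of the naming convention only, and the helper resolve_qualname is hypothetical rather than the lookup creamas or aiomas perform internally.

```python
import importlib


def resolve_qualname(qualname):
    """Resolve a 'pkg.mod:cls' string to the object it names."""
    module_name, sep, attr_path = qualname.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'pkg.mod:cls', got {qualname!r}")
    obj = importlib.import_module(module_name)
    # Follow dotted attribute paths such as 'Outer.Inner' after the colon.
    for attr in attr_path.split("."):
        obj = getattr(obj, attr)
    return obj


cls = resolve_qualname("collections:OrderedDict")
join = resolve_qualname("os:path.join")
```

With creamas installed, resolve_qualname("creamas.core.agent:CreativeAgent") would likewise return the CreativeAgent class.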
async spawn_nodes(spawn_cmd, ports=None, **ssh_kwargs)
An alias for creamas.ds.DistributedEnvironment.spawn_slaves().
async spawn_slaves(spawn_cmd, ports=None, **ssh_kwargs)
Spawn multi-environments on the nodes through SSH connections.
Parameters
spawn_cmd – str or list, command(s) used to spawn the environment on each node. If a list, it must contain one command for each node in nodes. If a str, the same command is used for each node.
ports – Optional. If not None, must be a mapping from nodes ((server, port)-tuples) to ports which are used for the spawned multi-environments' master manager environments. If None, the same port that was used to initialize this distributed environment's managing environment (the port in addr) is used to derive the master manager addresses.
ssh_kwargs – Any additional SSH connection arguments, as specified by asyncssh.connect(). See the asyncssh documentation for details.
Nodes are spawned by creating a multiprocessing pool where each node has its own subprocess. These subprocesses then use SSH connections to spawn the multi-environments on the nodes. The SSH connections in the pool are kept alive until the nodes are stopped, i.e. until this distributed environment is destroyed.
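The spawn_cmd rules above (one string shared by all nodes, or a list with exactly one command per node) can be sketched as a small normalization step that pairs each node with its command. The helper pair_spawn_cmds and the command strings are hypothetical; this is not the creamas implementation.

```python
def pair_spawn_cmds(nodes, spawn_cmd):
    """Return (node, command) pairs following the spawn_cmd rules:
    a single str is reused for every node, a list needs one entry per node."""
    if isinstance(spawn_cmd, str):
        cmds = [spawn_cmd] * len(nodes)
    else:
        cmds = list(spawn_cmd)
        if len(cmds) != len(nodes):
            raise ValueError(
                f"got {len(cmds)} commands for {len(nodes)} nodes"
            )
    return list(zip(nodes, cmds))


# Hypothetical (server, SSH port) nodes and spawn commands.
nodes = [("node1", 22), ("node2", 22)]
same = pair_spawn_cmds(nodes, "python spawn_node.py")
per_node = pair_spawn_cmds(nodes, ["python a.py", "python b.py"])
```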
async stop_slaves(timeout=1)
Stop all the slaves by sending a stop message to their managers.
Parameters
timeout (int) – Timeout for connecting to each manager. If a connection cannot be made before the timeout expires, the resulting error for that particular manager is logged, but the other managers are still stopped.
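The per-manager timeout means one unreachable manager must not prevent the others from being stopped. A minimal sketch of that error-tolerant pattern with asyncio.wait_for(), assuming each manager's stop call can be modeled as a coroutine function; stop_all, hangs and fast are hypothetical stand-ins, and the managers are stopped one after another here.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def stop_all(stop_calls, timeout=1):
    """Call each stop coroutine with its own timeout. Failures are logged,
    and the loop continues with the remaining managers."""
    stopped = []
    for addr, stop in stop_calls.items():
        try:
            await asyncio.wait_for(stop(), timeout=timeout)
            stopped.append(addr)
        except Exception as exc:  # includes asyncio.TimeoutError
            logger.warning("Could not stop manager %s: %r", addr, exc)
    return stopped


async def hangs():
    # Hypothetical manager that never answers.
    await asyncio.sleep(10)


async def fast():
    # Hypothetical manager that stops immediately.
    return None


stopped = asyncio.run(
    stop_all({"tcp://node1:5555/0": hangs, "tcp://node2:5555/0": fast}, timeout=0.05)
)
```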
async trigger_act(addr)
Trigger the agent in addr to act.
This method is quite inefficient if used repeatedly for a large number of agents.
async trigger_all(*args, **kwargs)
Trigger all agents in all the slave environments to act() asynchronously.
Given arguments and keyword arguments are passed down to each agent's act().
Note
By design, the manager agents in each slave environment, i.e. manager, are excluded from acting.
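In creamas the call travels through the node and slave managers; the sketch below flattens that into a single dictionary of agents to show the two stated rules: arguments are forwarded to every act(), and manager agents are skipped. EchoAgent, trigger_all_sketch and the addresses are hypothetical stand-ins, not creamas classes.

```python
import asyncio


async def trigger_all_sketch(agents, manager_addrs, *args, **kwargs):
    """Run act(*args, **kwargs) concurrently on every non-manager agent."""
    tasks = [
        agent.act(*args, **kwargs)
        for addr, agent in agents.items()
        if addr not in manager_addrs  # managers do not act by design
    ]
    # gather() returns results in the same order as the tasks.
    return await asyncio.gather(*tasks)


class EchoAgent:
    """Hypothetical stand-in for an agent proxy."""

    def __init__(self, name):
        self.name = name

    async def act(self, step):
        return f"{self.name} acted at step {step}"


agents = {
    "tcp://node1:5555/0": EchoAgent("manager"),
    "tcp://node1:5555/1": EchoAgent("a1"),
    "tcp://node1:5555/2": EchoAgent("a2"),
}
results = asyncio.run(trigger_all_sketch(agents, {"tcp://node1:5555/0"}, step=3))
```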
async wait_nodes(timeout, check_ready=True)
Wait until all nodes are online (their managers accept connections) or the timeout expires. Should be called after spawn_nodes().
This is an alias for wait_slaves().
async wait_slaves(timeout, check_ready=False)
Wait until all slaves are online (their managers accept connections) or the timeout expires.
Parameters
timeout (int) – Timeout (in seconds) after which the method returns even if not all the slaves are online yet.
check_ready (bool) – If True, also checks if all slave environments are ready. A slave environment is assumed to be ready when its manager's is_ready() method returns True.
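Waiting "until all slaves are online or the timeout expires" is a polling loop with a deadline. A minimal stdlib sketch, assuming a coroutine is_online(addr) that reports whether a manager accepts connections; wait_until_online and the two check functions are hypothetical, and the polling interval is an arbitrary choice.

```python
import asyncio
import time


async def wait_until_online(is_online, addrs, timeout, interval=0.01):
    """Poll is_online(addr) for all addresses until every one succeeds
    (return True) or the timeout in seconds expires (return False)."""
    deadline = time.monotonic() + timeout
    pending = set(addrs)
    while True:
        # Keep only the addresses that are still not reachable.
        pending = {addr for addr in pending if not await is_online(addr)}
        if not pending:
            return True
        if time.monotonic() >= deadline:
            return False
        await asyncio.sleep(interval)


start = time.monotonic()


async def comes_online_later(addr):
    # Hypothetical check: node2 only "accepts connections" after 0.05 s.
    return addr != "node2" or time.monotonic() - start > 0.05


async def never_online(addr):
    return False


ok = asyncio.run(wait_until_online(comes_online_later, ["node1", "node2"], timeout=1.0))
late = asyncio.run(wait_until_online(never_online, ["node1"], timeout=0.05))
```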
property addrs
Addresses of the node managers.
These addresses are derived from the nodes and ports parameters given in spawn_slaves(), and are used to communicate tasks (trigger agents) to the nodes. Each manager is assumed to be the first agent in its own managed environment.
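Deriving the node manager addresses combines each node's server name with either the port from the ports mapping or the default port taken from addr. The sketch assumes aiomas-style tcp://host:port/agent_id addresses and, following the text above, that the manager is the first agent (id 0); the helper manager_addrs and the node names are hypothetical.

```python
def manager_addrs(nodes, default_port, ports=None):
    """Derive one manager address per (server, SSH port) node.

    Assumption: addresses look like 'tcp://host:port/<agent id>' and each
    node manager is the first agent in its environment (id 0).
    """
    addrs = []
    for node in nodes:
        server, _ssh_port = node  # the SSH port is not the manager's port
        port = default_port if ports is None else ports[node]
        addrs.append(f"tcp://{server}:{port}/0")
    return addrs


nodes = [("node1", 22), ("node2", 2222)]
default = manager_addrs(nodes, default_port=5555)
custom = manager_addrs(nodes, 5555, ports={("node1", 22): 6000, ("node2", 2222): 6001})
```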
property artifacts
Published artifacts for all agents.
property env
The environment hosting the manager of this multi-environment.
This environment is also used without the manager to connect to the slave environment managers and communicate with them.
property logger
Logger for the environment.
The logger should have at least a log() method which takes two arguments: a log level and a message.
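Any object with a log(level, msg) method fits the stated interface. The standard library's logging.Logger already provides such a method, and a custom object only needs to implement that one call. ListLogger below is a hypothetical example, for instance for collecting messages in tests.

```python
import logging


class ListLogger:
    """Minimal object with a log(level, msg) method that stores records."""

    def __init__(self):
        self.records = []

    def log(self, level, msg):
        self.records.append((level, msg))


# logging.Logger also provides log(level, msg).
std_logger = logging.getLogger("creamas.sim")
list_logger = ListLogger()
for lg in (std_logger, list_logger):
    lg.log(logging.INFO, "node ready")
```

Either object could be passed as the logger argument when constructing the environment.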
property manager
This multi-environment's master manager.
property name
The name of the environment.
property nodes
Environment nodes (excluding the current host).
Altering the nodes after initialization will most likely cause unexpected behavior.
async creamas.ds.run_node(menv, log_folder)
Run a MultiEnvironment until its manager's stop() is called.
Parameters
menv – MultiEnvironment to wait for.
log_folder (str) – Logging folder to be passed down to destroy() after stop() is called.
This method will block the current thread until the manager's stop() is called. After the stop message is received, the multi-environment is destroyed.
The method is intended to be used in DistributedEnvironment scripts which spawn multi-environments on different nodes. That is, using this function in the script will block the script's further execution until the simulation has run its course and the nodes need to be closed. Calling close() will automatically call each node manager's stop() and therefore release the script.
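The "block until stop, then destroy" behavior can be illustrated with an asyncio.Event that the manager's stop() sets. This is a hypothetical sketch of the pattern only, not how creamas implements run_node(); StubManager and run_until_stopped are invented for the example.

```python
import asyncio


class StubManager:
    """Hypothetical manager whose stop() releases anyone waiting on it."""

    def __init__(self):
        self.stopped = asyncio.Event()

    def stop(self):
        self.stopped.set()


async def run_until_stopped(manager, destroy):
    # Wait (without busy-looping) until stop() has been called...
    await manager.stopped.wait()
    # ...then tear down the environment.
    destroy()


async def demo():
    manager = StubManager()
    log = []
    waiter = asyncio.create_task(
        run_until_stopped(manager, lambda: log.append("destroyed"))
    )
    await asyncio.sleep(0.01)  # the waiter is now blocked on the event
    log.append("stopping")
    manager.stop()
    await waiter
    return log


events = asyncio.run(demo())
```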
async creamas.ds.ssh_exec(server, cmd, timeout=10, **ssh_kwargs)
Execute a command on a given server using an asynchronous SSH connection.
The connection to the server is wrapped in asyncio.wait_for() and the given timeout is applied to it. If the server is not reachable before the timeout expires, asyncio.TimeoutError is raised.
Parameters
server (str) – Address of the server.
cmd (str) – Command to be executed.
timeout (int) – Timeout to connect to the server.
ssh_kwargs – Any additional SSH connection arguments, as specified by asyncssh.connect(). See the asyncssh documentation for details.
Returns
Closed SSH connection.
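The timeout behavior comes from asyncio.wait_for(): if the wrapped coroutine does not finish in time, it is cancelled and asyncio.TimeoutError is raised to the caller. The sketch below demonstrates this with a hypothetical stand-in for an unreachable server instead of a real asyncssh connection, so it runs without network access.

```python
import asyncio


async def connect_with_timeout(connect, timeout):
    """Wrap a connection coroutine in asyncio.wait_for(), which cancels it
    and raises asyncio.TimeoutError if it does not finish in time."""
    return await asyncio.wait_for(connect(), timeout=timeout)


async def unreachable_server():
    # Hypothetical stand-in for connecting to a host that never answers.
    await asyncio.sleep(60)


async def main():
    try:
        await connect_with_timeout(unreachable_server, timeout=0.05)
    except asyncio.TimeoutError:
        return "timed out"
    return "connected"


outcome = asyncio.run(main())
```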
creamas.ds.ssh_exec_in_new_loop(server, cmd, timeout=10, **ssh_kwargs)
Same as ssh_exec(), but creates a new event loop and executes ssh_exec() in that event loop.
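Creating a fresh event loop is what lets a coroutine such as ssh_exec() be called from plain synchronous worker code, such as the per-node subprocesses described for spawn_slaves(). The sketch below shows the pattern with the stdlib, using threads instead of processes for brevity; run_in_new_loop and fake_exec are hypothetical, with fake_exec standing in for ssh_exec().

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def run_in_new_loop(coro_fn, *args, **kwargs):
    """Create a fresh event loop, run coro_fn(*args, **kwargs) to completion
    in it, and always close the loop afterwards."""
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro_fn(*args, **kwargs))
    finally:
        asyncio.set_event_loop(None)
        loop.close()


async def fake_exec(server, cmd, timeout=10):
    # Hypothetical stand-in for ssh_exec(); just echoes its arguments.
    await asyncio.sleep(0)
    return f"{cmd} on {server}"


result = run_in_new_loop(fake_exec, "node1", "hostname")

# Each worker thread gets its own loop, so the calls do not interfere.
with ThreadPoolExecutor(max_workers=2) as pool:
    outputs = list(
        pool.map(lambda s: run_in_new_loop(fake_exec, s, "hostname"), ["node1", "node2"])
    )
```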