Distributed Systems

The module holds a base implementation, DistributedEnvironment, for simulations and environments whose resources span multiple nodes on computing clusters or other distributed systems.

Note

The module needs extras to be installed using pip install creamas[extras].

class creamas.ds.DistributedEnvironment(addr, env_cls, nodes, mgr_cls=None, name=None, logger=None, **env_kwargs)

Distributed environment which manages several nodes containing multi-environments, a subclass of MultiEnvironment.

This environment spawns its slave multi-environments on the different servers (nodes) using SSH-connections. The spawning process assumes that the user can open an SSH-connection to each node without entering login credentials. The environment can then be used to wait until all the nodes are ready (have finished their individual initialization) and to perform optional additional preparation of the nodes (e.g. adding inter-node connections between agents).

After all the nodes are ready and prepared, the environment can be used to run an iterative (asynchronous) simulation using DistributedEnvironment.trigger_all() which calls trigger_all() for each node’s manager.

Warning

To free the resources on each node, it is crucial to call close() after the simulation is finished. Otherwise, some rogue processes are likely to be left unattended on the external nodes.
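One way to make sure close() always runs, even when the simulation raises an error, is a try/finally block. The sketch below uses a hypothetical stand-in class, FakeEnvironment, in place of a real DistributedEnvironment so that it runs without a cluster; only the close() name is taken from the warning above.

```python
class FakeEnvironment:
    """Hypothetical stand-in for DistributedEnvironment (not part of creamas)."""

    def __init__(self):
        self.closed = False

    def simulate(self, fail=False):
        # Placeholder for spawning, waiting for, and triggering the nodes.
        if fail:
            raise RuntimeError("simulation failed")

    def close(self):
        # The real close() frees the resources on every node.
        self.closed = True


def run_simulation(env, fail=False):
    """Run the simulation and always close the environment afterwards."""
    try:
        env.simulate(fail=fail)
    finally:
        env.close()


env = FakeEnvironment()
try:
    run_simulation(env, fail=True)
except RuntimeError:
    pass
print(env.closed)
```

The error still propagates to the caller, but the environment is closed first.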

The intended order of usage is as follows:

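import asyncio

from creamas.ds import DistributedEnvironment
from creamas.util import run  # assumed location of the run() helper

ds = DistributedEnvironment(*args, **kwargs)
# Spawn the multi-environments on the nodes over SSH.
run(ds.spawn_nodes(spawn_cmd))
timeout = 30
loop = asyncio.get_event_loop()
# Wait until every node is online and reports being ready.
task = ds.wait_nodes(timeout, check_ready=True)
nodes_ready = loop.run_until_complete(task)
if nodes_ready:
    # All nodes are ready so we can do additional preparation
    loop.run_until_complete(ds.prepare_nodes())
    # Run the simulation
    for i in range(10):
        loop.run_until_complete(ds.trigger_all())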

# Destroy the simulation afterwards to free the resources on each node.
ds.close()

Parameters
  • addr – (HOST, PORT) address for the env property (this node). The host should not be present in nodes.

  • env_cls – Class for the master environment, used to make connections to the slave environments. Must be a subclass of Environment.

  • mgr_cls – Class for the master environment’s manager.

  • name (str) – Name of the environment. Will be shown in logs.

  • nodes – List of (server, port)-tuples which are used to host the slave multi-environments. The port is the port for the SSH connection; the default SSH port is 22. See also spawn_slaves().

  • logger – Optional logger for this simulation.
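As an illustration of the addr and nodes arguments described above, the sketch below builds a node list and checks that the master host is absent from it. The host names, the port 5555, and the helper check_nodes are assumptions made for this example and are not part of creamas.

```python
def check_nodes(addr, nodes):
    """Raise ValueError if the master host in addr also appears in nodes."""
    host, _port = addr
    node_hosts = [server for server, _ssh_port in nodes]
    if host in node_hosts:
        raise ValueError("master host {!r} must not be listed in nodes".format(host))
    return True


# (HOST, PORT) address of the master environment on this machine.
addr = ("master.example.org", 5555)
# (server, port) tuples; the port is the SSH port (22 by default).
nodes = [("node1.example.org", 22), ("node2.example.org", 22)]

print(check_nodes(addr, nodes))
```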

add_artifact(artifact)

Add an artifact to the environment.

Parameters

artifact (object) – Artifact to be added.

add_artifacts(artifacts)

Add artifacts to artifacts.

Parameters

artifacts – list of Artifact objects

check_ready()

Check if this multi-environment itself is ready.

Override in subclass if it needs any additional (asynchronous) initialization other than spawning its slave environments.

Return type

bool

Returns

This basic implementation always returns True.

close(folder=None, as_coro=False)

Close the multiprocessing environment and its slave environments.

async connect(*args, **kwargs)

A shortcut to the environment’s connect().

create_connections(connection_map, as_coro=False)

Create agent connections from the given connection map.

Parameters
  • connection_map (dict) – A map of connections to be created. Dictionary where keys are agent addresses and values are lists of (addr, data)-tuples suitable for add_connections().

  • as_coro (bool) – If True returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop.

Only the connections for the agents that are in the slave environments are created.
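The shape of connection_map can be sketched as follows. The agent addresses use a tcp://host:port/id form, which is an assumption here, and ring_connection_map is a hypothetical helper that links each agent to its successor in a ring.

```python
def ring_connection_map(agent_addrs, data=None):
    """Map each agent address to a one-item list holding its successor."""
    data = {} if data is None else data
    conn_map = {}
    for i, addr in enumerate(agent_addrs):
        successor = agent_addrs[(i + 1) % len(agent_addrs)]
        # Each value is a list of (addr, data) tuples.
        conn_map[addr] = [(successor, dict(data))]
    return conn_map


agents = [
    "tcp://node1.example.org:5556/1",
    "tcp://node1.example.org:5557/1",
    "tcp://node2.example.org:5556/1",
]
connection_map = ring_connection_map(agents, data={"weight": 1.0})
for addr, conns in connection_map.items():
    print(addr, "->", conns)
```

A dictionary of this shape is what create_connections() expects as its first argument.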

destroy(folder=None, as_coro=False)

Close the multiprocessing environment and its slave environments.

Deprecated since version 0.4.0: Use close() instead

get_agents(addr=True, agent_cls=None, as_coro=False)

Get agents from the slave environments.

Parameters
  • addr (bool) – If True, returns only addresses of the agents, otherwise returns a Proxy object for each agent.

  • agent_cls – If specified, returns only agents that are members of that particular class.

  • as_coro (bool) – If True, returns a coroutine, otherwise runs the method in an event loop.

Returns

A coroutine or list of Proxy objects or addresses as specified by the input parameters.

Slave environment managers are excluded from the returned list by default. Essentially, this method calls each slave environment manager’s get_agents() asynchronously.

Tip

Calling each slave environment’s manager might be costly in some situations. Therefore, it is advisable to store the returned agent list if the agent sets in the slave environments are not bound to change.
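The tip above amounts to fetching the agent list once and reusing it. The sketch below shows the pattern with a hypothetical stand-in environment, FakeEnvironment, that counts how often its get_agents() is called; it does not contact any real managers.

```python
import asyncio


class FakeEnvironment:
    """Hypothetical stand-in that counts how often get_agents() is called."""

    def __init__(self, agents):
        self._agents = list(agents)
        self.calls = 0

    async def get_agents(self):
        self.calls += 1
        await asyncio.sleep(0)  # placeholder for the calls to the managers
        return list(self._agents)


async def simulate(env, steps):
    # Fetch the agent addresses once and reuse them on every step.
    agents = await env.get_agents()
    visited = 0
    for _ in range(steps):
        visited += len(agents)
    return visited


env = FakeEnvironment(["tcp://node1:5556/1", "tcp://node1:5556/2"])
visited = asyncio.run(simulate(env, steps=10))
print(visited, env.calls)
```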

get_artifacts(agent_name=None)

Get all artifacts or all artifacts published by a specific agent.

Parameters

agent_name (str) – Optional. Name of the agent whose artifacts are returned.

Returns

All artifacts or all artifacts published by the agent.

Return type

list

get_connections(data=True, as_coro=False)

Return connections from all the agents in the slave environments.

Parameters
  • data (bool) – If True, returns also the data stored for each connection.

  • as_coro (bool) – If True returns a coroutine, otherwise runs the asynchronous calls to the slave environment managers in the event loop.

get_slave_managers(as_coro=False)

Return all slave environment manager addresses.

Parameters

as_coro (bool) – If True, returns an awaitable coroutine, otherwise runs the calls to the slave managers asynchronously in the event loop.

This method returns the addresses of the true slave environment managers, i.e. managers derived from EnvManager, not multi-environment managers. For example, if this node environment has two nodes with four slave environments in each, then this method returns 8 addresses.
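The count in the example above can be reproduced with plain arithmetic. In the sketch below the host names, the base port, and the tcp://host:port/0 address form are assumptions; only the layout of two nodes with four slave environments each comes from the text.

```python
nodes = ["node1.example.org", "node2.example.org"]
slaves_per_node = 4
base_port = 5556  # assumed port of the first slave environment on a node

# One manager address per slave environment on every node.
slave_manager_addrs = [
    "tcp://{}:{}/0".format(host, base_port + i)
    for host in nodes
    for i in range(slaves_per_node)
]
print(len(slave_manager_addrs))
```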

async is_ready()

Check if the multi-environment has been fully initialized.

This calls each slave environment manager’s is_ready() and checks if the multi-environment itself is ready by calling check_ready().

See also

is_ready()

async prepare_nodes(*args, **kwargs)

Prepare nodes (and slave environments and agents) so that they are ready. Should be called after wait_nodes().

Note

Override in the subclass for the intended functionality.

save_info(folder, *args, **kwargs)

Save information accumulated during the environment’s lifetime.

Called from destroy(). Override in subclass.

Parameters

folder (str) – root folder to save information

async set_host_manager(addr, timeout=5)

Set this multi-environment’s manager as the host manager for a manager agent in addr.

async set_host_managers(timeout=5)

Set the master environment’s manager as host manager for the slave environment managers.

Parameters

timeout (int) – Timeout for connecting to the slave managers.

This enables the slave environment managers to communicate back to the master environment. The master environment manager, manager, must be an instance of MultiEnvManager or its subclass if this method is called.

async spawn(agent_cls, *args, addr=None, **kwargs)

Spawn a new agent in a slave environment.

Parameters
  • agent_cls (str) – qualname of the agent class. That is, the name should be in the form pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.

  • addr (str) – Optional. Address for the slave environment’s manager. If None, spawns the agent in the slave environment that currently has the smallest number of agents.

Returns

aiomas.rpc.Proxy and address for the created agent.

The *args and **kwargs are passed down to the agent’s __init__.

Tip

Use spawn_n() to spawn a large number of agents with identical initialization parameters.
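The pkg.mod:cls form expected by agent_cls can be illustrated with a small resolver built on importlib. This is a sketch of how such a string maps to a class, not necessarily how creamas resolves it internally; resolve_qualname is a hypothetical helper, and a standard-library class is used so that the example runs without creamas installed.

```python
import importlib


def resolve_qualname(qualname):
    """Import and return the object named by a 'pkg.mod:cls' string."""
    module_name, sep, attr_name = qualname.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError("expected 'pkg.mod:cls', got {!r}".format(qualname))
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# The module path comes before the colon, the class name after it.
cls = resolve_qualname("collections:OrderedDict")
print(cls.__name__)
```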

async spawn_n(agent_cls, n, *args, addr=None, **kwargs)

Same as spawn(), but allows spawning multiple agents with the same initialization parameters simultaneously into one slave environment.

Parameters
  • agent_cls (str) – qualname of the agent class. That is, the name should be in the form of pkg.mod:cls, e.g. creamas.core.agent:CreativeAgent.

  • n (int) – Number of agents to spawn

  • addr (str) – Optional. Address for the slave environment’s manager. If None, spawns the agents in the slave environment that currently has the smallest number of agents.

Returns

A list of (aiomas.rpc.Proxy, address)-tuples for the spawned agents.

The *args and **kwargs are passed down to each agent’s __init__.
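When addr is None, the agents go to the slave environment with the fewest agents. A minimal sketch of that selection rule follows; the manager addresses and agent counts are made up, least_loaded is a hypothetical helper, and how creamas breaks ties is not stated in this document.

```python
def least_loaded(agent_counts):
    """Return the manager address with the fewest agents."""
    return min(agent_counts, key=agent_counts.get)


# Hypothetical agent counts keyed by slave manager address.
agent_counts = {
    "tcp://node1.example.org:5556/0": 12,
    "tcp://node1.example.org:5557/0": 7,
    "tcp://node2.example.org:5556/0": 9,
}
target = least_loaded(agent_counts)
print(target)
```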

async spawn_nodes(spawn_cmd, ports=None, **ssh_kwargs)

An alias for creamas.ds.DistributedEnvironment.spawn_slaves().

async spawn_slaves(spawn_cmd, ports=None, **ssh_kwargs)

Spawn multi-environments on the nodes through SSH-connections.

Parameters
  • spawn_cmd – str or list, command(s) used to spawn the environment on each node. If list, it must contain one command for each node in nodes. If str, the same command is used for each node.

  • ports – Optional. If not None, must be a mapping from nodes ((server, port)-tuples) to ports which are used for the spawned multi-environments’ master manager environments. If None, then the same port is used to derive the master manager addresses as was used to initialize this distributed environment’s managing environment (port in addr).

  • ssh_kwargs – Any additional SSH-connection arguments, as specified by asyncssh.connect(). See asyncssh documentation for details.

Nodes are spawned by creating a multiprocessing pool where each node has its own subprocess. These subprocesses then use SSH-connections to spawn the multi-environments on the nodes. The SSH-connections in the pool are kept alive until the nodes are stopped, i.e. this distributed environment is destroyed.
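The rules for spawn_cmd and ports described above can be sketched in plain Python. The helpers commands_per_node and manager_ports, the script name spawn_node.py, and all host names and port numbers are assumptions for illustration; they are not creamas functions.

```python
def commands_per_node(spawn_cmd, nodes):
    """Return one spawn command for each node."""
    if isinstance(spawn_cmd, str):
        # A single string is reused for every node.
        return [spawn_cmd] * len(nodes)
    cmds = list(spawn_cmd)
    if len(cmds) != len(nodes):
        raise ValueError("need exactly one command per node")
    return cmds


def manager_ports(nodes, ports, default_port):
    """Return the manager port for each node, falling back to default_port."""
    if ports is None:
        # Reuse the port of the master environment (the port in addr).
        return {node: default_port for node in nodes}
    return {node: ports[node] for node in nodes}


nodes = [("node1.example.org", 22), ("node2.example.org", 22)]
cmds = commands_per_node("python spawn_node.py", nodes)
ports = manager_ports(nodes, {nodes[0]: 5560, nodes[1]: 5570}, default_port=5555)
print(cmds)
print(ports)
```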

async stop_slaves(timeout=1)

Stop all the slaves by sending a stop-message to their managers.

Parameters

timeout (int) – Timeout for connecting to each manager. If a connection cannot be made before the timeout expires, the resulting error for that particular manager is logged, but the stopping of other managers is not halted.
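The behaviour described for timeout, where one unreachable manager is logged without holding up the others, can be sketched with asyncio alone. Here fake_stop is a stand-in that sleeps instead of sending a stop-message, and the manager names and delays are made up.

```python
import asyncio
import logging

logger = logging.getLogger("stop_sketch")


async def fake_stop(addr, delay):
    """Stand-in for sending a stop-message to the manager at addr."""
    await asyncio.sleep(delay)


async def stop_one(addr, delay, timeout):
    try:
        await asyncio.wait_for(fake_stop(addr, delay), timeout)
        return True
    except asyncio.TimeoutError as exc:
        # Log the failure for this manager only; the others are unaffected.
        logger.warning("could not stop %s: %r", addr, exc)
        return False


async def stop_all(delays, timeout):
    """Try to stop every manager concurrently and report which ones stopped."""
    results = await asyncio.gather(
        *(stop_one(addr, delay, timeout) for addr, delay in delays.items())
    )
    return dict(zip(delays, results))


delays = {"manager-a": 0.0, "manager-b": 1.0, "manager-c": 0.0}
stopped = asyncio.run(stop_all(delays, timeout=0.1))
print(stopped)
```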

async trigger_act(addr)

Trigger agent in addr to act.

This method is quite inefficient if used repeatedly for a large number of agents.

async trigger_all(*args, **kwargs)

Trigger all agents in all the slave environments to act() asynchronously.

Given arguments and keyword arguments are passed down to each agent’s act().

Note

By design, the manager agents in each slave environment, i.e. manager, are excluded from acting.

async wait_nodes(timeout, check_ready=True)

Wait until all nodes are online (their managers accept connections) or timeout expires. Should be called after spawn_nodes().

This is an alias for wait_slaves().

async wait_slaves(timeout, check_ready=False)

Wait until all slaves are online (their managers accept connections) or timeout expires.

Parameters
  • timeout (int) – Timeout (in seconds) after which the method will return even though all the slaves are not online yet.

  • check_ready (bool) – If True, also checks if all slave environments are ready. A slave environment is assumed to be ready when its manager’s is_ready()-method returns True.
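Waiting with a timeout of this kind is typically a polling loop. The sketch below shows one possible shape under stated assumptions: wait_until_online and FakeSlaves are hypothetical, and the poll interval is an arbitrary choice rather than a documented creamas value.

```python
import asyncio
import time


async def wait_until_online(is_online, timeout, poll_interval=0.01):
    """Poll is_online() until it returns True or timeout seconds pass."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if await is_online():
            return True
        await asyncio.sleep(poll_interval)
    return False


class FakeSlaves:
    """Hypothetical stand-in: reports online after a number of checks."""

    def __init__(self, checks_needed):
        self.checks_left = checks_needed

    async def all_online(self):
        self.checks_left -= 1
        return self.checks_left <= 0


ready = asyncio.run(wait_until_online(FakeSlaves(3).all_online, timeout=1.0))
never = asyncio.run(wait_until_online(FakeSlaves(10**6).all_online, timeout=0.05))
print(ready, never)
```

The boolean result plays the same role as nodes_ready in the usage example at the top of this page.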

property addrs

Addresses of the node managers.

These addresses are derived from nodes and ports parameters given in spawn_slaves(), and are used to communicate tasks (trigger agents) to the nodes. Each manager is assumed to be the first agent in its own managed environment.

property artifacts

Published artifacts for all agents.

property env

The environment hosting the manager of this multi-environment.

This environment is also used without the manager to connect to the slave environment managers and communicate with them.

property logger

Logger for the environment.

Logger should have at least log() method which takes two arguments: a log level and a message.
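The requirement above is small enough that a hand-written class can satisfy it. The sketch below defines a hypothetical ListLogger with only a two-argument log() method and shows that a standard-library logging.Logger fits the same call.

```python
import logging


class ListLogger:
    """Minimal logger exposing only log(level, msg)."""

    def __init__(self):
        self.records = []

    def log(self, level, msg):
        self.records.append((level, msg))


def report(logger, msg):
    # Only the two-argument log() call is relied on.
    logger.log(logging.INFO, msg)


custom = ListLogger()
report(custom, "node ready")
# Standard-library loggers accept the same call.
report(logging.getLogger("creamas_sketch"), "node ready")
print(custom.records)
```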

property manager

This multi-environment’s master manager.

property name

The name of the environment.

property nodes

Environment nodes (excluding the current host).

Altering the nodes after the initialization most probably causes unexpected behavior.

async creamas.ds.run_node(menv, log_folder)

Run MultiEnvironment until its manager’s stop() is called.

Parameters

This function blocks the current thread until the manager’s stop() is called. After the stop-message is received, the multi-environment is destroyed.

The method is intended to be used in DistributedEnvironment scripts which spawn multi-environments on different nodes. That is, using this function in the script will block the script’s further execution until the simulation has run its course and the nodes need to be closed. Calling close() will automatically call each node manager’s stop() and therefore release the script.
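The block-until-stopped behaviour can be pictured with an asyncio.Event. In the sketch below FakeManager and run_until_stopped are hypothetical stand-ins; the real function works with a MultiEnvironment and its manager rather than with these classes.

```python
import asyncio


class FakeManager:
    """Hypothetical manager whose stop() releases anyone waiting on it."""

    def __init__(self):
        self.stopped = asyncio.Event()

    def stop(self):
        self.stopped.set()


async def run_until_stopped(manager, events):
    events.append("running")
    await manager.stopped.wait()  # blocks this coroutine until stop()
    events.append("destroyed")    # placeholder for destroying the environment


async def main():
    events = []
    manager = FakeManager()
    node = asyncio.create_task(run_until_stopped(manager, events))
    await asyncio.sleep(0)        # let the node start running
    events.append("stop sent")
    manager.stop()
    await node
    return events


events = asyncio.run(main())
print(events)
```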

async creamas.ds.ssh_exec(server, cmd, timeout=10, **ssh_kwargs)

Execute a command on a given server using asynchronous SSH-connection.

The connection to the server is wrapped in asyncio.wait_for() and given timeout is applied to it. If the server is not reachable before timeout expires, asyncio.TimeoutError is raised.

Parameters
  • server (str) – Address of the server

  • cmd (str) – Command to be executed

  • timeout (int) – Timeout to connect to server.

  • ssh_kwargs – Any additional SSH-connection arguments, as specified by asyncssh.connect(). See asyncssh documentation for details.

Returns

closed SSH-connection
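The timeout handling described for ssh_exec() relies on asyncio.wait_for(). The sketch below shows that mechanism in isolation: fake_connect is a stand-in that sleeps instead of opening an SSH connection, and the host names and delays are made up.

```python
import asyncio


async def fake_connect(server, delay):
    """Stand-in for opening an SSH connection; sleeps instead of connecting."""
    await asyncio.sleep(delay)
    return "connected to {}".format(server)


async def connect_with_timeout(server, delay, timeout):
    # asyncio.wait_for cancels the connection attempt when timeout expires.
    return await asyncio.wait_for(fake_connect(server, delay), timeout)


async def main():
    ok = await connect_with_timeout("node1.example.org", delay=0.0, timeout=0.5)
    try:
        await connect_with_timeout("node2.example.org", delay=1.0, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


ok, timed_out = asyncio.run(main())
print(ok, timed_out)
```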

creamas.ds.ssh_exec_in_new_loop(server, cmd, timeout=10, **ssh_kwargs)

Same as ssh_exec() but creates a new event loop and executes ssh_exec() in that event loop.
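Running a coroutine in a fresh event loop is useful in places that have no running loop of their own, such as a worker process. The sketch below shows the general pattern with asyncio; fake_ssh_exec and exec_in_new_loop are hypothetical stand-ins and do not open any SSH connection.

```python
import asyncio


async def fake_ssh_exec(server, cmd):
    """Stand-in for ssh_exec(); returns a string instead of running cmd."""
    await asyncio.sleep(0)
    return "{}: {}".format(server, cmd)


def exec_in_new_loop(server, cmd):
    """Run fake_ssh_exec() to completion in a fresh event loop."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(fake_ssh_exec(server, cmd))
    finally:
        # Close the loop so that its resources are released.
        loop.close()


result = exec_in_new_loop("node1.example.org", "echo ready")
print(result)
```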