"""
.. py:module:: ds
:platform: Unix
The module holds a base implementation, :py:class:`DistributedEnvironment`,
for simulations and environments where the resources span over multiple nodes
on computing clusters or other distributed systems.
.. note::
The module needs extras to be installed using ``pip install creamas[extras]``.
"""
import asyncio
import logging
import multiprocessing
import asyncssh
from creamas.mp import MultiEnvironment
from creamas.util import create_tasks, run_or_coro
logger = logging.getLogger(__name__)
[docs]async def ssh_exec(server, cmd, timeout=10, **ssh_kwargs):
"""Execute a command on a given server using asynchronous SSH-connection.
The connection to the server is wrapped in :func:`asyncio.wait_for` and
given :attr:`timeout` is applied to it. If the server is not reachable
before timeout expires, :exc:`asyncio.TimeoutError` is raised.
:param str server: Address of the server
:param str cmd: Command to be executed
:param int timeout: Timeout to connect to server.
:param ssh_kwargs:
Any additional SSH-connection arguments, as specified by
:func:`asyncssh.connect`. See `asyncssh documentation
<http://asyncssh.readthedocs.io/en/latest/api.html#connect>`_ for
details.
:returns:
closed SSH-connection
"""
conn = await asyncio.wait_for(asyncssh.connect(server, **ssh_kwargs),
timeout=timeout)
ret = await conn.run(cmd)
conn.close()
return ret
[docs]def ssh_exec_in_new_loop(server, cmd, timeout=10, **ssh_kwargs):
"""Same as :func:`ssh_exec` but creates a new event loop and executes
:func:`ssh_exec` in that event loop.
"""
task = ssh_exec(server, cmd, timeout=timeout, **ssh_kwargs)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop.run_until_complete(task)
[docs]async def run_node(menv, log_folder):
"""Run :class:`~creamas.mp.MultiEnvironment` until its manager's
:meth:`~aiomas.subproc.Manager.stop` is called.
:param menv: :class:`~creamas.mp.MultiEnvironment` to wait for.
:param str log_folder:
Logging folder to be passed down to
:meth:`~creamas.mp.MultiEnvironment.destroy` after :meth:`stop` is
called.
This method will block the current thread until the manager's
:meth:`~creamas.mp.MultiEnvManager.stop` is called. After the stop-message
is received, multi-environment is destroyed.
The method is intended to be
used in :class:`~creamas.ds.DistributedEnvironment` scripts which spawn
multi-environments on different nodes. That is, using this function in the
script will block the script's further execution until the simulation has
run its course and the nodes need to be closed.
Calling :meth:`~creamas.ds.DistributedEnvironment.close` will
automatically call each node manager's :meth:`stop` and therefore release
the script.
"""
try:
await menv.manager.stop_received
except KeyboardInterrupt:
pass
finally:
ret = await menv.close(log_folder, as_coro=True)
return ret
[docs]class DistributedEnvironment(MultiEnvironment):
"""Distributed environment which manages several nodes containing
multi-environments, a subclass of :class:`~creamas.mp.MultiEnvironment`.
This environment spawns its slave multi-environments on the different
servers (nodes) using SSH-connections. The spawning process assumes that
the user can make a SSH-connection without login credentials to each
node. The environment can then be used to wait until all the nodes are
ready (have done their individual initialization) and do optional
additional preparing of the nodes (e.g. adding inter-node connections
between agents).
After all the nodes are ready and prepared, the environment can be used
to run an iterative (asynchronous) simulation using
:meth:`DistributedEnvironment.trigger_all` which calls
:meth:`~creamas.mp.MultiEnvManager.trigger_all` for each node's manager.
.. warning::
To free the resources on each node, it is crucial to call
:meth:`~creamas.ds.DistributedEnvironment.close` after the simulation
is finished. Otherwise, some rogue processes are likely to be left
unattended on the external nodes.
The intended order of usage is as follows::
ds = DistributedEnvironment(*args, **kwargs)
run(ds.spawn_nodes(spawn_cmd))
timeout = 30
loop = asyncio.get_event_loop()
task = ds.wait_nodes(timeout, check_ready=True)
nodes_ready = loop.run_until_complete(task)
if nodes_ready:
# All nodes are ready so we can do additional preparation
loop.run_until_complete(ds.prepare_nodes())
# Run the simulation
for i in range(10):
loop.run_until_complete(ds.trigger_all())
# Destroy the simulation afterwards to free the resources on each node.
ds.close()
"""
def __init__(self, addr, env_cls, nodes, mgr_cls=None, name=None,
logger=None, **env_kwargs):
"""
:param addr:
``(HOST, PORT)`` address for the *env* property (this node). The
``host`` should not be present in *nodes*.
:param env_cls:
Class for the master environment, used to make connections to the
slave environments. Must be a subclass of
:py:class:`~creamas.core.environment.Environment`.
:param mgr_cls:
Class for the master environment's manager.
:param str name: Name of the environment. Will be shown in logs.
:param nodes:
List of (server, port)-tuples which are used to make host the slave
multi-environments. The port is the port for the SSH connection,
default SSH-port is 22. See also
:meth:`~creamas.ds.DistributedEnvironment.spawn_slaves`.
:param logger:
Optional logger for this simulation.
"""
super().__init__(addr, env_cls, mgr_cls=mgr_cls, name=name,
logger=logger, **env_kwargs)
self._nodes = nodes
self.port = addr[1]
self.addr = addr
@property
def nodes(self):
"""Environment nodes (excluding the current host).
Altering the nodes after the initialization most probably causes
unexpected behavior.
"""
return self._nodes
@property
def addrs(self):
"""Addresses of the node managers.
These addresses are derived from *nodes* and *ports* parameters
given in :meth:`spawn_slaves`, and are used to communicate tasks
(trigger agents) to the nodes. Each manager is assumed to be the
first agent in its own managed environment.
"""
return self._manager_addrs
[docs] async def wait_nodes(self, timeout, check_ready=True):
"""Wait until all nodes are online (their managers accept connections)
or timeout expires. Should be called after :meth:`spawn_nodes`.
This is an alias for :meth:`~creamas.mp.MultiEnvironment.wait_slaves`.
"""
return await self.wait_slaves(timeout, check_ready=check_ready)
[docs] async def spawn_nodes(self, spawn_cmd, ports=None, **ssh_kwargs):
"""An alias for :meth:`creamas.ds.DistributedEnvironment.spawn_slaves`.
"""
return await self.spawn_slaves(spawn_cmd, ports=ports, **ssh_kwargs)
[docs] async def spawn_slaves(self, spawn_cmd, ports=None, **ssh_kwargs):
"""Spawn multi-environments on the nodes through SSH-connections.
:param spawn_cmd:
str or list, command(s) used to spawn the environment on each node.
If *list*, it must contain one command for each node in
:attr:`nodes`. If *str*, the same command is used for each node.
:param ports:
Optional. If not ``None``, must be a mapping from nodes
(``(server, port)``-tuples) to ports which are used for the spawned
multi-environments' master manager environments. If ``None``, then
the same port is used to derive the master manager addresses as was
used to initialize this distributed environment's managing
environment (port in :attr:`addr`).
:param ssh_kwargs:
Any additional SSH-connection arguments, as specified by
:meth:`asyncssh.connect`. See `asyncssh documentation
<http://asyncssh.readthedocs.io/en/latest/api.html#connect>`_ for
details.
Nodes are spawned by creating a multiprocessing pool where each node
has its own subprocess. These subprocesses then use SSH-connections
to spawn the multi-environments on the nodes. The SSH-connections in
the pool are kept alive until the nodes are stopped, i.e. this
distributed environment is destroyed.
"""
pool = multiprocessing.Pool(len(self.nodes))
rets = []
for i, node in enumerate(self.nodes):
server, server_port = node
port = ports[node] if ports is not None else self.port
mgr_addr = "tcp://{}:{}/0".format(server, port)
self._manager_addrs.append(mgr_addr)
if type(spawn_cmd) in [list, tuple]:
cmd = spawn_cmd[i]
else:
cmd = spawn_cmd
args = [server, cmd]
ssh_kwargs_cp = ssh_kwargs.copy()
ssh_kwargs_cp['port'] = server_port
ret = pool.apply_async(ssh_exec_in_new_loop,
args=args,
kwds=ssh_kwargs_cp,
error_callback=logger.warning)
rets.append(ret)
self._pool = pool
self._r = rets
[docs] async def prepare_nodes(self, *args, **kwargs):
"""Prepare nodes (and slave environments and agents) so that they are
ready. Should be called after :meth:`wait_nodes`.
.. note::
Override in the subclass for the intended functionality.
"""
raise NotImplementedError()
[docs] def get_slave_managers(self, as_coro=False):
"""Return all slave environment manager addresses.
:param bool as_coro:
If ``True`` returns awaitable coroutine, otherwise runs the calls
to the slave managers asynchronously in the event loop.
This method returns the addresses of the true slave environment
managers, i.e. managers derived from :class:`~creamas.mp.EnvManager`,
not multi-environment managers. For example, if this node environment
has two nodes with four slave environments in each, then this method
returns 8 addresses.
"""
async def slave_task(addr):
r_manager = await self.env.connect(addr)
return await r_manager.get_slave_managers()
tasks = create_tasks(slave_task, self.addrs)
return run_or_coro(tasks, as_coro)