Utilities

Miscellaneous utility functions.

creamas.util.addrs2managers(addrs)[source]

Map agent addresses to their assumed managers.

creamas.util.create_tasks(task_coro, addrs, *args, flatten=True, **kwargs)[source]

Create and schedule a set of asynchronous tasks.

The function creates the tasks using a given list of agent addresses and wraps each of them in asyncio.ensure_future(). The *args and **kwargs are passed down to task_coro() when creating tasks for each address in addrs.

Usage example for a method in a class derived from MultiEnvironment:

async def my_method(self, *args, **kwargs):
    async def task(addr, *args, **kwargs):
        r_manager = await self.env.connect(addr)
        return await r_manager.my_method(*args, **kwargs)

    return await util.create_tasks(task, self.addrs, *args, **kwargs)
Parameters
  • task_coro – Coroutine which is used for each address in addrs. The coroutine should accept an agent address as the first parameter.

  • addrs (list) – A list of agent addresses used as the first parameters of task_coro().

  • flatten (bool) – If True the returned results are flattened into one list if the tasks return iterable objects. The parameter does nothing if all the results are not iterable.

Returns

An awaitable coroutine which returns the results of tasks as a list or as a flattened list

creamas.util.expose(*args, **kwargs)[source]

Function which returns aiomas.expose() wrapper.

Used by agents to indicate which functions should be callable by other agents.

creamas.util.get_manager(addr)[source]

Get assumed environment manager’s address for a given agent address.

creamas.util.run(task=None, loop=None)[source]

Run the event loop forever or until the task/future task is finished.

Parameters
  • task – Optional. Task or Future which is run until complete. If parameter is None runs the event loop forever.

  • loop – Optional. Event loop to use. If the parameter is None uses asyncio’s base event loop.

Note

This method has the same intent as aiomas.util.run().

creamas.util.run_or_coro(task, as_coro, loop=None)[source]

A shorthand to run the task/future or return it as is.

Parameters
  • task – Optional. Task or Future which is run until complete. If parameter is None runs the event loop forever.

  • as_coro (bool) – If True returns the given task as is, otherwise runs it in the event loop.

  • loop – Optional. Event loop to use. If the parameter is None uses asyncio’s base event loop.

creamas.util.sanitize_agent_name(name)[source]

Get sanitized name of the agent, used for file and directory creation.

creamas.util.sort_addrs(addrs)[source]

Return agent addresses in a sorted order.

Agent addresses are sorted with following hierarchical criteria:
  1. by the host of an agent’s environment

  2. by the port (interpreted as an integer) of an agent’s environment

  3. by the order in which the agents were created in their environment

For example, the following list of addresses:

['tcp://bnode:5555/0',
 'tcp://anode:5555/0',
 'tcp://anode:50/1',
 'tcp://anode:5555/2',
 'tcp://anode:50/2',
 'tcp://anode:18000/0',
 'tcp://bnode:50/0',
 'tcp://bnode:18000/0',
 'tcp://anode:18000/1',
 'tcp://anode:18000/2',
 'tcp://bnode:50/1',
 'tcp://bnode:5555/2',
 'tcp://bnode:5555/1',
 'tcp://bnode:50/2',
 'tcp://bnode:18000/2',
 'tcp://anode:50/0',
 'tcp://bnode:18000/1',
 'tcp://anode:5555/1']

would be sorted into the following order:

['tcp://anode:50/0',
 'tcp://anode:50/1',
 'tcp://anode:50/2',
 'tcp://anode:5555/0',
 'tcp://anode:5555/1',
 'tcp://anode:5555/2',
 'tcp://anode:18000/0',
 'tcp://anode:18000/1',
 'tcp://anode:18000/2',
 'tcp://bnode:50/0',
 'tcp://bnode:50/1',
 'tcp://bnode:50/2',
 'tcp://bnode:5555/0',
 'tcp://bnode:5555/1',
 'tcp://bnode:5555/2',
 'tcp://bnode:18000/0',
 'tcp://bnode:18000/1',
 'tcp://bnode:18000/2']
Parameters

addrs (list) – List of addresses to be sorted.

Returns

List of addresses in a sorted order.

creamas.util.split_addrs(addrs)[source]

Split addresses into dictionaries by hosts and ports.

Parameters

addrs (list) – A list of addresses.

Returns

A dictionary of dictionaries, where dict[HOST][PORT] holds a list of all agent addresses in that environment.

async creamas.util.wait_tasks(tasks, flatten=True)[source]

Gather a list of asynchronous tasks and wait for their completion.

Parameters
  • tasks (list) – A list of asyncio tasks wrapped in asyncio.ensure_future().

  • flatten (bool) – If True the returned results are flattened into one list if the tasks return iterable objects. The parameter does nothing if all the results are not iterable.

Returns

The results of tasks as a list or as a flattened list