Source code for creamas.grid

"""
.. py:module:: grid
    :platform: Unix

The module holds implementations for 2D-grid environments where the agents
know their neighbors in each of the four cardinal directions. Currently
implemented are:

    * :class:`~creamas.grid.GridAgent`: The base grid agent implementation,
      should be used inside :class:`~creamas.grid.GridEnvironment`. Agent
      places itself in the environment's
      :attr:`~creamas.grid.GridEnvironment.grid` and knows its neighbors
      in the cardinal directions: N, E, S, W. Subclass the agent for your
      specific needs.

    * :class:`~creamas.grid.GridEnvironment`: A single process grid
      environment.

    * :class:`~creamas.grid.GridEnvManager`: A manager for a single process
      grid environment.

    * :class:`~creamas.grid.GridMultiEnvironment`: Multi-processing environment
      holding several :class:`~creamas.grid.GridEnvironment` slaves with
      managers.

    * :class:`~creamas.grid.GridMultiEnvManager`: A manager for a
      multi-processing environment. Used especially if the environment needs to
      be able to execute commands from external sources, e.g. when used as a
      part of :class:`creamas.ds.DistributedEnvironment`.
"""
import asyncio
import logging
import traceback

from creamas import CreativeAgent, Environment
from creamas.mp import MultiEnvironment, MultiEnvManager, EnvManager
from creamas.util import expose

# Relative xy coordinates for the cardinal directions
_rel_xy = {'N': (0, -1), 'E': (1, 0), 'S': (0, 1), 'W': (-1, 0)}
_polars = {'N': 'S', 'E': 'W', 'S': 'N', 'W': 'E'}


def _get_neighbor_xy(card, xy):
    rxy = _rel_xy[card]
    return xy[0] + rxy[0], xy[1] + rxy[1]


[docs]class GridAgent(CreativeAgent): """An agent living in a 2D-grid with four neighbors in cardinal directions. The agent assumes that its environment is derived from :class:`~creamas.grid.GridEnvironment`, and places itself into the grid when it is initialized. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._xy = self.env.add_to_grid(agent=self) self.name = "{}-{}({})".format(self._xy[0], self._xy[1], self.addr) self._neighbors = {'N': None, 'E': None, 'S': None, 'W': None} @property def xy(self): """Agent's place (coordinate) in the grid, (x, y)-tuple. """ return self._xy @property def neighbors(self): """Map of neighboring agent addresses in cardinal directions: N, E, S, W. """ return self._neighbors
[docs] async def send(self, card, msg): """Send message *msg* to the neighboring agent in the *card* cardinal direction. :param str card: 'N', 'E', 'S', or 'W'. :param msg: Message to the agent. :returns: Response from the agent The method calls the neighboring agent's :meth:`rcv` with the message and returns any value returned by that agent. This method will fail silently if there is no neighbor agent in the given cardinal direction. """ addr = self.neighbors[card] if addr is None: return None try: r_agent = await self.env.connect(addr, timeout=10) return await r_agent.rcv(msg) except: self._log(logging.WARNING, "Could not connect to agent in {}:\n{}" .format(addr, traceback.format_exc())) return None
[docs] @expose async def rcv(self, msg): """Receive and handle message coming from another agent. This method is called from :meth:`send`. The base implementation does nothing, override in a subclass. """ pass
[docs] @expose async def act(self, *args, **kwargs): """See :py:class:`creamas.core.agent.CreativeAgent.act`. """ pass
[docs]class GridEnvironment(Environment): """Environment where agents reside in a 2D-grid. Each agent is connected to neighbors in cardinal directions: N, E, S, W. Grid environments can be horizontally stacked with :py:class:`GridMultiEnvironment`. """ def __init__(self, base_url, loop, clock, connect_kwargs): super().__init__(base_url, loop, clock, connect_kwargs) self._gs = (0, 0) self._grid = [] self._origin = (0, 0) self._neighbors = {'N': None, 'E': None, 'S': None, 'W': None} @property def gs(self): """Size of the grid as a 2-tuple. Changing the size of the grid after spawning any agents in the environment will clear the grid, but does not remove the agents from the environment. """ return self._gs @gs.setter def gs(self, gs): self._gs = gs self._grid = [[None for _ in range(self._gs[1])] for _ in range(self._gs[0])] @property def origin(self): """Upper left corner of the grid, [0,0] by default. You should define the origin before spawning any agents into the environment. """ return self._origin @origin.setter def origin(self, origin): self._origin = origin @property def grid(self): """The agents in the grid. 2D-list with the same size as **gs**. """ return self._grid @property def neighbors(self): """Map of neighboring grid environments in cardinal directions. Acceptable keys: N, E, S, W. The values are **the addresses of the managers** in the neighboring grid environments. """ return self._neighbors
[docs] def is_ready(self): """Grid environment is ready when its grid is full. .. seealso:: :meth:`GridEnvironment.is_full`, :meth:`Environment.is_ready` """ return self.is_full()
[docs] def is_full(self): """:class:`GridEnvironment` is full when its **grid** is fully populated with agents. :returns: True if the grid is full, False otherwise. Will also return False for uninitialized grids with (0,0) grid size. """ if len(self.grid) == 0: return False for i in range(len(self.grid)): for j in range(len(self.grid[0])): if self.grid[i][j] is None: return False return True
[docs] def add_to_grid(self, agent): """Add agent to the next available spot in the grid. :returns: (x,y) of the agent in the grid. This is the agent's overall coordinate in the grand grid (i.e. the actual coordinate of the agent w.t.r **origin**). :raises: `ValueError` if the grid is full. """ for i in range(len(self.grid)): for j in range(len(self.grid[0])): if self.grid[i][j] is None: x = self.origin[0] + i y = self.origin[1] + j self.grid[i][j] = agent return (x, y) raise ValueError("Trying to add an agent to a full grid." .format(len(self._grid[0]), len(self._grid[1])))
[docs] def get_xy(self, xy, addr=True): """Get the agent with xy-coordinate in the grid. If *addr* is True, returns only the agent's address. If no such agent in the grid, returns None. :raises: :exc:`ValueError` if xy-coordinate is outside the environment's grid. """ x = xy[0] y = xy[1] if x < self.origin[0] or x >= self.origin[0] + self.gs[0]: raise ValueError("x-coordinate inappropriate ({})".format(x)) if y < self.origin[1] or y >= self.origin[1] + self.gs[1]: raise ValueError("y-coordinate inappropriate ({})".format(y)) i = x - self.origin[0] j = y - self.origin[1] if addr: return self.grid[i][j].addr return self.grid[i][j]
async def _get_xy_address_from_neighbor(self, card, xy): if self.neighbors[card] is None: return None r_manager = await self.connect(self.neighbors[card]) addr = await r_manager.get_xy_address(xy) return addr
[docs] async def set_agent_neighbors(self): """Set neighbors for each agent in each cardinal direction. This method assumes that the neighboring :class:`GridEnvironment` of this grid environment have already been set. """ for i in range(len(self.grid)): for j in range(len(self.grid[0])): agent = self.grid[i][j] xy = (self.origin[0] + i, self.origin[1] + j) nxy = _get_neighbor_xy('N', xy) exy = _get_neighbor_xy('E', xy) sxy = _get_neighbor_xy('S', xy) wxy = _get_neighbor_xy('W', xy) if j == 0: naddr = await self._get_xy_address_from_neighbor('N', nxy) else: naddr = self.get_xy(nxy, addr=True) if i == 0: waddr = await self._get_xy_address_from_neighbor('W', wxy) else: waddr = self.get_xy(wxy, addr=True) if j == len(self.grid[0]) - 1: saddr = await self._get_xy_address_from_neighbor('S', sxy) else: saddr = self.get_xy(sxy, addr=True) if i == len(self.grid) - 1: eaddr = await self._get_xy_address_from_neighbor('E', exy) else: eaddr = self.get_xy(exy, addr=True) agent.neighbors['N'] = naddr agent.neighbors['E'] = eaddr agent.neighbors['S'] = saddr agent.neighbors['W'] = waddr
[docs]class GridEnvManager(EnvManager): """Manager for :py:class:`GridEnvironment`. """
[docs] @expose async def spawn_n(self, agent_cls, n, *args, **kwargs): """Spawn *n* agents to the managed environment. This is a convenience function so that one does not have to repeatedly make connections to the environment to spawn multiple agents with the same parameters. See :py:meth:`~creamas.mp.EnvManager.spawn` for details. """ rets = [] for _ in range(n): ret = await self.spawn(agent_cls, *args, **kwargs) rets.append(ret) return rets
[docs] @expose def get_xy_address(self, xy): """Get address of the agent in *xy* coordinate, or None if no such agent exists. """ return self.env.get_xy(xy, addr=True)
[docs] @expose def set_origin(self, origin): """Set originating coordinates for the managed environment. """ self.env.origin = origin
@expose def get_origin(self): return self.env.origin
[docs] @expose def set_gs(self, gs): """Set grid size for the managed environment. """ self.env.gs = gs
@expose def get_gs(self): return self.env.gs
[docs] @expose async def set_grid_neighbor(self, card, addr): """Set the neighbor grid for the grid in *card* cardinal direction. The *addr* should point to thanager* of the neighboring grid. """ self.env.neighbors[card] = addr
[docs] @expose async def set_agent_neighbors(self): """Set neighboring agents for all the agents in the grid. If the managed grid contains neighboring grids, uses those to correctly set also neighboring agents for agents on the edge of the grid. This function assumes that: * Grid is full, i.e. it has maximum number of agents. * All the (possible) neighboring grids have been initialized and have the maximum number of agents. That is, if managed grid's neighbor map still points to ``None``, this grid is assumed to be in the edge of the super-grid containing multiple :py:class:`GridEnvironment` instances. """ await self.env.set_agent_neighbors()
[docs]class GridMultiEnvironment(MultiEnvironment): """Multi-environment which stacks its slave :py:class:`GridEnvironment` instances horizontally. Call :meth:`creamas.grid.GridMultiEnvironment.set_slave_params` immediately after initializing :class:`GridMultiEnvironment`! .. note:: The manager agents for the slave environments will not be part of :attr:`~creamas.grid.GridEnvironment.grid` in the slave environments. """ def __init__(self, *args, **kwargs): self._gs = kwargs.pop('grid_size') self._origin = kwargs.pop('origin') super().__init__(*args, **kwargs) self._neighbors = {'N': None, 'E': None, 'S': None, 'W': None}
[docs] async def spawn_slaves(self, *args, **kwargs): await super().spawn_slaves(*args, **kwargs) self._n_slaves = len(kwargs['slave_addrs']) self._end_coord = (self.origin[0] + (self.gs[0] * self._n_slaves) - 1, self.origin[1] + self.gs[1] - 1)
[docs] async def set_slave_params(self): """Set origin and grid size for each slave environment. This method needs to be called before slave environments are populated and agents' and slave environments' neighbors are set. """ self._slave_origins = [] cur_x = self.origin[0] for addr in self.addrs: new_origin = (cur_x, self.origin[1]) await self.set_origin(addr, new_origin) await self.set_gs(addr, self._gs) self._slave_origins.append((new_origin, addr)) new_x = cur_x + self.gs[0] cur_x = new_x
@property def origin(self): """Origin of this multi-environment (the *xy* coordinate of the first agent in the first slave environment). """ return self._origin @property def gs(self): """Grid size for each slave environment. """ return self._gs async def set_gs(self, addr, gs): r_agent = await self.env.connect(addr) return await r_agent.set_gs(gs) async def get_gs(self, addr): r_agent = await self.env.connect(addr) return await r_agent.get_gs() async def set_origin(self, addr, origin): r_agent = await self.env.connect(addr) return await r_agent.set_origin(origin) async def get_origin(self, addr): r_agent = await self.env.connect(addr) return await r_agent.get_origin() @property def neighbors(self): """Map of neighboring multi-environments for this multi-environment. The map's values are *manager addresses* for the neighbors. """ return self._neighbors
[docs] def get_xy_environment(self, xy): """Get manager address for the environment which should have the agent with given *xy* coordinate, or None if no such environment is in this multi-environment. """ x = xy[0] y = xy[1] for origin, addr in self._slave_origins: ox = origin[0] oy = origin[1] if ox <= x < ox + self.gs[0] and oy <= y < oy + self.gs[1]: return addr return None
[docs] async def get_xy_address(self, xy): """Get address of the agent residing in *xy* coordinate, or ``None`` if no such agent is in this multi-environment. """ manager_addr = self.get_xy_environment(xy) if manager_addr is None: return None else: r_agent = await self._env.connect(manager_addr) xy_addr = await r_agent.get_xy_address(xy) return xy_addr
[docs] async def set_slave_neighbors(self): """Set neighbor environments for all the slave environments. Assumes that :attr:`neighbors` are set for this multi-environment. """ for i, elem in enumerate(self._slave_origins): o, addr = elem r_slave = await self.env.connect(addr) nxy = _get_neighbor_xy('N', o) exy = _get_neighbor_xy('E', (o[0] + self.gs[0] - 1, o[1])) sxy = _get_neighbor_xy('S', (o[0], o[1] + self.gs[1] - 1)) wxy = _get_neighbor_xy('W', o) if i == 0 and self.neighbors['W'] is not None: m_addr = self.neighbors['W'] r_manager = await self.env.connect(m_addr) n_addr = await r_manager.get_xy_environment(wxy) await r_slave.set_grid_neighbor('W', n_addr) elif i == self._n_slaves - 1 and self.neighbors['E'] is not None: m_addr = self.neighbors['E'] r_manager = await self.env.connect(m_addr) n_addr = await r_manager.get_xy_environment(exy) await r_slave.set_grid_neighbor('E', n_addr) else: w_addr = self.get_xy_environment(wxy) e_addr = self.get_xy_environment(exy) await r_slave.set_grid_neighbor('W', w_addr) await r_slave.set_grid_neighbor('E', e_addr) if self.neighbors['N'] is not None: m_addr = self.neighbors['N'] r_manager = await self.env.connect(m_addr) n_addr = await r_manager.get_xy_environment(nxy) await r_slave.set_grid_neighbor('N', n_addr) if self.neighbors['S'] is not None: m_addr = self.neighbors['S'] r_manager = await self.env.connect(m_addr) n_addr = await r_manager.get_xy_environment(sxy) await r_slave.set_grid_neighbor('S', n_addr)
[docs] async def set_agent_neighbors(self): """Set neighbors for all the agents in all the slave environments. Assumes that all the slave environments have their neighbors set. """ for addr in self.addrs: r_manager = await self.env.connect(addr) await r_manager.set_agent_neighbors()
[docs] async def set_neighbors(self): """Set neighbors for all slave environments and agents in them. This is a convenience function for calling :meth:`~creamas.grid.GridMultiEnvironment.set_slave_neighbors` and :meth:`~creamas.grid.GridMultiEnvironment.set_agent_neighbors`. """ await self.set_slave_neighbors() await self.set_agent_neighbors()
async def _populate_slave(self, addr, agent_cls, n, *args, **kwargs): r_manager = await self.env.connect(addr, timeout=5) ret = await r_manager.spawn_n(agent_cls, n, *args, **kwargs) return ret
[docs] async def populate(self, agent_cls, *args, **kwargs): """Populate all the slave grid environments with agents. Assumes that no agents have been spawned yet to the slave environment grids. This excludes the slave environment managers as they are not in the grids.) """ n = self.gs[0] * self.gs[1] tasks = [] for addr in self.addrs: task = asyncio.ensure_future(self._populate_slave(addr, agent_cls, n, *args, **kwargs)) tasks.append(task) rets = await asyncio.gather(*tasks) return rets
[docs]class GridMultiEnvManager(MultiEnvManager): """Manager agent for :py:class:`GridMultiEnvironment`. """
[docs] @expose async def set_origin(self, mgr_addr, origin): """Set originating coordinates for :py:class:`GridEnvironment` which manager is in given address. :param str mgr_addr: Address of the manager agent :param origin: New origin of the grid environment, iterable with length 2. """ remote_manager = await self.env.connect(mgr_addr) await remote_manager.set_origin(origin)
[docs] @expose async def set_gs(self, mgr_addr, gs): """Set grid size for :py:class:`GridEnvironment` which manager is in given address. :param str mgr_addr: Address of the manager agent :param gs: New grid size of the grid environment, iterable with length 2. """ remote_manager = await self.env.connect(mgr_addr) await remote_manager.set_gs(gs)
[docs] @expose def set_grid_neighbor(self, card, addr): """Set the neighbor multi-grid for this multi-grid in *card* cardinal direction. The *addr* should point to the *manager* of the neighboring multi-grid. """ self.menv.neighbors[card] = addr
[docs] @expose async def get_xy_address(self, xy): """Get address of the agent in the environment with given coordinates. This is a managing function for :meth:`~creamas.grid.MultiEnvironment.get_xy_address`. """ ret = await self.menv.get_xy_address(xy) return ret
[docs] @expose def get_xy_environment(self, xy): """Get environment (address of the manager of that environment) which has agent with given coordinates. This is a managing function for :meth:`~creamas.grid.MultiEnvironment.get_xy_environment`. """ return self.menv.get_xy_environment(xy)
[docs] @expose async def set_slave_neighbors(self): """Set neighbor environments for all the slave environments. This is a managing function for :meth:`creamas.grid.GridMultiEnvironment.set_slave_neighbors`. """ await self.menv.set_slave_neighbors()
[docs] @expose async def set_agent_neighbors(self): """Set neighbor agents for all the agents in the slave environments. This is a managing function for :meth:`creamas.grid.GridMultiEnvironment.set_agent_neighbors`. """ await self.menv.set_agent_neighbors()
[docs] @expose async def set_neighbors(self): """Set neighbors for all the agents in all the slave environments. This is a managing function for :meth:`creamas.grid.GridMultiEnvironment.set_neighbors`. """ await self.menv.set_neighbors()