Merge branch 'implement_agent_step'

Initial implementation of step() accoriding to the rules of Schweitzer
et alii 1996.
A few neighbor fetching functions are now implemented in agent.py.

The MultiHexScalarFields implementation could be simplified as the ants
only drop their pheromones in advance() and not step(), thus they do not
interfere with each other.
We might need to look at the concentration decay for the scalar fields
again to correctly implement it. To me it is not clear whether we should
decrease the pheromone levels before the ants step() or after the ants
step() and before their advance()
This commit is contained in:
Alexander Bocken 2023-04-28 19:13:37 +02:00
commit a5e03b38ac
Signed by: Alexander
GPG Key ID: 1D237BE83F9B05E8
4 changed files with 84 additions and 53 deletions

View File

@ -9,17 +9,19 @@ License: AGPL 3 (see end of file)
import numpy as np
from mesa.agent import Agent
from mesa.space import Coordinate
from typing import overload
class RandomWalkerAnt(Agent):
def __init__(self, unique_id, model, do_follow_chemical_A=True,
def __init__(self, unique_id, model, look_for_chemical=None,
energy_0=1, chemical_drop_rate_0=1, sensitvity_0=1, alpha=0.5) -> None:
super().__init__(unique_id=unique_id, model=model)
self._next_pos : None | Coordinate = None
self.prev_pos : None | Coordinate = None
self.do_follow_chemical_A : bool = True # False -> follow_chemical_B = True
self.look_for_chemical = look_for_chemical
self.drop_chemical = None
self.energy : float = energy_0
self.sensitvity : float = sensitvity_0
self.chemical_drop_rate : float = chemical_drop_rate_0 #TODO: check whether needs to be separated into A and B
@ -30,39 +32,68 @@ class RandomWalkerAnt(Agent):
# TODO
return prop
def step(self):
# Calculate where next ant location should be and store in _next_pos
# TODO
pass
# follow positive gradient
if self.look_for_chemical is not None:
front_concentration = [self.model.grid.fields[self.look_for_chemical][cell] for cell in self.front_neighbors ]
gradient = front_concentration - np.repeat(self.model.grid.fields[self.look_for_chemical][self.pos], 3)
index = np.argmax(gradient)
if gradient[index] > 0:
self._next_pos = self.front_neighbors[index]
return
def drop_chemicals(self):
# drop chemicals (depending on current state) on concentration field
# TODO
# use self.model.grid.add_to_field(key, value, pos) to not interfere with other ants
pass
# do biased random walk
p = np.random.uniform()
if p < self.alpha:
self._next_pos = self.front_neighbor
else:
# need copy() as we would otherwise remove the tuple from all possible lists (aka python "magic")
other_neighbors = self.neighbors().copy()
other_neighbors.remove(self.front_neighbor)
random_index = np.random.choice(range(len(other_neighbors)))
self._next_pos = other_neighbors[random_index]
def drop_chemicals(self) -> None:
# should only be called in advance() as we do not use hidden fields
if self.drop_chemical is not None:
self.model.grid.fields[self.drop_chemical][self.pos] += self.chemical_drop_rate
def advance(self) -> None:
self.drop_chemicals()
self.prev_pos = self.pos
self.pos = self._next_pos
# TODO: find out how to decorate with property properly
def neighbors(self, pos=None, include_center=False):
if pos is None:
pos = self.pos
return self.model.grid.get_neighborhood(pos, include_center=include_center)
@property
def front_neighbors(self):
if self.prev_pos is not None:
assert(self.pos is not None)
x, y = self.pos
x_prev, y_prev = self.prev_pos
dx, dy = x - x_prev, y - y_prev
front = np.array([
(x, y + dy),
(x + dx, y + dy),
(x + dx, y),
])
return front #TODO: verify (do we need to sperate into even/odd?)
else:
# TODO: return all neighbors or raise Exception?
pass
"""
returns all three neighbors which the ant can see
"""
assert(self.prev_pos is not None)
all_neighbors = self.neighbors()
neighbors_at_the_back = self.neighbors(pos=self.prev_pos, include_center=True)
return list(filter(lambda i: i not in neighbors_at_the_back, all_neighbors))
@property
def front_neighbor(self):
"""
returns neighbor of current pos
which is towards the front of the ant
"""
neighbors_prev_pos = self.neighbors(self.prev_pos)
for candidate in self.front_neighbors:
# neighbor in front direction only shares current pos as neighborhood with prev_pos
candidate_neighbors = self.model.grid.get_neighborhood(candidate)
overlap = [x for x in candidate_neighbors if x in neighbors_prev_pos]
if len(overlap) == 1:
return candidate
"""
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.

20
main.py
View File

@ -27,13 +27,27 @@ def main():
max_steps=max_steps)
# just initial testing of MultiHexGrid
a = model.agent_density()
for loc in model.grid.iter_neighborhood(nest_position):
a[loc] = 3
for agent in model.grid.get_neighbors(pos=nest_position, include_center=True):
if agent.unique_id == 2:
agent.do_follow_chemical_A = False
agent.look_for_chemical = "A"
agent.prev_pos = (9,10)
print(agent.front_neighbors)
a[agent.prev_pos] = 1
for pos in agent.front_neighbors:
a[pos] = 6
agent.step()
print(f"{agent._next_pos=}")
agent.advance()
print(agent.front_neighbor)
a[agent.front_neighbor] = 5
print(agent.pos, agent.unique_id, agent.do_follow_chemical_A)
print(agent.pos, agent.unique_id, agent.look_for_chemical)
neighbors = model.grid.get_neighborhood(nest_position)
print(neighbors)
print(a)
if __name__ == "__main__":

View File

@ -11,11 +11,10 @@ License: AGPL 3 (see end of file)
import numpy as np
from mesa.model import Model
from mesa.space import Coordinate, HexGrid, Iterable
from multihex import MultiHexGrid, MultiHexGridScalarFields
from multihex import MultiHexGridScalarFields
from mesa.time import SimultaneousActivation
from mesa.datacollection import DataCollector
from agent import RandomWalkerAnt
from agent import Pheromone
class ActiveWalkerModel(Model):
# TODO: separate food and source into new agents?
@ -25,11 +24,7 @@ class ActiveWalkerModel(Model):
nest_position : Coordinate,
max_steps:int=1000) -> None:
super().__init__()
fields={"A" : True, # key : also have _next prop (for no interference in step)
"B": True,
"nests": False,
"food" : False,
}
fields=["A", "B", "nests", "food"]
self.schedule = SimultaneousActivation(self)
self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields)
self._unique_id_counter = -1
@ -43,7 +38,7 @@ class ActiveWalkerModel(Model):
}
for agent_id in self.get_unique_ids(num_initial_roamers):
agent = RandomWalkerAnt(unique_id=agent_id, model=self, do_follow_chemical_A=True)
agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_chemical="A")
self.schedule.add(agent)
self.grid.place_agent(agent, pos=nest_position)
@ -53,6 +48,12 @@ class ActiveWalkerModel(Model):
)
self.datacollector.collect(self) # keep at end of __init___
def agent_density(self):
a = np.zeros((self.grid.width, self.grid.height))
for i in range(self.grid.width):
for j in range(self.grid.height):
a[i,j] = len(self.grid[(i,j)])
return a
def step(self):

View File

@ -93,32 +93,17 @@ class MultiHexGrid(HexGrid):
class MultiHexGridScalarFields(MultiHexGrid):
def __init__(self, fields: dict[str, bool], width : int, height : int, torus : bool, scalar_initial_value : float=0) -> None:
def __init__(self, fields: list[str], width : int, height : int, torus : bool, scalar_initial_value : float=0) -> None:
super().__init__(width=width, height=height, torus=torus)
self._field_props = fields
self.fields : dict[str, npt.NDArray[np.float_]] = {}
for key, is_step_field in fields.items():
for key in fields:
self.fields[key] = np.ones((width, height)).astype(float) * scalar_initial_value
if is_step_field:
self.fields[f"_next_{key}"] = np.zeros((width, height)).astype(float)
def reset_field(self, key : str) -> None:
self.fields[key] = np.zeros((self.width, self.height))
def add_to_field(self, field_key : str, value : float, pos : Coordinate) -> None:
if self._field_props[field_key]:
self.fields[f"_next_{field_key}"][pos] += value
else:
self.fields[field_key][pos] += value
def step(self) -> None:
for key, is_step_field in self._field_props.items():
if is_step_field:
self.fields[key] += self.fields[f"_next_{key}"]
self.reset_field(f"_next_{key}")
"""
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.