Compare commits

..

13 Commits

7 changed files with 715 additions and 148 deletions

230
agent.py
View File

@ -5,23 +5,24 @@ This model implements the actual agents on the grid (a.k.a. the ants)
License: AGPL 3 (see end of file) License: AGPL 3 (see end of file)
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho (C) Alexander Bocken, Viviane Fahrni, Grace Kagho
"""
"""
TO DISCUSS:
Is the separation of energy and sensitivity useful? -> only if we have the disconnect via resistance
""" """
import numpy as np import numpy as np
import numpy.typing as npt import numpy.typing as npt
from mesa.agent import Agent from mesa.agent import Agent
from mesa.space import Coordinate from mesa.space import Coordinate
class RandomWalkerAnt(Agent): class RandomWalkerAnt(Agent):
def __init__(self, unique_id, model, look_for_pheromone=None, def __init__(self, unique_id, model,
energy_0=1, look_for_pheromone=None,
pheromone_drop_rate_0 : dict[str, float]={"A": 80, "B": 80}, drop_pheromone=None,
sensitivity_0=0.99, sensitivity_max = 10000,
alpha=0.6, drop_pheromone=None,
betas : dict[str, float]={"A": 0.0512, "B": 0.0512},
sensitivity_decay_rate=0.01,
sensitivity_max = 300,
sensitivity_min = 0.001,
sensitivity_steepness = 1
) -> None: ) -> None:
super().__init__(unique_id=unique_id, model=model) super().__init__(unique_id=unique_id, model=model)
@ -29,21 +30,12 @@ class RandomWalkerAnt(Agent):
self._next_pos : None | Coordinate = None self._next_pos : None | Coordinate = None
self._prev_pos : None | Coordinate = None self._prev_pos : None | Coordinate = None
self.look_for_pheromone = look_for_pheromone self.look_for_pheromone : str|None = look_for_pheromone
self.drop_pheromone = drop_pheromone self.drop_pheromone : str|None = drop_pheromone
self.energy = energy_0 #TODO: use self.energy : float = self.model.e_0
self.sensitivity_0 = sensitivity_0 self.sensitivity : float = self.model.s_0
self.sensitivity = self.sensitivity_0 self.pheromone_drop_rate : float = self.model.q_0
self.pheromone_drop_rate = pheromone_drop_rate_0
self.alpha = alpha
self.sensitivity_max = sensitivity_max self.sensitivity_max = sensitivity_max
self.sensitivity_min = sensitivity_min
self.sensitivity_decay_rate = sensitivity_decay_rate
self.sensitivity_steepness = sensitivity_steepness
self.betas = betas
self.threshold : dict[str, float] = {"A": 0, "B": 0}
def sens_adj(self, props, key) -> npt.NDArray[np.float_] | float: def sens_adj(self, props, key) -> npt.NDArray[np.float_] | float:
""" """
@ -65,59 +57,70 @@ class RandomWalkerAnt(Agent):
| |
0|________ 0|________
-----------------------> prop -----------------------> prop
For the nonlinear sensitivity, the idea is to use a logistic function that has
a characteristic sigmoidal shape that starts from a low value, increases rapidly,
and then gradually approaches a saturation level.
f(x) = L / (1 + exp(-k*(x - x0)))
f(x) = return value
L = sens_max
k is a parameter that controls the steepness of the curve. We can start with 1
A higher value of k leads to a steeper curve.
x0 is the midpoint of the curve, where the sensitivity starts to increase significantly.
We can make X0 the threshold value
""" """
# if props iterable create array, otherwise return single value # if props iterable create array, otherwise return single value
try: try:
iter(props) iter(props)
except TypeError: except TypeError:
#TODO: proper nonlinear response, not just clamping # TODO: proper nonlinear response, not just clamping
if props > self.sensitivity_max:
non_linear_sens = True return self.sensitivity_max
if non_linear_sens: if props > self.model.q_tr:
L = self.sensitivity_max return props
k = self.sensitivity_steepness
mid = self.threshold[key]
if props > self.sensitivity_max:
return self.sensitivity_max
#Should we still keep these conditions?
# if props > self.threshold[key]:
# return props
else:
adjusted_sensitivity = L / (1 + np.exp(-k * (props - mid)))
print(f'props: {props}, adjusted_value: {adjusted_sensitivity}')
return adjusted_sensitivity
else: else:
if props > self.sensitivity_max: return 0
return self.sensitivity_max #Should we still keep these conditions
if props > self.threshold[key]:
return props
else:
return 0
arr : list[float] = [] arr : list[float] = []
for prop in props: for prop in props:
arr.append(self.sens_adj(prop, key)) arr.append(self.sens_adj(prop, key))
return np.array(arr) return np.array(arr)
def _get_resistance_weights(self, positions=None):
if positions is None:
positions = self.neighbors()
# bit round-about but self.model.grid.fields['res'][positions]
# gets interpreted as slices, not multiple singular positions
resistance = np.array([ self.model.grid.fields['res'][x,y] for x,y in positions ])
easiness = np.max(self.model.grid.fields['res']) - resistance + np.min(self.model.grid.fields['res']) + 1e-15 # + epsilon to not divide by zero
weights = easiness/ np.sum(easiness)
#inv_weights = resistance/ np.sum(resistance)
#weights = 1 - inv_weights
#weights /= np.sum(weights)
return weights
def _choose_next_pos(self): def _choose_next_pos(self):
def _combine_weights(res_weights, walk_weights):
"""
If we have a resistance -> Infinity we want to have a likelihood -> 0 for this direction
Therefore we should multiply our two probabilities.
For the case of no resistance field this will return the normal walk_weights
res_weights : resistance weights: based on resistance field of neighbours
see _get_resistance_weights for more info
walk weights: In case of biased random walk (no positive pheromone gradient):
forward: alpha,
everywhere else: (1- alpaha)/5)
In case of positive pheromone gradient present in front:
max. positive gradient: self.sensitivity
everyhwere else: (1-self.sensitivity)/5
"""
combined = res_weights * walk_weights
normalized = combined / np.sum(combined)
return normalized
def _pick_from_remaining_five(remaining_five):
"""
"""
weights = self._get_resistance_weights(remaining_five)
random_index = np.random.choice(range(len(remaining_five)), p=weights)
self._next_pos = remaining_five[random_index]
self._prev_pos = self.pos
if self._prev_pos is None: if self._prev_pos is None:
i = np.random.choice(range(6)) res_weights = self._get_resistance_weights()
walk_weights = np.ones(6)
weights = _combine_weights(res_weights, walk_weights)
i = np.random.choice(range(6),p=weights)
assert(self.pos is not self.neighbors()[i]) assert(self.pos is not self.neighbors()[i])
self._next_pos = self.neighbors()[i] self._next_pos = self.neighbors()[i]
self._prev_pos = self.pos self._prev_pos = self.pos
@ -126,80 +129,115 @@ class RandomWalkerAnt(Agent):
if self.searching_food: if self.searching_food:
for neighbor in self.front_neighbors: for neighbor in self.front_neighbors:
if self.model.grid.is_food(neighbor): if self.model.grid.is_food(neighbor):
self.drop_pheromone = "B" self.model.grid.fields['food'][neighbor] -= 1 # eat
#resets
self.pheromone_drop_rate = self.model.q_0
self.sensitivity = self.model.s_0
self.energy = self.model.e_0
#now look for other pheromone
self.look_for_pheromone = "A" self.look_for_pheromone = "A"
self.sensitivity = self.sensitivity_0 self.drop_pheromone = "B"
self._prev_pos = neighbor self._prev_pos = neighbor
self._next_pos = self.pos self._next_pos = self.pos
return
elif self.searching_nest: elif self.searching_nest:
for neighbor in self.front_neighbors: for neighbor in self.front_neighbors:
if self.model.grid.is_nest(neighbor): if self.model.grid.is_nest(neighbor):
self.look_for_pheromone = "A" # Is this a correct interpretation? #resets
self.pheromone_drop_rate = self.model.q_0
self.sensitivity = self.model.s_0
self.energy = self.model.e_0
self.look_for_pheromone = "B"
self.drop_pheromone = "A" self.drop_pheromone = "A"
self.sensitivity = self.sensitivity_0
self._prev_pos = neighbor self._prev_pos = neighbor
self._next_pos = self.pos self._next_pos = self.pos
self.model.successful_ants += 1
# recruit new ants # recruit new ants
for agent_id in self.model.get_unique_ids(self.model.num_new_recruits): print("RECRUITING")
if self.model.schedule.get_agent_count() < self.model.num_max_agents: for agent_id in self.model.get_unique_ids(self.model.N_r):
if self.model.schedule.get_agent_count() < self.model.N_m:
agent = RandomWalkerAnt(unique_id=agent_id, model=self.model, look_for_pheromone="B", drop_pheromone="A") agent = RandomWalkerAnt(unique_id=agent_id, model=self.model, look_for_pheromone="B", drop_pheromone="A")
agent._next_pos = self.pos agent._next_pos = self.pos
self.model.schedule.add(agent) self.model.schedule.add(agent)
self.model.grid.place_agent(agent, pos=neighbor) self.model.grid.place_agent(agent, pos=neighbor)
return
# follow positive gradient # follow positive gradient with likelihood self.sensitivity
if self.look_for_pheromone is not None: if self.look_for_pheromone is not None:
# Calculate gradient
front_concentration = [self.model.grid.fields[self.look_for_pheromone][cell] for cell in self.front_neighbors ] front_concentration = [self.model.grid.fields[self.look_for_pheromone][cell] for cell in self.front_neighbors ]
front_concentration = self.sens_adj(front_concentration, self.look_for_pheromone) front_concentration = self.sens_adj(front_concentration, self.look_for_pheromone)
current_pos_concentration = self.sens_adj(self.model.grid.fields[self.look_for_pheromone][self.pos], self.look_for_pheromone) current_pos_concentration = self.sens_adj(self.model.grid.fields[self.look_for_pheromone][self.pos], self.look_for_pheromone)
gradient = front_concentration - np.repeat(current_pos_concentration, 3).astype(np.float_) gradient = front_concentration - np.repeat(current_pos_concentration, 3).astype(np.float_)
# TODO: if two or more neighbors have same concentration randomize? Should be unlikely with floats though
index = np.argmax(gradient) index = np.argmax(gradient)
if gradient[index] > 0: if gradient[index] > 0:
self._next_pos = self.front_neighbors[index] # follow positive gradient with likelihood self.sensitivity * resistance_weight (re-normalized)
all_neighbors_cells = self.neighbors()
highest_gradient_cell = self.front_neighbors[index]
highest_gradient_index_arr = np.where(all_neighbors_cells == highest_gradient_cell)
assert(len(highest_gradient_index_arr) == 1)
all_neighbors_index = highest_gradient_index_arr[0]
sens_weights = np.ones(6) * (1-self.sensitivity)/5
sens_weights[all_neighbors_index] = self.sensitivity
res_weights = self._get_resistance_weights()
weights = _combine_weights(res_weights, sens_weights)
random_index = np.random.choice(range(6), p=weights)
self._next_pos = all_neighbors_cells[random_index]
self._prev_pos = self.pos self._prev_pos = self.pos
return return
# do biased random walk # do biased random walk
p = np.random.uniform() all_neighbors_cells = self.neighbors()
if p < self.alpha: front_index_arr = np.where(all_neighbors_cells == self.front_neighbor)
self._next_pos = self.front_neighbor assert(len(front_index_arr) == 1 )
self._prev_pos = self.pos front_index = front_index_arr[0]
else:
# need copy() as we would otherwise remove the tuple from all possible lists (aka python "magic")
other_neighbors = self.neighbors().copy()
other_neighbors.remove(self.front_neighbor)
random_index = np.random.choice(range(len(other_neighbors)))
self._next_pos = other_neighbors[random_index]
self._prev_pos = self.pos
res_weights = self._get_resistance_weights()
walk_weights = np.ones(6) * (1-self.model.alpha) / 5
walk_weights[front_index] = self.model.alpha
weights = _combine_weights(res_weights, walk_weights)
random_index = np.random.choice(range(6), p=weights)
self._next_pos = all_neighbors_cells[random_index]
self._prev_pos = self.pos
def step(self): def step(self):
self.sensitivity -= self.sensitivity_decay_rate # Die and get removed if no energy
self._choose_next_pos() if self.energy < self.model.e_min:
self._adjust_pheromone_drop_rate() self.model.schedule.remove(self)
#update list of dead agents for time step
self.model.dying_agents += 1
else:
self._choose_next_pos()
self._adjust_pheromone_drop_rate()
self.sensitivity -= self.model.d_s
self.energy -= self.model.grid.fields['res'][self.pos] * self.model.d_e
#kill agent if sensitivity is low
if self.sensitivity < self.sensitivity_min:
self._kill_agent()
def _adjust_pheromone_drop_rate(self): def _adjust_pheromone_drop_rate(self):
if(self.drop_pheromone is not None): if(self.drop_pheromone is not None):
self.pheromone_drop_rate[self.drop_pheromone] -= self.pheromone_drop_rate[self.drop_pheromone] * self.betas[self.drop_pheromone] self.pheromone_drop_rate -= self.pheromone_drop_rate * self.model.beta
def drop_pheromones(self) -> None: def drop_pheromones(self) -> None:
# should only be called in advance() as we do not use hidden fields # should only be called in advance() as we do not use hidden fields
if self.drop_pheromone is not None: if self.drop_pheromone is not None:
self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate[self.drop_pheromone] self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate
def _kill_agent(self):
#update dead_agent list
self.model.dead_agents.append(self)
def advance(self) -> None: def advance(self) -> None:
self.drop_pheromones() self.drop_pheromones()

37
hexplot.py Executable file
View File

@ -0,0 +1,37 @@
#!/bin/python
import numpy as np
import matplotlib.pyplot as plt
def plot_hexagon(A, title=None, block=True):
X, Y = np.meshgrid(range(A.shape[0]), range(A.shape[-1]))
X, Y = X*2, Y*2
# Turn this into a hexagonal grid
for i, k in enumerate(X):
if i % 2 == 1:
X[i] += 1
Y[:,i] += 1
fig, ax = plt.subplots()
fig.set_figwidth(10)
fig.set_figheight(10)
fig.set_dpi(600)
im = ax.hexbin(
X.reshape(-1),
Y.reshape(-1),
C=A.reshape(-1),
gridsize=int(A.shape[0]/2)
)
# the rest of the code is adjustable for best output
ax.set_aspect(1)
ax.set(xlim=(-4, X.max()+4,), ylim=(-4, Y.max()+4))
ax.axis(False)
ax.set_xmargin(0)
ax.set_ymargin(0)
#plt.colorbar(im, shrink=0.7)
if(title is not None):
pass
#plt.title(title)
plt.savefig(f"{title}.png")

292
main.py
View File

@ -7,19 +7,12 @@ License: AGPL 3 (see end of file)
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho (C) Alexander Bocken, Viviane Fahrni, Grace Kagho
""" """
from model import ActiveWalkerModel from model import ActiveWalkerModel
from agent import RandomWalkerAnt
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from mesa.space import Coordinate from mesa.space import Coordinate
from mesa.datacollection import DataCollector from mesa.datacollection import DataCollector
from multihex import MultiHexGrid #from multihex import MultiHexGrid
def main():
check_pheromone_exponential_decay()
check_ant_sensitivity_linear_decay()
check_ant_pheromone_exponential_decay()
check_ants_follow_gradient()
def check_pheromone_exponential_decay(): def check_pheromone_exponential_decay():
""" """
@ -107,6 +100,7 @@ def check_ant_pheromone_exponential_decay():
num_initial_roamers = 1 num_initial_roamers = 1
num_max_agents = 100 num_max_agents = 100
nest_position : Coordinate = (width //2, height //2) nest_position : Coordinate = (width //2, height //2)
num_food_sources = 0;
max_steps = 1000 max_steps = 1000
model = ActiveWalkerModel(width=width, height=height, model = ActiveWalkerModel(width=width, height=height,
@ -178,16 +172,288 @@ def check_ants_follow_gradient():
print(20*"#") print(20*"#")
model.step() model.step()
def viviane_bfs_example_run():
# Breadth-first-search algorithm for connectivity
def bfs(graph, start_node, threshold): #graph=grid, start_node=nest, threshold=TBD?
from collections import deque
visited = set()
queue = deque([(start_node, [])])
paths = {}
connected_food_sources = set()
if __name__ == "__main__": while queue:
main() current_node, path = queue.popleft()
#current_node = tuple(current_node)
visited.add(current_node)
if current_node in graph:
for neighbor, m.grid.fields["A"] in graph[current_node].items():
if neighbor not in visited and m.grid.fields["A"] >= threshold:
new_path = path + [neighbor]
queue.append((neighbor, new_path))
# Check if the neighbor is a food source
if neighbor in self.grid_food:
if neighbor not in paths:
paths[neighbor] = new_path
connected_food_sources.add(neighbor)
connectivity = len(connected_food_sources)
return connectivity
# Calculate connectivity through BFS
current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
import numpy as np
N = 121
N_X = int(np.sqrt(N))
N_Y = N // N_X
# fancy way of saying absolutely nothing but 11
xv, yv = np.meshgrid(np.arange(N_X), np.arange(N_Y), sparse=False, indexing='xy')
print(f"{N_X=}")
print(f"{N_Y=}")
print(f"{(xv, yv)=}")
print(f"{xv=}")
def fixed_distance_tests():
"""
position a target food source a known distance away from nest
check for no. successful ants for n runs
"""
from tqdm import tqdm
runs = 10
from model import kwargs_paper_setup1 as kwargs
kwargs["N_f"] = 0
kwargs["gamma"] /= 2 # field decays three times slower
kwargs["beta"] /= 2 # drop rates decays three times slower
kwargs["d_s"] /= 2 # drop rates decays three times slower
kwargs["d_e"] /= 2 # drop rates decays three times slower
successful_walkers = {}
for distance in tqdm(range(5,30), position=0, desc="dis"):
successful_walkers[distance] = []
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
model = ActiveWalkerModel(**kwargs)
nest_location = kwargs["nest_position"]
food_location = (nest_location[0] - distance, nest_location[1])
model.grid.add_food(size=100, pos=food_location)
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
model.step()
successful_walkers[distance].append(model.datacollector.get_model_vars_dataframe().reset_index()["successful_walkers"][kwargs["max_steps"]])
return successful_walkers
def fixed_distance_object_between():
"""
diameter of object: floor(50% of distance)
"""
from tqdm import tqdm
runs = 10
from model import kwargs_paper_setup1 as kwargs
kwargs["N_f"] = 0
kwargs["gamma"] /= 2 # field decays slower
kwargs["beta"] /= 2 # drop rates decays slower
kwargs["d_e"] /= 2 # live longer, search longer
kwargs["d_s"] /= 2 # live longer, search longer
successful_walkers = {}
for distance in tqdm(range(5,30), position=0, desc="dis"):
successful_walkers[distance] = []
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
model = ActiveWalkerModel(**kwargs)
nest_location = kwargs["nest_position"]
food_location = (nest_location[0] - distance, nest_location[1])
object_location = (nest_location[0] - distance//2, nest_location[1])
place_blocking_object(object_location, radius=distance//4, model=model)
model.grid.add_food(size=100, pos=food_location)
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
model.step()
successful_walkers[distance].append(model.datacollector.get_model_vars_dataframe().reset_index()["successful_walkers"][kwargs["max_steps"]])
return successful_walkers
def place_blocking_object(center, radius, model):
positions = [center]
next_outside = [center]
# We grow from the center and add all neighbours of the outer edge of our blocking object
# Add all neighbours of next_outside that aren't in positions to the object
# by doing this radius times we should get an object of diameter 2 * radius + 1
# positions: accumulator for all positions inside the object of radius radius
# next_outside: keep track what we added in the last go-around. These will be used in the next step.
for _ in range(radius):
outside = next_outside
next_oustide = []
#otherwise interprets the tuple as something stupid
for i in range(len(outside)):
cell = outside[i]
neighbours = model.grid.get_neighborhood(cell)
for n in neighbours:
if n not in positions:
positions.append(n)
next_outside.append(n)
# some large number in comparison to the rest of the resistance field
# such that the probability of stepping on these grid spots tend towards zero
infinity = 1e20
for pos in positions:
model.grid.fields['res'][pos] = infinity
def run_model():
from tqdm import tqdm
# nests rather far away but also partially clumped.
np.random.seed(6)
from model import kwargs_paper_setup1 as kwargs
kwargs["gamma"] /= 2
kwargs["beta"] /= 2
kwargs["d_e"] /= 5 # live longer, search longer
kwargs["d_s"] /= 5 # live longer, search longer
kwargs["N_0"] *= 2 # more initial roamers/scouts
kwargs["max_steps"] *= 2 # more initial roamers/scouts
model = ActiveWalkerModel(**kwargs)
a = np.zeros_like(model.grid.fields['food'])
a[np.nonzero(model.grid.fields['food'])] = 1
a[np.nonzero(model.grid.fields['nests'])] = -1
for _ in tqdm(range(model.max_steps)):
model.step()
return model
from model import kwargs_paper_setup1 as kwargs
kwargs["gamma"] /= 2
kwargs["beta"] /= 2
kwargs["d_e"] /= 5 # live longer, search longer
kwargs["d_s"] /= 5 # live longer, search longer
kwargs["N_0"] *= 2 # more initial roamers/scouts
kwargs["max_steps"] *= 2 # more initial roamers/scouts
def run_model_objects(step, seed=None, title=None):
from tqdm import tqdm
# nests rather far away but also partially clumped.
np.random.seed(6)
from hexplot import plot_hexagon
model = ActiveWalkerModel(**kwargs)
a = np.zeros_like(model.grid.fields['food'])
a[np.nonzero(model.grid.fields['food'])] = 1
a[np.nonzero(model.grid.fields['nests'])] = -1
for current_step in tqdm(range(model.max_steps)):
if current_step == step:
if seed is not None:
np.random.seed(seed)
for _ in range(10):
coord = np.random.randint(0, 100, size=2)
coord = (coord[0], coord[1])
place_blocking_object(center=coord,radius=5, model=model)
a = model.grid.fields["res"]
if title is not None:
plot_hexagon(a, title=title)
model.step()
return model
#if __name__ == "__main__":
#plot_heatmap()
#res = run_model_no_objects()
for i in range(10):
res = run_model_objects(step=6000, seed=i+100, title=f"objects/blockings_run_{i}")
from plot import plot_alive_ants_vs_time, dead_ants_vs_time, plot_connectivity_vs_time
plot_alive_ants_vs_time(res, title=f"objects/run_{i}")
dead_ants_vs_time(res, title=f"objects/dead_ants_run_{i}")
plot_connectivity_vs_time(res, title=f"objects/conn_run_{i}")
#print("DISTANCE TEST VS SUCCESSFUL ANTS OBJECT INBETWEEN")
#res = fixed_distance_tests()
#res = fixed_distance_object_between()
# print("Test")
#from model import kwargs_paper_setup1 as kwargs
#kwargs["resistance_map_type"] = "perlin"
# print(kwargs)
#model = ActiveWalkerModel(**kwargs)
#model.step()
# a = np.zeros_like(model.grid.fields['food'])
# a[np.nonzero(model.grid.fields['food'])] = 1
# plot_hexagon(a, title="Nest locations")
# plot_hexagon(model.grid.fields['res'], title="Resistance Map")
# from tqdm import tqdm as progress_bar
# for _ in progress_bar(range(model.max_steps)):
# model.step()
# Access the DataCollector
#datacollector = model.datacollector
## Get the data from the DataCollector
#model_data = datacollector.get_model_vars_dataframe()
#print(model_data.columns)
## Plot the number of alive ants over time
#plt.plot(model_data.index, model_data['alive_ants'])
#plt.xlabel('Time')
#plt.ylabel('Number of Alive Ants') #this should probably be "active" ants, since it is not considering those in the nest
#plt.title('Number of Alive Ants Over Time')
#plt.grid(True)
#plt.show()
## Plot the number of sucessful walkers over time
#plt.plot(model_data.index, model_data['sucessful_walkers'])
#plt.xlabel('Time')
#plt.ylabel('Number of Sucessful Walkers')
#plt.title('Number of Sucessful Walkers Over Time')
#plt.grid(True)
#plt.show()
## Calculate the cumulative sum
#model_data['cumulative_sucessful_walkers'] = model_data['sucessful_walkers'].cumsum()
## Plot the cumulative sum of sucessful walkers over time
#plt.plot(model_data.index, model_data['cumulative_sucessful_walkers'])
#plt.xlabel('Time')
#plt.ylabel('Cumulative Sucessful Walkers')
#plt.title('Cumulative Sucessful Walkers Over Time')
#plt.grid(True)
#plt.show()
## Values over 100 are to be interpreted as walkers being sucessfull several times since the total max number of ants is 100
# # Connectivity measure
#def check_food_source_connectivity(food_sources, paths): #food_sources = nodes.is_nest, paths=result from BFS
# connected_food_sources = set()
# for source in food_sources:
# if source in paths:
# connected_food_sources.add(source)
# connectivity = len(connected_food_sources)
# return connectivity
# # Calculate connectivity through BFS
# current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
""" """
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/> You should have received a copy of the GNU Affero General Public License along with this program. If not, see <https://www.gnu.org/licenses/>
""" """

178
model.py
View File

@ -16,46 +16,176 @@ from mesa.time import SimultaneousActivation
from mesa.datacollection import DataCollector from mesa.datacollection import DataCollector
from agent import RandomWalkerAnt from agent import RandomWalkerAnt
kwargs_paper_setup1 = {
"width": 100,
"height": 100,
"N_0": 20,
"N_m": 100,
"N_r": 5,
"alpha": 0.6,
"gamma": 0.001,
"beta": 0.0512,
"d_s": 0.001,
"d_e": 0.001,
"s_0": 0.99,
"e_0": 0.99,
"q_0": 80,
"q_tr": 1,
"e_min": 0,
"nest_position": (49,49),
"N_f": 5,
"food_size" : 55,
"max_steps": 8000,
"resistance_map_type" : None,
}
kwargs_paper_setup2 = {
"width": 100,
"height": 100,
"N_0": 20,
"N_m": 100,
"N_r": 5,
"alpha": 0.6,
"gamma": 0.01,
"beta": 0.0512,
"d_s": 0.001,
"d_e": 0.001,
"s_0": 0.99,
"e_0": 0.99,
"q_0": 80,
"q_tr": 1,
"e_min": 0,
"nest_position": (49,49),
"N_f": 5,
"food_size" : 550,
"max_steps": 8000,
"resistance_map_type" : None,
}
class ActiveWalkerModel(Model): class ActiveWalkerModel(Model):
def __init__(self, width : int, height : int , num_max_agents : int, def __init__(self, width : int, height : int,
num_initial_roamers : int, N_0 : int, # number of initial roamers
N_m : int, # max number of ants
N_r : int, # number of new recruits
alpha : float, #biased random walk
beta : float, # decay rate drop rate
gamma : float, # decay rate pheromone concentration fields
d_s : float, # decay rate sensitvity
d_e : float, # decay rate energy
s_0 : float, # sensitvity reset
e_0 : float, # energy reset
q_0 : float, # initial pheromone level
q_tr : float, # threshold under which ant cannot distinguish concentrations
e_min : float, # energy at which walker dies
nest_position : Coordinate, nest_position : Coordinate,
num_food_sources=5, N_f=5, #num food sources
food_size=10, food_size= 55,
max_steps:int=1000, max_steps:int=1000,
resistance_map_type=None,
) -> None: ) -> None:
super().__init__() super().__init__()
fields=["A", "B", "nests", "food"]
self.N_m : int = N_m # max number of ants
self.N_r : int = N_r # number of new recruits
self.alpha : float = alpha # biased random walk if no gradient
self.gamma : float = gamma # decay rate pheromone concentration fields
self.beta : float = beta # decay rate drop rate
self.d_s : float = d_s # decay rate sensitvity
self.d_e : float = d_e # decay rate energy (get's multiplied with resistance)
self.s_0 : float = s_0 # sensitvity reset
self.e_0 : float = e_0 # energy reset
self.q_0 : float = q_0 # pheromone drop rate reset
self.q_tr : float = q_tr # threshold under which ant cannot distinguish concentrations
self.e_min : float = e_min # energy at which walker dies
self.N_f : int = N_f #num food sources
self.successful_ants = 0 # for viviane's graph
self.connectivity = 0 # for viviane's persistence
self.dying_agents = 0
fields=["A", "B", "nests", "food", "res"]
self.schedule = SimultaneousActivation(self) self.schedule = SimultaneousActivation(self)
self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields) self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields)
if resistance_map_type is None:
self.grid.fields["res"] = np.ones((width, height)).astype(float)
elif resistance_map_type == "perlin":
# perlin generates anisotropic noise which may or may not be a good choice
# pip3 install git+https://github.com/pvigier/perlin-numpy
from perlin_numpy import (
generate_fractal_noise_2d,
generate_perlin_noise_2d,
)
noise = generate_perlin_noise_2d(shape=(width,height), res=((10,10)))
# normalized to mean=1, min=0, and max=2
normalized_noise = (noise - np.min(noise))/(np.max(noise) - np.min(noise)) * 2
self.grid.fields["res"] = normalized_noise
else:
# possible other noise types: simplex or value
raise NotImplemented(f"{resistance_map_type=} is not implemented.")
self._unique_id_counter = -1 self._unique_id_counter = -1
self.max_steps = max_steps self.max_steps = max_steps
self.grid.add_nest(nest_position) self.grid.add_nest(nest_position)
self.num_max_agents = num_max_agents
self.num_new_recruits = 5
self.decay_rates : dict[str, float] = {"A" :0.01,
"B": 0.01,
}
self.dead_agents = [] for agent_id in self.get_unique_ids(N_0):
if self.schedule.get_agent_count() < self.N_m:
for agent_id in self.get_unique_ids(num_initial_roamers):
if self.schedule.get_agent_count() < self.num_max_agents:
agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_pheromone="A", drop_pheromone="A") agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_pheromone="A", drop_pheromone="A")
self.schedule.add(agent) self.schedule.add(agent)
self.grid.place_agent(agent, pos=nest_position) self.grid.place_agent(agent, pos=nest_position)
for _ in range(num_food_sources): for _ in range(N_f):
self.grid.add_food(food_size) self.grid.add_food(food_size)
self.datacollector = DataCollector( self.datacollector = DataCollector(
model_reporters={}, # model_reporters={"agent_dens": lambda m: m.agent_density()},
model_reporters = {"pheromone_a": lambda m: m.grid.fields["A"],
"pheromone_b": lambda m: m.grid.fields["B"],
"alive_ants": lambda m: m.schedule.get_agent_count(),
"dying_ants": lambda m: m.dying_agents,
"successful_walkers": lambda m: m.successful_ants,
"connectivity": lambda m: m.connectivity,
},
agent_reporters={} agent_reporters={}
) )
self.datacollector.collect(self) # keep at end of __init___ self.datacollector.collect(self) # keep at end of __init___
# Breadth-first-search algorithm for connectivity
def bfs(self):
threshold = 0.5 # half of min sens
connectivity = 0 #initial value of connectivity
connected_food_sources = [] #empty list of connected food sources
visited = [] #empty list of visited (by the algorithm) nodes
nest = np.argwhere(self.grid.fields["nests"] == 1) #get nest location
nest = (nest[0][0], nest[0][1])
start_node = nest #rename
neighbours_to_check = [start_node] #start node gets checked first
neighbours_to_check = neighbours_to_check + self.grid.get_neighborhood(start_node) #start node neighbours get added to the to check list
while neighbours_to_check: #as long as there is something on the to check list
current_node = neighbours_to_check[0] #the first list entry is taken
del neighbours_to_check[0] #and deleted on the to check list
if current_node not in visited: #if it has not previously been checked
if np.max([self.grid.fields["B"][current_node], self.grid.fields["A"][current_node]]) >= threshold: #and its A value is above our threshold
new_neighbors = self.grid.get_neighborhood(current_node) #then we get its neighbours
for new_neighbor in new_neighbors:
if new_neighbor not in visited and new_neighbor not in neighbours_to_check:
neighbours_to_check.append(new_neighbor) #then they are also added to our to check list
visited.append(current_node) #and the current node has now been checked
if self.grid.fields["food"][current_node] > 0: #in case the node we check is food
connectivity += 1 #then we have found a connected path to a food source
connected_food_sources = connected_food_sources + list([current_node]) #and it is added to the list of connected food sources
# why not normalize to 0-1 ?
return connectivity #we want the connectivity (0-5)
def agent_density(self): def agent_density(self):
a = np.zeros((self.grid.width, self.grid.height)) a = np.zeros((self.grid.width, self.grid.height))
for i in range(self.grid.width): for i in range(self.grid.width):
@ -65,26 +195,23 @@ class ActiveWalkerModel(Model):
def step(self): def step(self):
self.dying_agents = 0
self.schedule.step() # step() and advance() all agents self.schedule.step() # step() and advance() all agents
if self.schedule.steps % 100 == 0:
pass
# apply decay rate on pheromone levels # apply decay rate on pheromone levels
for key in ("A", "B"): for key in ("A", "B"):
field = self.grid.fields[key] field = self.grid.fields[key]
self.grid.fields[key] = field - self.decay_rates[key]*field self.grid.fields[key] = field - self.gamma*field
self.datacollector.collect(self) self.datacollector.collect(self)
if self.schedule.steps >= self.max_steps: if self.schedule.steps >= self.max_steps:
self.running = False self.running = False
#remove dead agents
for agent in self.dead_agents:
self.schedule.remove(agent)
self.grid.remove_agent(agent)
self.dead_agents.remove(agent)
self.dead_agents = []
#ToDo what happens when all agents die
def get_unique_id(self) -> int: def get_unique_id(self) -> int:
self._unique_id_counter += 1 self._unique_id_counter += 1
return self._unique_id_counter return self._unique_id_counter
@ -93,6 +220,7 @@ class ActiveWalkerModel(Model):
for _ in range(num_ids): for _ in range(num_ids):
yield self.get_unique_id() yield self.get_unique_id()
""" """
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.

View File

@ -107,7 +107,8 @@ class MultiHexGridScalarFields(MultiHexGrid):
def is_food(self, pos): def is_food(self, pos):
assert('food' in self.fields.keys()) assert('food' in self.fields.keys())
return bool(self.fields['food'][pos]) # account for potential float imprecision and use epsilon = 1e-3
return self.fields['food'][pos] > 1e-3
def add_food(self, size : int , pos=None): def add_food(self, size : int , pos=None):
""" """
@ -127,7 +128,7 @@ class MultiHexGridScalarFields(MultiHexGrid):
while(self.is_nest(pos) or self.is_food(pos)): while(self.is_nest(pos) or self.is_food(pos)):
pos = select_random_place() pos = select_random_place()
self.fields['food'][pos] = size self.fields['food'][pos] = int(size)
def is_nest(self, pos : Coordinate) -> bool: def is_nest(self, pos : Coordinate) -> bool:
assert('nests' in self.fields.keys()) assert('nests' in self.fields.keys())

71
plot.py Normal file
View File

@ -0,0 +1,71 @@
#!/bin/python
import matplotlib.pyplot as plt
import numpy as np
from hexplot import plot_hexagon
def plot_alive_ants_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["alive_ants"]
plt.figure(figsize=(10,10), dpi=600)
plt.plot(y)
plt.xlabel("time step")
plt.ylabel("alive agents")
if title is None:
plt.savefig("alive_agents_over_time.eps")
else:
plt.savefig(f"{title}.png")
def plot_connectivity_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["connectivity"]
plt.figure(figsize=(10,10), dpi=600)
plt.plot(y)
plt.xlabel("time step")
plt.ylabel("No. of food sources connected to the nest")
if title is None:
plt.savefig("connectivity_over_time.eps")
else:
plt.savefig(f"{title}.png")
def dead_ants_vs_time(model, title=None):
y = np.cumsum(model.datacollector.get_model_vars_dataframe()["dying_ants"])
plt.figure(figsize=(10,10), dpi=600)
plt.plot(y)
plt.xlabel("time step")
plt.ylabel("dead agents")
if title is None:
plt.savefig("dead_agents_over_time.eps")
else:
plt.savefig(f"{title}.png")
def cum_successful_ants_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["successful_walkers"]
plt.figure(figsize=(10,10), dpi=600)
plt.plot(y)
plt.xlabel("time step")
plt.ylabel("cummulative successful agents")
if title is None:
plt.savefig("cumsum_successful_agents_over_time.eps")
else:
plt.savefig(f"{title}.png")
def plot_heatmap(model, low=10, high=200):
for time in np.arange(0, model.max_steps + 1, 1000):
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_a"][time]
a = pheromone_concentration
#plot_hexagon(a)
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_b"][time]
b = pheromone_concentration
#plot_hexagon(b)
c = np.max([a,b], axis=0)
c = a + b
c = np.clip(c, 1, 1000000000)
c = np.log(c)
c = c/np.max(c)
food_locations = np.nonzero(model.grid.fields['food'])
x_food = [ food[0] for food in food_locations ]
y_food = [ food[1] for food in food_locations ]
plot_hexagon(c, title=f"cummulative pheromone density at timestep {time}")

View File

@ -22,10 +22,12 @@ def setup(params=None):
# Set the model parameters # Set the model parameters
if params is None: if params is None:
params = { params = {
"max_steps": 3000,
"width": 50, "height": 50, "width": 50, "height": 50,
"num_max_agents" : 100, "N_m" : 100,
"nest_position" : (25,25), "nest_position" : (25,25),
"num_initial_roamers" : 5, "N_0" : 5,
"resistance_map_type": "perlin",
} }
@ -85,12 +87,9 @@ def setup(params=None):
"Color": col, "Color": col,
} }
def get_max_grid_val(model, key): def portray_resistance_map(model, pos, norm=1):
return np.max(model.grid.fields[key]) col = get_color(level=model.grid.fields['res'][pos], normalization=norm)
col = f"rgb({col}, {col}, {col})"
def portray_pheromone_density(model, pos, norm):
col_a = get_color(level=model.grid.fields["A"][pos], normalization=norm)
col_b = get_color(level=model.grid.fields["B"][pos], normalization=norm)
return { return {
"Shape": "hex", "Shape": "hex",
"r": 1, "r": 1,
@ -98,7 +97,26 @@ def setup(params=None):
"Layer": 0, "Layer": 0,
"x": pos[0], "x": pos[0],
"y": pos[1], "y": pos[1],
"Color": f"rgb({col_a}, {col_b}, 255)" "Color": col,
}
def get_max_grid_val(model, key):
return np.max(model.grid.fields[key])
def portray_pheromone_density(model, pos, norm):
col_a = get_color(level=model.grid.fields["A"][pos], normalization=norm)
col_b = get_color(level=model.grid.fields["B"][pos], normalization=norm)
res_min, res_max = np.min(model.grid.fields['res']), np.max(model.grid.fields['res'])
ease = 1 - model.grid.fields['res'][pos]
col_ease = get_color(level=ease, normalization=np.max(model.grid.fields['res']))
return {
"Shape": "hex",
"r": 1,
"Filled": "true",
"Layer": 0,
"x": pos[0],
"y": pos[1],
"Color": f"rgb({col_a}, {col_b}, {col_ease})"
} }
@ -109,6 +127,9 @@ def setup(params=None):
grid_ants = CanvasHexGridMultiAgents(portray_ant_density, grid_ants = CanvasHexGridMultiAgents(portray_ant_density,
width, height, width*pixel_ratio, height*pixel_ratio, width, height, width*pixel_ratio, height*pixel_ratio,
norm_method=lambda m: 5) norm_method=lambda m: 5)
grid_resistance_map = CanvasHexGridMultiAgents(portray_resistance_map,
width, height, width*pixel_ratio, height*pixel_ratio,
norm_method=lambda m: 1)
def norm_ants(model): def norm_ants(model):
return 5 return 5
@ -127,14 +148,19 @@ def setup(params=None):
[lambda m: "<h3>Ant density</h3><h5>Nest: Red, Food: Green</h5>", [lambda m: "<h3>Ant density</h3><h5>Nest: Red, Food: Green</h5>",
grid_ants, grid_ants,
lambda m: f"<h5>Normalization Value: {norm_ants(m)}</h5>", lambda m: f"<h5>Normalization Value: {norm_ants(m)}</h5>",
lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta</h5>", lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta, Resistance Map: Yellow</h5>",
grid_pheromones, grid_pheromones,
lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>" lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>",
], ],
"Active Random Walker Ants", params) "Active Random Walker Ants", params)
if __name__ == "__main__": if __name__ == "__main__":
server = setup() from model import kwargs_paper_setup1
kwargs_paper1_perlin = kwargs_paper_setup1
kwargs_paper1_perlin["height"] = 50
kwargs_paper1_perlin["width"] = 50
kwargs_paper1_perlin["resistance_map_type"] = "perlin"
server = setup(params=kwargs_paper1_perlin)
server.launch() server.launch()
""" """