License: AGPL 3 (see end of file)
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
Is the separation of energy and sensitivity useful? -> only if we have the disconnect via resistance
import numpy as np
import numpy.typing as npt
from mesa.agent import Agent
from import Coordinate
class RandomWalkerAnt(Agent):
def __init__(self, unique_id, model,
sensitivity_max = 10000,
def __init__(self, unique_id, model, look_for_pheromone=None,
pheromone_drop_rate_0 : dict[str, float]={"A": 80, "B": 80},
alpha=0.6, drop_pheromone=None,
betas : dict[str, float]={"A": 0.0512, "B": 0.0512},
sensitivity_max = 300,
sensitivity_min = 0.001,
sensitivity_steepness = 1
) -> None:
super().__init__(unique_id=unique_id, model=model)
self._next_pos : None | Coordinate = None
self._prev_pos : None | Coordinate = None
self.look_for_pheromone : str|None = look_for_pheromone
self.drop_pheromone : str|None = drop_pheromone
|||| : float = self.model.e_0
self.sensitivity : float = self.model.s_0
self.pheromone_drop_rate : float = self.model.q_0
self.look_for_pheromone = look_for_pheromone
self.drop_pheromone = drop_pheromone
|||| = energy_0 #TODO: use
self.sensitivity_0 = sensitivity_0
self.sensitivity = self.sensitivity_0
self.pheromone_drop_rate = pheromone_drop_rate_0
self.alpha = alpha
self.sensitivity_max = sensitivity_max
self.sensitivity_min = sensitivity_min
self.sensitivity_decay_rate = sensitivity_decay_rate
self.sensitivity_steepness = sensitivity_steepness
self.betas = betas
self.threshold : dict[str, float] = {"A": 0, "B": 0}
def sens_adj(self, props, key) -> npt.NDArray[np.float_] | float:
-----------------------> prop
For the nonlinear sensitivity, the idea is to use a logistic function that has
a characteristic sigmoidal shape that starts from a low value, increases rapidly,
and then gradually approaches a saturation level.
f(x) = L / (1 + exp(-k*(x - x0)))
f(x) = return value
L = sens_max
k is a parameter that controls the steepness of the curve. We can start with 1
A higher value of k leads to a steeper curve.
x0 is the midpoint of the curve, where the sensitivity starts to increase significantly.
We can make X0 the threshold value
# if props iterable create array, otherwise return single value
except TypeError:
# TODO: proper nonlinear response, not just clamping
if props > self.sensitivity_max:
return self.sensitivity_max
if props > self.model.q_tr:
return props
#TODO: proper nonlinear response, not just clamping
non_linear_sens = True
if non_linear_sens:
L = self.sensitivity_max
k = self.sensitivity_steepness
mid = self.threshold[key]
if props > self.sensitivity_max:
return self.sensitivity_max
#Should we still keep these conditions?
# if props > self.threshold[key]:
# return props
adjusted_sensitivity = L / (1 + np.exp(-k * (props - mid)))
print(f'props: {props}, adjusted_value: {adjusted_sensitivity}')
return adjusted_sensitivity
return 0
if props > self.sensitivity_max:
return self.sensitivity_max #Should we still keep these conditions
if props > self.threshold[key]:
return props
return 0
arr : list[float] = []
for prop in props:
arr.append(self.sens_adj(prop, key))
return np.array(arr)
def _get_resistance_weights(self, positions=None):
if positions is None:
positions = self.neighbors()
# bit round-about but self.model.grid.fields['res'][positions]
# gets interpreted as slices, not multiple singular positions
resistance = np.array([ self.model.grid.fields['res'][x,y] for x,y in positions ])
easiness = np.max(self.model.grid.fields['res']) - resistance + np.min(self.model.grid.fields['res']) + 1e-15 # + epsilon to not divide by zero
weights = easiness/ np.sum(easiness)
#inv_weights = resistance/ np.sum(resistance)
#weights = 1 - inv_weights
#weights /= np.sum(weights)
return weights
def _choose_next_pos(self):
def _combine_weights(res_weights, walk_weights):
If we have a resistance -> Infinity we want to have a likelihood -> 0 for this direction
Therefore we should multiply our two probabilities.
For the case of no resistance field this will return the normal walk_weights
res_weights : resistance weights: based on resistance field of neighbours
see _get_resistance_weights for more info
walk weights: In case of biased random walk (no positive pheromone gradient):
forward: alpha,
everywhere else: (1- alpaha)/5)
In case of positive pheromone gradient present in front:
max. positive gradient: self.sensitivity
everyhwere else: (1-self.sensitivity)/5
combined = res_weights * walk_weights
normalized = combined / np.sum(combined)
return normalized
def _pick_from_remaining_five(remaining_five):
weights = self._get_resistance_weights(remaining_five)
random_index = np.random.choice(range(len(remaining_five)), p=weights)
self._next_pos = remaining_five[random_index]
self._prev_pos = self.pos
if self._prev_pos is None:
res_weights = self._get_resistance_weights()
walk_weights = np.ones(6)
weights = _combine_weights(res_weights, walk_weights)
i = np.random.choice(range(6),p=weights)
i = np.random.choice(range(6))
assert(self.pos is not self.neighbors()[i])
self._next_pos = self.neighbors()[i]
self._prev_pos = self.pos
if self.searching_food:
for neighbor in self.front_neighbors:
if self.model.grid.is_food(neighbor):
self.model.grid.fields['food'][neighbor] -= 1 # eat
self.pheromone_drop_rate = self.model.q_0
self.sensitivity = self.model.s_0
|||| = self.model.e_0
#now look for other pheromone
self.look_for_pheromone = "A"
self.drop_pheromone = "B"
self.look_for_pheromone = "A"
self.sensitivity = self.sensitivity_0
self._prev_pos = neighbor
self._next_pos = self.pos
elif self.searching_nest:
for neighbor in self.front_neighbors:
if self.model.grid.is_nest(neighbor):
self.pheromone_drop_rate = self.model.q_0
self.sensitivity = self.model.s_0
|||| = self.model.e_0
self.look_for_pheromone = "B"
self.look_for_pheromone = "A" # Is this a correct interpretation?
self.drop_pheromone = "A"
self.sensitivity = self.sensitivity_0
self._prev_pos = neighbor
self._next_pos = self.pos
self.model.successful_ants += 1
# recruit new ants
for agent_id in self.model.get_unique_ids(self.model.N_r):
if self.model.schedule.get_agent_count() < self.model.N_m:
for agent_id in self.model.get_unique_ids(self.model.num_new_recruits):
if self.model.schedule.get_agent_count() < self.model.num_max_agents:
agent = RandomWalkerAnt(unique_id=agent_id, model=self.model, look_for_pheromone="B", drop_pheromone="A")
agent._next_pos = self.pos
self.model.grid.place_agent(agent, pos=neighbor)
# follow positive gradient with likelihood self.sensitivity
# follow positive gradient
if self.look_for_pheromone is not None:
# Calculate gradient
front_concentration = [self.model.grid.fields[self.look_for_pheromone][cell] for cell in self.front_neighbors ]
front_concentration = self.sens_adj(front_concentration, self.look_for_pheromone)
current_pos_concentration = self.sens_adj(self.model.grid.fields[self.look_for_pheromone][self.pos], self.look_for_pheromone)
gradient = front_concentration - np.repeat(current_pos_concentration, 3).astype(np.float_)
# TODO: if two or more neighbors have same concentration randomize? Should be unlikely with floats though
index = np.argmax(gradient)
if gradient[index] > 0:
# follow positive gradient with likelihood self.sensitivity * resistance_weight (re-normalized)
all_neighbors_cells = self.neighbors()
highest_gradient_cell = self.front_neighbors[index]
highest_gradient_index_arr = np.where(all_neighbors_cells == highest_gradient_cell)
assert(len(highest_gradient_index_arr) == 1)
all_neighbors_index = highest_gradient_index_arr[0]
sens_weights = np.ones(6) * (1-self.sensitivity)/5
sens_weights[all_neighbors_index] = self.sensitivity
res_weights = self._get_resistance_weights()
weights = _combine_weights(res_weights, sens_weights)
random_index = np.random.choice(range(6), p=weights)
self._next_pos = all_neighbors_cells[random_index]
self._next_pos = self.front_neighbors[index]
self._prev_pos = self.pos
# do biased random walk
all_neighbors_cells = self.neighbors()
front_index_arr = np.where(all_neighbors_cells == self.front_neighbor)
assert(len(front_index_arr) == 1 )
front_index = front_index_arr[0]
p = np.random.uniform()
if p < self.alpha:
self._next_pos = self.front_neighbor
self._prev_pos = self.pos
# need copy() as we would otherwise remove the tuple from all possible lists (aka python "magic")
other_neighbors = self.neighbors().copy()
random_index = np.random.choice(range(len(other_neighbors)))
self._next_pos = other_neighbors[random_index]
self._prev_pos = self.pos
res_weights = self._get_resistance_weights()
walk_weights = np.ones(6) * (1-self.model.alpha) / 5
walk_weights[front_index] = self.model.alpha
weights = _combine_weights(res_weights, walk_weights)
random_index = np.random.choice(range(6), p=weights)
self._next_pos = all_neighbors_cells[random_index]
self._prev_pos = self.pos
def step(self):
# Die and get removed if no energy
if < self.model.e_min:
#update list of dead agents for time step
self.model.dying_agents += 1
self.sensitivity -= self.model.d_s
|||| -= self.model.grid.fields['res'][self.pos] * self.model.d_e
self.sensitivity -= self.sensitivity_decay_rate
#kill agent if sensitivity is low
if self.sensitivity < self.sensitivity_min:
def _adjust_pheromone_drop_rate(self):
if(self.drop_pheromone is not None):
self.pheromone_drop_rate -= self.pheromone_drop_rate * self.model.beta
self.pheromone_drop_rate[self.drop_pheromone] -= self.pheromone_drop_rate[self.drop_pheromone] * self.betas[self.drop_pheromone]
def drop_pheromones(self) -> None:
# should only be called in advance() as we do not use hidden fields
if self.drop_pheromone is not None:
self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate
self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate[self.drop_pheromone]
def _kill_agent(self):
#update dead_agent list
def advance(self) -> None:
import numpy as np
import matplotlib.pyplot as plt
def plot_hexagon(A, title=None, block=True):
X, Y = np.meshgrid(range(A.shape[0]), range(A.shape[-1]))
X, Y = X*2, Y*2
# Turn this into a hexagonal grid
for i, k in enumerate(X):
if i % 2 == 1:
X[i] += 1
Y[:,i] += 1
fig, ax = plt.subplots()
im = ax.hexbin(
# the rest of the code is adjustable for best output
ax.set(xlim=(-4, X.max()+4,), ylim=(-4, Y.max()+4))
#plt.colorbar(im, shrink=0.7)
if(title is not None):
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
from model import ActiveWalkerModel
from agent import RandomWalkerAnt
import numpy as np
import matplotlib.pyplot as plt
from import Coordinate
from mesa.datacollection import DataCollector
#from multihex import MultiHexGrid
from multihex import MultiHexGrid
def main():
def check_pheromone_exponential_decay():
@ -100,7 +107,6 @@ def check_ant_pheromone_exponential_decay():
num_initial_roamers = 1
num_max_agents = 100
nest_position : Coordinate = (width //2, height //2)
num_food_sources = 0;
max_steps = 1000
model = ActiveWalkerModel(width=width, height=height,
@ -172,288 +178,16 @@ def check_ants_follow_gradient():
def viviane_bfs_example_run():
# Breadth-first-search algorithm for connectivity
def bfs(graph, start_node, threshold): #graph=grid, start_node=nest, threshold=TBD?
from collections import deque
visited = set()
queue = deque([(start_node, [])])
paths = {}
connected_food_sources = set()
while queue:
current_node, path = queue.popleft()
#current_node = tuple(current_node)
if current_node in graph:
for neighbor, m.grid.fields["A"] in graph[current_node].items():
if neighbor not in visited and m.grid.fields["A"] >= threshold:
new_path = path + [neighbor]
queue.append((neighbor, new_path))
# Check if the neighbor is a food source
if neighbor in self.grid_food:
if neighbor not in paths:
paths[neighbor] = new_path
connectivity = len(connected_food_sources)
return connectivity
# Calculate connectivity through BFS
current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
import numpy as np
N = 121
N_X = int(np.sqrt(N))
N_Y = N // N_X
# fancy way of saying absolutely nothing but 11
xv, yv = np.meshgrid(np.arange(N_X), np.arange(N_Y), sparse=False, indexing='xy')
print(f"{(xv, yv)=}")
if __name__ == "__main__":
def fixed_distance_tests():
position a target food source a known distance away from nest
check for no. successful ants for n runs
from tqdm import tqdm
runs = 10
from model import kwargs_paper_setup1 as kwargs
kwargs["N_f"] = 0
kwargs["gamma"] /= 2 # field decays three times slower
kwargs["beta"] /= 2 # drop rates decays three times slower
kwargs["d_s"] /= 2 # drop rates decays three times slower
kwargs["d_e"] /= 2 # drop rates decays three times slower
successful_walkers = {}
for distance in tqdm(range(5,30), position=0, desc="dis"):
successful_walkers[distance] = []
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
model = ActiveWalkerModel(**kwargs)
nest_location = kwargs["nest_position"]
food_location = (nest_location[0] - distance, nest_location[1])
model.grid.add_food(size=100, pos=food_location)
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
return successful_walkers
def fixed_distance_object_between():
diameter of object: floor(50% of distance)
from tqdm import tqdm
runs = 10
from model import kwargs_paper_setup1 as kwargs
kwargs["N_f"] = 0
kwargs["gamma"] /= 2 # field decays slower
kwargs["beta"] /= 2 # drop rates decays slower
kwargs["d_e"] /= 2 # live longer, search longer
kwargs["d_s"] /= 2 # live longer, search longer
successful_walkers = {}
for distance in tqdm(range(5,30), position=0, desc="dis"):
successful_walkers[distance] = []
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
model = ActiveWalkerModel(**kwargs)
nest_location = kwargs["nest_position"]
food_location = (nest_location[0] - distance, nest_location[1])
object_location = (nest_location[0] - distance//2, nest_location[1])
place_blocking_object(object_location, radius=distance//4, model=model)
model.grid.add_food(size=100, pos=food_location)
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
return successful_walkers
def place_blocking_object(center, radius, model):
positions = [center]
next_outside = [center]
# We grow from the center and add all neighbours of the outer edge of our blocking object
# Add all neighbours of next_outside that aren't in positions to the object
# by doing this radius times we should get an object of diameter 2 * radius + 1
# positions: accumulator for all positions inside the object of radius radius
# next_outside: keep track what we added in the last go-around. These will be used in the next step.
for _ in range(radius):
outside = next_outside
next_oustide = []
#otherwise interprets the tuple as something stupid
for i in range(len(outside)):
cell = outside[i]
neighbours = model.grid.get_neighborhood(cell)
for n in neighbours:
if n not in positions:
# some large number in comparison to the rest of the resistance field
# such that the probability of stepping on these grid spots tend towards zero
infinity = 1e20
for pos in positions:
model.grid.fields['res'][pos] = infinity
def run_model():
from tqdm import tqdm
# nests rather far away but also partially clumped.
from model import kwargs_paper_setup1 as kwargs
kwargs["gamma"] /= 2
kwargs["beta"] /= 2
kwargs["d_e"] /= 5 # live longer, search longer
kwargs["d_s"] /= 5 # live longer, search longer
kwargs["N_0"] *= 2 # more initial roamers/scouts
kwargs["max_steps"] *= 2 # more initial roamers/scouts
model = ActiveWalkerModel(**kwargs)
a = np.zeros_like(model.grid.fields['food'])
a[np.nonzero(model.grid.fields['food'])] = 1
a[np.nonzero(model.grid.fields['nests'])] = -1
for _ in tqdm(range(model.max_steps)):
return model
from model import kwargs_paper_setup1 as kwargs
kwargs["gamma"] /= 2
kwargs["beta"] /= 2
kwargs["d_e"] /= 5 # live longer, search longer
kwargs["d_s"] /= 5 # live longer, search longer
kwargs["N_0"] *= 2 # more initial roamers/scouts
kwargs["max_steps"] *= 2 # more initial roamers/scouts
def run_model_objects(step, seed=None, title=None):
from tqdm import tqdm
# nests rather far away but also partially clumped.
from hexplot import plot_hexagon
model = ActiveWalkerModel(**kwargs)
a = np.zeros_like(model.grid.fields['food'])
a[np.nonzero(model.grid.fields['food'])] = 1
a[np.nonzero(model.grid.fields['nests'])] = -1
for current_step in tqdm(range(model.max_steps)):
if current_step == step:
if seed is not None:
for _ in range(10):
coord = np.random.randint(0, 100, size=2)
coord = (coord[0], coord[1])
place_blocking_object(center=coord,radius=5, model=model)
a = model.grid.fields["res"]
if title is not None:
plot_hexagon(a, title=title)
return model
#if __name__ == "__main__":
#res = run_model_no_objects()
for i in range(10):
res = run_model_objects(step=6000, seed=i+100, title=f"objects/blockings_run_{i}")
from plot import plot_alive_ants_vs_time, dead_ants_vs_time, plot_connectivity_vs_time
plot_alive_ants_vs_time(res, title=f"objects/run_{i}")
dead_ants_vs_time(res, title=f"objects/dead_ants_run_{i}")
plot_connectivity_vs_time(res, title=f"objects/conn_run_{i}")
#res = fixed_distance_tests()
#res = fixed_distance_object_between()
# print("Test")
#from model import kwargs_paper_setup1 as kwargs
#kwargs["resistance_map_type"] = "perlin"
# print(kwargs)
#model = ActiveWalkerModel(**kwargs)
# a = np.zeros_like(model.grid.fields['food'])
# a[np.nonzero(model.grid.fields['food'])] = 1
# plot_hexagon(a, title="Nest locations")
# plot_hexagon(model.grid.fields['res'], title="Resistance Map")
# from tqdm import tqdm as progress_bar
# for _ in progress_bar(range(model.max_steps)):
# model.step()
# Access the DataCollector
#datacollector = model.datacollector
## Get the data from the DataCollector
#model_data = datacollector.get_model_vars_dataframe()
## Plot the number of alive ants over time
#plt.plot(model_data.index, model_data['alive_ants'])
#plt.ylabel('Number of Alive Ants') #this should probably be "active" ants, since it is not considering those in the nest
#plt.title('Number of Alive Ants Over Time')
## Plot the number of sucessful walkers over time
#plt.plot(model_data.index, model_data['sucessful_walkers'])
#plt.ylabel('Number of Sucessful Walkers')
#plt.title('Number of Sucessful Walkers Over Time')
## Calculate the cumulative sum
#model_data['cumulative_sucessful_walkers'] = model_data['sucessful_walkers'].cumsum()
## Plot the cumulative sum of sucessful walkers over time
#plt.plot(model_data.index, model_data['cumulative_sucessful_walkers'])
#plt.ylabel('Cumulative Sucessful Walkers')
#plt.title('Cumulative Sucessful Walkers Over Time')
## Values over 100 are to be interpreted as walkers being sucessfull several times since the total max number of ants is 100
# # Connectivity measure
#def check_food_source_connectivity(food_sources, paths): #food_sources = nodes.is_nest, paths=result from BFS
# connected_food_sources = set()
# for source in food_sources:
# if source in paths:
# connected_food_sources.add(source)
# connectivity = len(connected_food_sources)
# return connectivity
# # Calculate connectivity through BFS
# current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
from mesa.datacollection import DataCollector
from agent import RandomWalkerAnt
kwargs_paper_setup1 = {
"width": 100,
"height": 100,
"N_0": 20,
"N_m": 100,
"N_r": 5,
"alpha": 0.6,
"gamma": 0.001,
"beta": 0.0512,
"d_s": 0.001,
"d_e": 0.001,
"s_0": 0.99,
"e_0": 0.99,
"q_0": 80,
"q_tr": 1,
"e_min": 0,
"nest_position": (49,49),
"N_f": 5,
"food_size" : 55,
"max_steps": 8000,
"resistance_map_type" : None,
kwargs_paper_setup2 = {
"width": 100,
"height": 100,
"N_0": 20,
"N_m": 100,
"N_r": 5,
"alpha": 0.6,
"gamma": 0.01,
"beta": 0.0512,
"d_s": 0.001,
"d_e": 0.001,
"s_0": 0.99,
"e_0": 0.99,
"q_0": 80,
"q_tr": 1,
"e_min": 0,
"nest_position": (49,49),
"N_f": 5,
"food_size" : 550,
"max_steps": 8000,
"resistance_map_type" : None,
class ActiveWalkerModel(Model):
def __init__(self, width : int, height : int,
N_0 : int, # number of initial roamers
N_m : int, # max number of ants
N_r : int, # number of new recruits
alpha : float, #biased random walk
beta : float, # decay rate drop rate
gamma : float, # decay rate pheromone concentration fields
d_s : float, # decay rate sensitvity
d_e : float, # decay rate energy
s_0 : float, # sensitvity reset
e_0 : float, # energy reset
q_0 : float, # initial pheromone level
q_tr : float, # threshold under which ant cannot distinguish concentrations
e_min : float, # energy at which walker dies
def __init__(self, width : int, height : int , num_max_agents : int,
num_initial_roamers : int,
nest_position : Coordinate,
N_f=5, #num food sources
food_size= 55,
) -> None:
self.N_m : int = N_m # max number of ants
self.N_r : int = N_r # number of new recruits
self.alpha : float = alpha # biased random walk if no gradient
self.gamma : float = gamma # decay rate pheromone concentration fields
self.beta : float = beta # decay rate drop rate
self.d_s : float = d_s # decay rate sensitvity
self.d_e : float = d_e # decay rate energy (get's multiplied with resistance)
self.s_0 : float = s_0 # sensitvity reset
self.e_0 : float = e_0 # energy reset
self.q_0 : float = q_0 # pheromone drop rate reset
self.q_tr : float = q_tr # threshold under which ant cannot distinguish concentrations
self.e_min : float = e_min # energy at which walker dies
self.N_f : int = N_f #num food sources
self.successful_ants = 0 # for viviane's graph
self.connectivity = 0 # for viviane's persistence
self.dying_agents = 0
fields=["A", "B", "nests", "food", "res"]
fields=["A", "B", "nests", "food"]
self.schedule = SimultaneousActivation(self)
self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields)
if resistance_map_type is None:
self.grid.fields["res"] = np.ones((width, height)).astype(float)
elif resistance_map_type == "perlin":
# perlin generates anisotropic noise which may or may not be a good choice
# pip3 install git+
from perlin_numpy import (
noise = generate_perlin_noise_2d(shape=(width,height), res=((10,10)))
# normalized to mean=1, min=0, and max=2
normalized_noise = (noise - np.min(noise))/(np.max(noise) - np.min(noise)) * 2
self.grid.fields["res"] = normalized_noise
# possible other noise types: simplex or value
raise NotImplemented(f"{resistance_map_type=} is not implemented.")
self._unique_id_counter = -1
self.max_steps = max_steps
self.num_max_agents = num_max_agents
self.num_new_recruits = 5
self.decay_rates : dict[str, float] = {"A" :0.01,
"B": 0.01,
self.dead_agents = []
for agent_id in self.get_unique_ids(N_0):
if self.schedule.get_agent_count() < self.N_m:
for agent_id in self.get_unique_ids(num_initial_roamers):
if self.schedule.get_agent_count() < self.num_max_agents:
agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_pheromone="A", drop_pheromone="A")
self.grid.place_agent(agent, pos=nest_position)
for _ in range(N_f):
for _ in range(num_food_sources):
self.datacollector = DataCollector(
# model_reporters={"agent_dens": lambda m: m.agent_density()},
model_reporters = {"pheromone_a": lambda m: m.grid.fields["A"],
"pheromone_b": lambda m: m.grid.fields["B"],
"alive_ants": lambda m: m.schedule.get_agent_count(),
"dying_ants": lambda m: m.dying_agents,
"successful_walkers": lambda m: m.successful_ants,
"connectivity": lambda m: m.connectivity,
self.datacollector.collect(self) # keep at end of __init___
# Breadth-first-search algorithm for connectivity
def bfs(self):
threshold = 0.5 # half of min sens
connectivity = 0 #initial value of connectivity
connected_food_sources = [] #empty list of connected food sources
visited = [] #empty list of visited (by the algorithm) nodes
nest = np.argwhere(self.grid.fields["nests"] == 1) #get nest location
nest = (nest[0][0], nest[0][1])
start_node = nest #rename
neighbours_to_check = [start_node] #start node gets checked first
neighbours_to_check = neighbours_to_check + self.grid.get_neighborhood(start_node) #start node neighbours get added to the to check list
while neighbours_to_check: #as long as there is something on the to check list
current_node = neighbours_to_check[0] #the first list entry is taken
del neighbours_to_check[0] #and deleted on the to check list
if current_node not in visited: #if it has not previously been checked
if np.max([self.grid.fields["B"][current_node], self.grid.fields["A"][current_node]]) >= threshold: #and its A value is above our threshold
new_neighbors = self.grid.get_neighborhood(current_node) #then we get its neighbours
for new_neighbor in new_neighbors:
if new_neighbor not in visited and new_neighbor not in neighbours_to_check:
neighbours_to_check.append(new_neighbor) #then they are also added to our to check list
visited.append(current_node) #and the current node has now been checked
if self.grid.fields["food"][current_node] > 0: #in case the node we check is food
connectivity += 1 #then we have found a connected path to a food source
connected_food_sources = connected_food_sources + list([current_node]) #and it is added to the list of connected food sources
# why not normalize to 0-1 ?
return connectivity #we want the connectivity (0-5)
def agent_density(self):
a = np.zeros((self.grid.width, self.grid.height))
for i in range(self.grid.width):
@ -195,22 +65,25 @@ class ActiveWalkerModel(Model):
def step(self):
self.dying_agents = 0
self.schedule.step() # step() and advance() all agents
if self.schedule.steps % 100 == 0:
# apply decay rate on pheromone levels
for key in ("A", "B"):
field = self.grid.fields[key]
self.grid.fields[key] = field - self.gamma*field
self.grid.fields[key] = field - self.decay_rates[key]*field
if self.schedule.steps >= self.max_steps:
self.running = False
#remove dead agents
for agent in self.dead_agents:
self.dead_agents = []
#ToDo what happens when all agents die
def get_unique_id(self) -> int:
self._unique_id_counter += 1
for _ in range(num_ids):
yield self.get_unique_id()
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
def is_food(self, pos):
assert('food' in self.fields.keys())
# account for potential float imprecision and use epsilon = 1e-3
return self.fields['food'][pos] > 1e-3
return bool(self.fields['food'][pos])
def add_food(self, size : int , pos=None):
@ -128,7 +127,7 @@ class MultiHexGridScalarFields(MultiHexGrid):
while(self.is_nest(pos) or self.is_food(pos)):
pos = select_random_place()
self.fields['food'][pos] = int(size)
self.fields['food'][pos] = size
def is_nest(self, pos : Coordinate) -> bool:
assert('nests' in self.fields.keys())
import matplotlib.pyplot as plt
import numpy as np
from hexplot import plot_hexagon
def plot_alive_ants_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["alive_ants"]
plt.figure(figsize=(10,10), dpi=600)
plt.xlabel("time step")
plt.ylabel("alive agents")
if title is None:
def plot_connectivity_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["connectivity"]
plt.figure(figsize=(10,10), dpi=600)
plt.xlabel("time step")
plt.ylabel("No. of food sources connected to the nest")
if title is None:
def dead_ants_vs_time(model, title=None):
y = np.cumsum(model.datacollector.get_model_vars_dataframe()["dying_ants"])
plt.figure(figsize=(10,10), dpi=600)
plt.xlabel("time step")
plt.ylabel("dead agents")
if title is None:
def cum_successful_ants_vs_time(model, title=None):
y = model.datacollector.get_model_vars_dataframe()["successful_walkers"]
plt.figure(figsize=(10,10), dpi=600)
plt.xlabel("time step")
plt.ylabel("cummulative successful agents")
if title is None:
def plot_heatmap(model, low=10, high=200):
for time in np.arange(0, model.max_steps + 1, 1000):
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_a"][time]
a = pheromone_concentration
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_b"][time]
b = pheromone_concentration
c = np.max([a,b], axis=0)
c = a + b
c = np.clip(c, 1, 1000000000)
c = np.log(c)
c = c/np.max(c)
food_locations = np.nonzero(model.grid.fields['food'])
x_food = [ food[0] for food in food_locations ]
y_food = [ food[1] for food in food_locations ]
plot_hexagon(c, title=f"cummulative pheromone density at timestep {time}")
# Set the model parameters
if params is None:
params = {
"max_steps": 3000,
"width": 50, "height": 50,
"N_m" : 100,
"num_max_agents" : 100,
"nest_position" : (25,25),
"N_0" : 5,
"resistance_map_type": "perlin",
"num_initial_roamers" : 5,
"Color": col,
def portray_resistance_map(model, pos, norm=1):
col = get_color(level=model.grid.fields['res'][pos], normalization=norm)
col = f"rgb({col}, {col}, {col})"
return {
"Shape": "hex",
"r": 1,
"Filled": "true",
"Layer": 0,
"x": pos[0],
"y": pos[1],
"Color": col,
def get_max_grid_val(model, key):
return np.max(model.grid.fields[key])
def portray_pheromone_density(model, pos, norm):
col_a = get_color(level=model.grid.fields["A"][pos], normalization=norm)
col_b = get_color(level=model.grid.fields["B"][pos], normalization=norm)
res_min, res_max = np.min(model.grid.fields['res']), np.max(model.grid.fields['res'])
ease = 1 - model.grid.fields['res'][pos]
col_ease = get_color(level=ease, normalization=np.max(model.grid.fields['res']))
return {
"Shape": "hex",
"r": 1,
@ -116,7 +98,7 @@ def setup(params=None):
"Layer": 0,
"x": pos[0],
"y": pos[1],
"Color": f"rgb({col_a}, {col_b}, {col_ease})"
"Color": f"rgb({col_a}, {col_b}, 255)"
grid_ants = CanvasHexGridMultiAgents(portray_ant_density,
width, height, width*pixel_ratio, height*pixel_ratio,
norm_method=lambda m: 5)
grid_resistance_map = CanvasHexGridMultiAgents(portray_resistance_map,
width, height, width*pixel_ratio, height*pixel_ratio,
norm_method=lambda m: 1)
def norm_ants(model):
return 5
@ -148,19 +127,14 @@ def setup(params=None):
[lambda m: "<h3>Ant density</h3><h5>Nest: Red, Food: Green</h5>",
lambda m: f"<h5>Normalization Value: {norm_ants(m)}</h5>",
lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta, Resistance Map: Yellow</h5>",
lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta</h5>",
lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>",
lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>"
"Active Random Walker Ants", params)
if __name__ == "__main__":
from model import kwargs_paper_setup1
kwargs_paper1_perlin = kwargs_paper_setup1
kwargs_paper1_perlin["height"] = 50
kwargs_paper1_perlin["width"] = 50
kwargs_paper1_perlin["resistance_map_type"] = "perlin"
server = setup(params=kwargs_paper1_perlin)
server = setup()
