Compare commits
1 Commits
master
...
develop_gk
Author | SHA1 | Date | |
---|---|---|---|
|
61e92ae136 |
216
agent.py
216
agent.py
@ -5,24 +5,23 @@ This model implements the actual agents on the grid (a.k.a. the ants)
|
|||||||
|
|
||||||
License: AGPL 3 (see end of file)
|
License: AGPL 3 (see end of file)
|
||||||
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
|
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
|
||||||
"""
|
|
||||||
|
|
||||||
"""
|
|
||||||
TO DISCUSS:
|
|
||||||
Is the separation of energy and sensitivity useful? -> only if we have the disconnect via resistance
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import numpy.typing as npt
|
import numpy.typing as npt
|
||||||
from mesa.agent import Agent
|
from mesa.agent import Agent
|
||||||
from mesa.space import Coordinate
|
from mesa.space import Coordinate
|
||||||
|
|
||||||
|
|
||||||
class RandomWalkerAnt(Agent):
|
class RandomWalkerAnt(Agent):
|
||||||
def __init__(self, unique_id, model,
|
def __init__(self, unique_id, model, look_for_pheromone=None,
|
||||||
look_for_pheromone=None,
|
energy_0=1,
|
||||||
drop_pheromone=None,
|
pheromone_drop_rate_0 : dict[str, float]={"A": 80, "B": 80},
|
||||||
sensitivity_max = 10000,
|
sensitivity_0=0.99,
|
||||||
|
alpha=0.6, drop_pheromone=None,
|
||||||
|
betas : dict[str, float]={"A": 0.0512, "B": 0.0512},
|
||||||
|
sensitivity_decay_rate=0.01,
|
||||||
|
sensitivity_max = 300,
|
||||||
|
sensitivity_min = 0.001,
|
||||||
|
sensitivity_steepness = 1
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
super().__init__(unique_id=unique_id, model=model)
|
super().__init__(unique_id=unique_id, model=model)
|
||||||
@ -30,12 +29,21 @@ class RandomWalkerAnt(Agent):
|
|||||||
self._next_pos : None | Coordinate = None
|
self._next_pos : None | Coordinate = None
|
||||||
self._prev_pos : None | Coordinate = None
|
self._prev_pos : None | Coordinate = None
|
||||||
|
|
||||||
self.look_for_pheromone : str|None = look_for_pheromone
|
self.look_for_pheromone = look_for_pheromone
|
||||||
self.drop_pheromone : str|None = drop_pheromone
|
self.drop_pheromone = drop_pheromone
|
||||||
self.energy : float = self.model.e_0
|
self.energy = energy_0 #TODO: use
|
||||||
self.sensitivity : float = self.model.s_0
|
self.sensitivity_0 = sensitivity_0
|
||||||
self.pheromone_drop_rate : float = self.model.q_0
|
self.sensitivity = self.sensitivity_0
|
||||||
|
self.pheromone_drop_rate = pheromone_drop_rate_0
|
||||||
|
self.alpha = alpha
|
||||||
self.sensitivity_max = sensitivity_max
|
self.sensitivity_max = sensitivity_max
|
||||||
|
self.sensitivity_min = sensitivity_min
|
||||||
|
self.sensitivity_decay_rate = sensitivity_decay_rate
|
||||||
|
self.sensitivity_steepness = sensitivity_steepness
|
||||||
|
self.betas = betas
|
||||||
|
self.threshold : dict[str, float] = {"A": 0, "B": 0}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def sens_adj(self, props, key) -> npt.NDArray[np.float_] | float:
|
def sens_adj(self, props, key) -> npt.NDArray[np.float_] | float:
|
||||||
"""
|
"""
|
||||||
@ -57,70 +65,59 @@ class RandomWalkerAnt(Agent):
|
|||||||
|
|
|
|
||||||
0|________
|
0|________
|
||||||
-----------------------> prop
|
-----------------------> prop
|
||||||
|
For the nonlinear sensitivity, the idea is to use a logistic function that has
|
||||||
|
a characteristic sigmoidal shape that starts from a low value, increases rapidly,
|
||||||
|
and then gradually approaches a saturation level.
|
||||||
|
|
||||||
|
f(x) = L / (1 + exp(-k*(x - x0)))
|
||||||
|
|
||||||
|
f(x) = return value
|
||||||
|
L = sens_max
|
||||||
|
k is a parameter that controls the steepness of the curve. We can start with 1
|
||||||
|
A higher value of k leads to a steeper curve.
|
||||||
|
x0 is the midpoint of the curve, where the sensitivity starts to increase significantly.
|
||||||
|
We can make X0 the threshold value
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
# if props iterable create array, otherwise return single value
|
# if props iterable create array, otherwise return single value
|
||||||
try:
|
try:
|
||||||
iter(props)
|
iter(props)
|
||||||
except TypeError:
|
except TypeError:
|
||||||
#TODO: proper nonlinear response, not just clamping
|
#TODO: proper nonlinear response, not just clamping
|
||||||
|
|
||||||
|
non_linear_sens = True
|
||||||
|
if non_linear_sens:
|
||||||
|
L = self.sensitivity_max
|
||||||
|
k = self.sensitivity_steepness
|
||||||
|
mid = self.threshold[key]
|
||||||
if props > self.sensitivity_max:
|
if props > self.sensitivity_max:
|
||||||
return self.sensitivity_max
|
return self.sensitivity_max
|
||||||
if props > self.model.q_tr:
|
|
||||||
|
#Should we still keep these conditions?
|
||||||
|
# if props > self.threshold[key]:
|
||||||
|
# return props
|
||||||
|
else:
|
||||||
|
adjusted_sensitivity = L / (1 + np.exp(-k * (props - mid)))
|
||||||
|
print(f'props: {props}, adjusted_value: {adjusted_sensitivity}')
|
||||||
|
return adjusted_sensitivity
|
||||||
|
else:
|
||||||
|
if props > self.sensitivity_max:
|
||||||
|
return self.sensitivity_max #Should we still keep these conditions
|
||||||
|
if props > self.threshold[key]:
|
||||||
return props
|
return props
|
||||||
else:
|
else:
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
|
|
||||||
arr : list[float] = []
|
arr : list[float] = []
|
||||||
for prop in props:
|
for prop in props:
|
||||||
arr.append(self.sens_adj(prop, key))
|
arr.append(self.sens_adj(prop, key))
|
||||||
return np.array(arr)
|
return np.array(arr)
|
||||||
|
|
||||||
def _get_resistance_weights(self, positions=None):
|
|
||||||
if positions is None:
|
|
||||||
positions = self.neighbors()
|
|
||||||
# bit round-about but self.model.grid.fields['res'][positions]
|
|
||||||
# gets interpreted as slices, not multiple singular positions
|
|
||||||
resistance = np.array([ self.model.grid.fields['res'][x,y] for x,y in positions ])
|
|
||||||
easiness = np.max(self.model.grid.fields['res']) - resistance + np.min(self.model.grid.fields['res']) + 1e-15 # + epsilon to not divide by zero
|
|
||||||
weights = easiness/ np.sum(easiness)
|
|
||||||
#inv_weights = resistance/ np.sum(resistance)
|
|
||||||
#weights = 1 - inv_weights
|
|
||||||
#weights /= np.sum(weights)
|
|
||||||
return weights
|
|
||||||
|
|
||||||
def _choose_next_pos(self):
|
def _choose_next_pos(self):
|
||||||
def _combine_weights(res_weights, walk_weights):
|
|
||||||
"""
|
|
||||||
If we have a resistance -> Infinity we want to have a likelihood -> 0 for this direction
|
|
||||||
Therefore we should multiply our two probabilities.
|
|
||||||
For the case of no resistance field this will return the normal walk_weights
|
|
||||||
res_weights : resistance weights: based on resistance field of neighbours
|
|
||||||
see _get_resistance_weights for more info
|
|
||||||
walk weights: In case of biased random walk (no positive pheromone gradient):
|
|
||||||
forward: alpha,
|
|
||||||
everywhere else: (1- alpaha)/5)
|
|
||||||
In case of positive pheromone gradient present in front:
|
|
||||||
max. positive gradient: self.sensitivity
|
|
||||||
everyhwere else: (1-self.sensitivity)/5
|
|
||||||
"""
|
|
||||||
combined = res_weights * walk_weights
|
|
||||||
normalized = combined / np.sum(combined)
|
|
||||||
return normalized
|
|
||||||
|
|
||||||
def _pick_from_remaining_five(remaining_five):
|
|
||||||
"""
|
|
||||||
"""
|
|
||||||
weights = self._get_resistance_weights(remaining_five)
|
|
||||||
random_index = np.random.choice(range(len(remaining_five)), p=weights)
|
|
||||||
self._next_pos = remaining_five[random_index]
|
|
||||||
self._prev_pos = self.pos
|
|
||||||
|
|
||||||
if self._prev_pos is None:
|
if self._prev_pos is None:
|
||||||
res_weights = self._get_resistance_weights()
|
i = np.random.choice(range(6))
|
||||||
walk_weights = np.ones(6)
|
|
||||||
weights = _combine_weights(res_weights, walk_weights)
|
|
||||||
|
|
||||||
i = np.random.choice(range(6),p=weights)
|
|
||||||
assert(self.pos is not self.neighbors()[i])
|
assert(self.pos is not self.neighbors()[i])
|
||||||
self._next_pos = self.neighbors()[i]
|
self._next_pos = self.neighbors()[i]
|
||||||
self._prev_pos = self.pos
|
self._prev_pos = self.pos
|
||||||
@ -129,115 +126,80 @@ class RandomWalkerAnt(Agent):
|
|||||||
if self.searching_food:
|
if self.searching_food:
|
||||||
for neighbor in self.front_neighbors:
|
for neighbor in self.front_neighbors:
|
||||||
if self.model.grid.is_food(neighbor):
|
if self.model.grid.is_food(neighbor):
|
||||||
self.model.grid.fields['food'][neighbor] -= 1 # eat
|
|
||||||
#resets
|
|
||||||
self.pheromone_drop_rate = self.model.q_0
|
|
||||||
self.sensitivity = self.model.s_0
|
|
||||||
self.energy = self.model.e_0
|
|
||||||
|
|
||||||
#now look for other pheromone
|
|
||||||
self.look_for_pheromone = "A"
|
|
||||||
self.drop_pheromone = "B"
|
self.drop_pheromone = "B"
|
||||||
|
self.look_for_pheromone = "A"
|
||||||
|
self.sensitivity = self.sensitivity_0
|
||||||
|
|
||||||
self._prev_pos = neighbor
|
self._prev_pos = neighbor
|
||||||
self._next_pos = self.pos
|
self._next_pos = self.pos
|
||||||
return
|
|
||||||
|
|
||||||
elif self.searching_nest:
|
elif self.searching_nest:
|
||||||
for neighbor in self.front_neighbors:
|
for neighbor in self.front_neighbors:
|
||||||
if self.model.grid.is_nest(neighbor):
|
if self.model.grid.is_nest(neighbor):
|
||||||
#resets
|
self.look_for_pheromone = "A" # Is this a correct interpretation?
|
||||||
self.pheromone_drop_rate = self.model.q_0
|
|
||||||
self.sensitivity = self.model.s_0
|
|
||||||
self.energy = self.model.e_0
|
|
||||||
|
|
||||||
self.look_for_pheromone = "B"
|
|
||||||
self.drop_pheromone = "A"
|
self.drop_pheromone = "A"
|
||||||
|
self.sensitivity = self.sensitivity_0
|
||||||
|
|
||||||
self._prev_pos = neighbor
|
self._prev_pos = neighbor
|
||||||
self._next_pos = self.pos
|
self._next_pos = self.pos
|
||||||
self.model.successful_ants += 1
|
|
||||||
|
|
||||||
|
|
||||||
# recruit new ants
|
# recruit new ants
|
||||||
print("RECRUITING")
|
for agent_id in self.model.get_unique_ids(self.model.num_new_recruits):
|
||||||
for agent_id in self.model.get_unique_ids(self.model.N_r):
|
if self.model.schedule.get_agent_count() < self.model.num_max_agents:
|
||||||
if self.model.schedule.get_agent_count() < self.model.N_m:
|
|
||||||
agent = RandomWalkerAnt(unique_id=agent_id, model=self.model, look_for_pheromone="B", drop_pheromone="A")
|
agent = RandomWalkerAnt(unique_id=agent_id, model=self.model, look_for_pheromone="B", drop_pheromone="A")
|
||||||
agent._next_pos = self.pos
|
agent._next_pos = self.pos
|
||||||
self.model.schedule.add(agent)
|
self.model.schedule.add(agent)
|
||||||
self.model.grid.place_agent(agent, pos=neighbor)
|
self.model.grid.place_agent(agent, pos=neighbor)
|
||||||
return
|
|
||||||
|
|
||||||
# follow positive gradient with likelihood self.sensitivity
|
# follow positive gradient
|
||||||
if self.look_for_pheromone is not None:
|
if self.look_for_pheromone is not None:
|
||||||
# Calculate gradient
|
|
||||||
front_concentration = [self.model.grid.fields[self.look_for_pheromone][cell] for cell in self.front_neighbors ]
|
front_concentration = [self.model.grid.fields[self.look_for_pheromone][cell] for cell in self.front_neighbors ]
|
||||||
front_concentration = self.sens_adj(front_concentration, self.look_for_pheromone)
|
front_concentration = self.sens_adj(front_concentration, self.look_for_pheromone)
|
||||||
current_pos_concentration = self.sens_adj(self.model.grid.fields[self.look_for_pheromone][self.pos], self.look_for_pheromone)
|
current_pos_concentration = self.sens_adj(self.model.grid.fields[self.look_for_pheromone][self.pos], self.look_for_pheromone)
|
||||||
gradient = front_concentration - np.repeat(current_pos_concentration, 3).astype(np.float_)
|
gradient = front_concentration - np.repeat(current_pos_concentration, 3).astype(np.float_)
|
||||||
|
# TODO: if two or more neighbors have same concentration randomize? Should be unlikely with floats though
|
||||||
index = np.argmax(gradient)
|
index = np.argmax(gradient)
|
||||||
if gradient[index] > 0:
|
if gradient[index] > 0:
|
||||||
# follow positive gradient with likelihood self.sensitivity * resistance_weight (re-normalized)
|
self._next_pos = self.front_neighbors[index]
|
||||||
|
|
||||||
all_neighbors_cells = self.neighbors()
|
|
||||||
highest_gradient_cell = self.front_neighbors[index]
|
|
||||||
highest_gradient_index_arr = np.where(all_neighbors_cells == highest_gradient_cell)
|
|
||||||
assert(len(highest_gradient_index_arr) == 1)
|
|
||||||
|
|
||||||
all_neighbors_index = highest_gradient_index_arr[0]
|
|
||||||
sens_weights = np.ones(6) * (1-self.sensitivity)/5
|
|
||||||
sens_weights[all_neighbors_index] = self.sensitivity
|
|
||||||
|
|
||||||
res_weights = self._get_resistance_weights()
|
|
||||||
weights = _combine_weights(res_weights, sens_weights)
|
|
||||||
|
|
||||||
random_index = np.random.choice(range(6), p=weights)
|
|
||||||
self._next_pos = all_neighbors_cells[random_index]
|
|
||||||
|
|
||||||
self._prev_pos = self.pos
|
self._prev_pos = self.pos
|
||||||
return
|
return
|
||||||
|
|
||||||
# do biased random walk
|
# do biased random walk
|
||||||
all_neighbors_cells = self.neighbors()
|
p = np.random.uniform()
|
||||||
front_index_arr = np.where(all_neighbors_cells == self.front_neighbor)
|
if p < self.alpha:
|
||||||
assert(len(front_index_arr) == 1 )
|
self._next_pos = self.front_neighbor
|
||||||
front_index = front_index_arr[0]
|
self._prev_pos = self.pos
|
||||||
|
else:
|
||||||
res_weights = self._get_resistance_weights()
|
# need copy() as we would otherwise remove the tuple from all possible lists (aka python "magic")
|
||||||
walk_weights = np.ones(6) * (1-self.model.alpha) / 5
|
other_neighbors = self.neighbors().copy()
|
||||||
walk_weights[front_index] = self.model.alpha
|
other_neighbors.remove(self.front_neighbor)
|
||||||
|
random_index = np.random.choice(range(len(other_neighbors)))
|
||||||
weights = _combine_weights(res_weights, walk_weights)
|
self._next_pos = other_neighbors[random_index]
|
||||||
|
|
||||||
random_index = np.random.choice(range(6), p=weights)
|
|
||||||
self._next_pos = all_neighbors_cells[random_index]
|
|
||||||
self._prev_pos = self.pos
|
self._prev_pos = self.pos
|
||||||
|
|
||||||
def step(self):
|
|
||||||
# Die and get removed if no energy
|
|
||||||
if self.energy < self.model.e_min:
|
|
||||||
self.model.schedule.remove(self)
|
|
||||||
#update list of dead agents for time step
|
|
||||||
self.model.dying_agents += 1
|
|
||||||
|
|
||||||
else:
|
def step(self):
|
||||||
|
self.sensitivity -= self.sensitivity_decay_rate
|
||||||
self._choose_next_pos()
|
self._choose_next_pos()
|
||||||
self._adjust_pheromone_drop_rate()
|
self._adjust_pheromone_drop_rate()
|
||||||
|
|
||||||
self.sensitivity -= self.model.d_s
|
#kill agent if sensitivity is low
|
||||||
self.energy -= self.model.grid.fields['res'][self.pos] * self.model.d_e
|
if self.sensitivity < self.sensitivity_min:
|
||||||
|
self._kill_agent()
|
||||||
|
|
||||||
def _adjust_pheromone_drop_rate(self):
|
def _adjust_pheromone_drop_rate(self):
|
||||||
if(self.drop_pheromone is not None):
|
if(self.drop_pheromone is not None):
|
||||||
self.pheromone_drop_rate -= self.pheromone_drop_rate * self.model.beta
|
self.pheromone_drop_rate[self.drop_pheromone] -= self.pheromone_drop_rate[self.drop_pheromone] * self.betas[self.drop_pheromone]
|
||||||
|
|
||||||
def drop_pheromones(self) -> None:
|
def drop_pheromones(self) -> None:
|
||||||
# should only be called in advance() as we do not use hidden fields
|
# should only be called in advance() as we do not use hidden fields
|
||||||
if self.drop_pheromone is not None:
|
if self.drop_pheromone is not None:
|
||||||
self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate
|
self.model.grid.fields[self.drop_pheromone][self.pos] += self.pheromone_drop_rate[self.drop_pheromone]
|
||||||
|
|
||||||
|
def _kill_agent(self):
|
||||||
|
#update dead_agent list
|
||||||
|
self.model.dead_agents.append(self)
|
||||||
|
|
||||||
|
|
||||||
def advance(self) -> None:
|
def advance(self) -> None:
|
||||||
self.drop_pheromones()
|
self.drop_pheromones()
|
||||||
|
37
hexplot.py
37
hexplot.py
@ -1,37 +0,0 @@
|
|||||||
#!/bin/python
|
|
||||||
import numpy as np
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
|
|
||||||
def plot_hexagon(A, title=None, block=True):
|
|
||||||
X, Y = np.meshgrid(range(A.shape[0]), range(A.shape[-1]))
|
|
||||||
X, Y = X*2, Y*2
|
|
||||||
|
|
||||||
# Turn this into a hexagonal grid
|
|
||||||
for i, k in enumerate(X):
|
|
||||||
if i % 2 == 1:
|
|
||||||
X[i] += 1
|
|
||||||
Y[:,i] += 1
|
|
||||||
|
|
||||||
fig, ax = plt.subplots()
|
|
||||||
fig.set_figwidth(10)
|
|
||||||
fig.set_figheight(10)
|
|
||||||
fig.set_dpi(600)
|
|
||||||
im = ax.hexbin(
|
|
||||||
X.reshape(-1),
|
|
||||||
Y.reshape(-1),
|
|
||||||
C=A.reshape(-1),
|
|
||||||
gridsize=int(A.shape[0]/2)
|
|
||||||
)
|
|
||||||
|
|
||||||
# the rest of the code is adjustable for best output
|
|
||||||
ax.set_aspect(1)
|
|
||||||
ax.set(xlim=(-4, X.max()+4,), ylim=(-4, Y.max()+4))
|
|
||||||
ax.axis(False)
|
|
||||||
ax.set_xmargin(0)
|
|
||||||
ax.set_ymargin(0)
|
|
||||||
#plt.colorbar(im, shrink=0.7)
|
|
||||||
|
|
||||||
if(title is not None):
|
|
||||||
pass
|
|
||||||
#plt.title(title)
|
|
||||||
plt.savefig(f"{title}.png")
|
|
286
main.py
286
main.py
@ -7,12 +7,19 @@ License: AGPL 3 (see end of file)
|
|||||||
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
|
(C) Alexander Bocken, Viviane Fahrni, Grace Kagho
|
||||||
"""
|
"""
|
||||||
from model import ActiveWalkerModel
|
from model import ActiveWalkerModel
|
||||||
|
from agent import RandomWalkerAnt
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import matplotlib.pyplot as plt
|
import matplotlib.pyplot as plt
|
||||||
from mesa.space import Coordinate
|
from mesa.space import Coordinate
|
||||||
from mesa.datacollection import DataCollector
|
from mesa.datacollection import DataCollector
|
||||||
|
|
||||||
#from multihex import MultiHexGrid
|
from multihex import MultiHexGrid
|
||||||
|
|
||||||
|
def main():
|
||||||
|
check_pheromone_exponential_decay()
|
||||||
|
check_ant_sensitivity_linear_decay()
|
||||||
|
check_ant_pheromone_exponential_decay()
|
||||||
|
check_ants_follow_gradient()
|
||||||
|
|
||||||
def check_pheromone_exponential_decay():
|
def check_pheromone_exponential_decay():
|
||||||
"""
|
"""
|
||||||
@ -100,7 +107,6 @@ def check_ant_pheromone_exponential_decay():
|
|||||||
num_initial_roamers = 1
|
num_initial_roamers = 1
|
||||||
num_max_agents = 100
|
num_max_agents = 100
|
||||||
nest_position : Coordinate = (width //2, height //2)
|
nest_position : Coordinate = (width //2, height //2)
|
||||||
num_food_sources = 0;
|
|
||||||
max_steps = 1000
|
max_steps = 1000
|
||||||
|
|
||||||
model = ActiveWalkerModel(width=width, height=height,
|
model = ActiveWalkerModel(width=width, height=height,
|
||||||
@ -172,284 +178,12 @@ def check_ants_follow_gradient():
|
|||||||
print(20*"#")
|
print(20*"#")
|
||||||
model.step()
|
model.step()
|
||||||
|
|
||||||
def viviane_bfs_example_run():
|
|
||||||
# Breadth-first-search algorithm for connectivity
|
|
||||||
def bfs(graph, start_node, threshold): #graph=grid, start_node=nest, threshold=TBD?
|
|
||||||
from collections import deque
|
|
||||||
visited = set()
|
|
||||||
queue = deque([(start_node, [])])
|
|
||||||
paths = {}
|
|
||||||
connected_food_sources = set()
|
|
||||||
|
|
||||||
while queue:
|
if __name__ == "__main__":
|
||||||
current_node, path = queue.popleft()
|
main()
|
||||||
#current_node = tuple(current_node)
|
|
||||||
visited.add(current_node)
|
|
||||||
|
|
||||||
if current_node in graph:
|
|
||||||
for neighbor, m.grid.fields["A"] in graph[current_node].items():
|
|
||||||
if neighbor not in visited and m.grid.fields["A"] >= threshold:
|
|
||||||
new_path = path + [neighbor]
|
|
||||||
queue.append((neighbor, new_path))
|
|
||||||
|
|
||||||
# Check if the neighbor is a food source
|
|
||||||
if neighbor in self.grid_food:
|
|
||||||
if neighbor not in paths:
|
|
||||||
paths[neighbor] = new_path
|
|
||||||
connected_food_sources.add(neighbor)
|
|
||||||
|
|
||||||
connectivity = len(connected_food_sources)
|
|
||||||
|
|
||||||
return connectivity
|
|
||||||
|
|
||||||
|
|
||||||
# Calculate connectivity through BFS
|
|
||||||
|
|
||||||
current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
N = 121
|
|
||||||
N_X = int(np.sqrt(N))
|
|
||||||
N_Y = N // N_X
|
|
||||||
# fancy way of saying absolutely nothing but 11
|
|
||||||
|
|
||||||
xv, yv = np.meshgrid(np.arange(N_X), np.arange(N_Y), sparse=False, indexing='xy')
|
|
||||||
|
|
||||||
|
|
||||||
print(f"{N_X=}")
|
|
||||||
|
|
||||||
print(f"{N_Y=}")
|
|
||||||
|
|
||||||
print(f"{(xv, yv)=}")
|
|
||||||
|
|
||||||
print(f"{xv=}")
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def fixed_distance_tests():
|
|
||||||
"""
|
|
||||||
position a target food source a known distance away from nest
|
|
||||||
check for no. successful ants for n runs
|
|
||||||
"""
|
|
||||||
|
|
||||||
from tqdm import tqdm
|
|
||||||
runs = 10
|
|
||||||
from model import kwargs_paper_setup1 as kwargs
|
|
||||||
kwargs["N_f"] = 0
|
|
||||||
kwargs["gamma"] /= 2 # field decays three times slower
|
|
||||||
kwargs["beta"] /= 2 # drop rates decays three times slower
|
|
||||||
kwargs["d_s"] /= 2 # drop rates decays three times slower
|
|
||||||
kwargs["d_e"] /= 2 # drop rates decays three times slower
|
|
||||||
successful_walkers = {}
|
|
||||||
for distance in tqdm(range(5,30), position=0, desc="dis"):
|
|
||||||
successful_walkers[distance] = []
|
|
||||||
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
|
|
||||||
model = ActiveWalkerModel(**kwargs)
|
|
||||||
nest_location = kwargs["nest_position"]
|
|
||||||
food_location = (nest_location[0] - distance, nest_location[1])
|
|
||||||
model.grid.add_food(size=100, pos=food_location)
|
|
||||||
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
|
|
||||||
model.step()
|
|
||||||
successful_walkers[distance].append(model.datacollector.get_model_vars_dataframe().reset_index()["successful_walkers"][kwargs["max_steps"]])
|
|
||||||
return successful_walkers
|
|
||||||
|
|
||||||
def fixed_distance_object_between():
|
|
||||||
"""
|
|
||||||
diameter of object: floor(50% of distance)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from tqdm import tqdm
|
|
||||||
runs = 10
|
|
||||||
from model import kwargs_paper_setup1 as kwargs
|
|
||||||
kwargs["N_f"] = 0
|
|
||||||
kwargs["gamma"] /= 2 # field decays slower
|
|
||||||
kwargs["beta"] /= 2 # drop rates decays slower
|
|
||||||
kwargs["d_e"] /= 2 # live longer, search longer
|
|
||||||
kwargs["d_s"] /= 2 # live longer, search longer
|
|
||||||
successful_walkers = {}
|
|
||||||
for distance in tqdm(range(5,30), position=0, desc="dis"):
|
|
||||||
successful_walkers[distance] = []
|
|
||||||
for _ in tqdm(range(runs), position=1, desc="run", leave=False):
|
|
||||||
model = ActiveWalkerModel(**kwargs)
|
|
||||||
nest_location = kwargs["nest_position"]
|
|
||||||
food_location = (nest_location[0] - distance, nest_location[1])
|
|
||||||
object_location = (nest_location[0] - distance//2, nest_location[1])
|
|
||||||
place_blocking_object(object_location, radius=distance//4, model=model)
|
|
||||||
model.grid.add_food(size=100, pos=food_location)
|
|
||||||
for _ in tqdm(range(model.max_steps), position=2, desc="step", leave=False):
|
|
||||||
model.step()
|
|
||||||
successful_walkers[distance].append(model.datacollector.get_model_vars_dataframe().reset_index()["successful_walkers"][kwargs["max_steps"]])
|
|
||||||
return successful_walkers
|
|
||||||
|
|
||||||
def place_blocking_object(center, radius, model):
|
|
||||||
positions = [center]
|
|
||||||
next_outside = [center]
|
|
||||||
# We grow from the center and add all neighbours of the outer edge of our blocking object
|
|
||||||
# Add all neighbours of next_outside that aren't in positions to the object
|
|
||||||
# by doing this radius times we should get an object of diameter 2 * radius + 1
|
|
||||||
# positions: accumulator for all positions inside the object of radius radius
|
|
||||||
# next_outside: keep track what we added in the last go-around. These will be used in the next step.
|
|
||||||
for _ in range(radius):
|
|
||||||
outside = next_outside
|
|
||||||
next_oustide = []
|
|
||||||
|
|
||||||
#otherwise interprets the tuple as something stupid
|
|
||||||
for i in range(len(outside)):
|
|
||||||
cell = outside[i]
|
|
||||||
neighbours = model.grid.get_neighborhood(cell)
|
|
||||||
for n in neighbours:
|
|
||||||
if n not in positions:
|
|
||||||
positions.append(n)
|
|
||||||
next_outside.append(n)
|
|
||||||
|
|
||||||
# some large number in comparison to the rest of the resistance field
|
|
||||||
# such that the probability of stepping on these grid spots tend towards zero
|
|
||||||
infinity = 1e20
|
|
||||||
for pos in positions:
|
|
||||||
model.grid.fields['res'][pos] = infinity
|
|
||||||
|
|
||||||
|
|
||||||
def run_model():
|
|
||||||
from tqdm import tqdm
|
|
||||||
# nests rather far away but also partially clumped.
|
|
||||||
np.random.seed(6)
|
|
||||||
|
|
||||||
from model import kwargs_paper_setup1 as kwargs
|
|
||||||
kwargs["gamma"] /= 2
|
|
||||||
kwargs["beta"] /= 2
|
|
||||||
kwargs["d_e"] /= 5 # live longer, search longer
|
|
||||||
kwargs["d_s"] /= 5 # live longer, search longer
|
|
||||||
kwargs["N_0"] *= 2 # more initial roamers/scouts
|
|
||||||
kwargs["max_steps"] *= 2 # more initial roamers/scouts
|
|
||||||
|
|
||||||
model = ActiveWalkerModel(**kwargs)
|
|
||||||
a = np.zeros_like(model.grid.fields['food'])
|
|
||||||
a[np.nonzero(model.grid.fields['food'])] = 1
|
|
||||||
a[np.nonzero(model.grid.fields['nests'])] = -1
|
|
||||||
for _ in tqdm(range(model.max_steps)):
|
|
||||||
model.step()
|
|
||||||
return model
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
from model import kwargs_paper_setup1 as kwargs
|
|
||||||
kwargs["gamma"] /= 2
|
|
||||||
kwargs["beta"] /= 2
|
|
||||||
kwargs["d_e"] /= 5 # live longer, search longer
|
|
||||||
kwargs["d_s"] /= 5 # live longer, search longer
|
|
||||||
kwargs["N_0"] *= 2 # more initial roamers/scouts
|
|
||||||
kwargs["max_steps"] *= 2 # more initial roamers/scouts
|
|
||||||
|
|
||||||
def run_model_objects(step, seed=None, title=None):
|
|
||||||
from tqdm import tqdm
|
|
||||||
# nests rather far away but also partially clumped.
|
|
||||||
np.random.seed(6)
|
|
||||||
from hexplot import plot_hexagon
|
|
||||||
model = ActiveWalkerModel(**kwargs)
|
|
||||||
a = np.zeros_like(model.grid.fields['food'])
|
|
||||||
a[np.nonzero(model.grid.fields['food'])] = 1
|
|
||||||
a[np.nonzero(model.grid.fields['nests'])] = -1
|
|
||||||
for current_step in tqdm(range(model.max_steps)):
|
|
||||||
if current_step == step:
|
|
||||||
if seed is not None:
|
|
||||||
np.random.seed(seed)
|
|
||||||
for _ in range(10):
|
|
||||||
coord = np.random.randint(0, 100, size=2)
|
|
||||||
coord = (coord[0], coord[1])
|
|
||||||
place_blocking_object(center=coord,radius=5, model=model)
|
|
||||||
a = model.grid.fields["res"]
|
|
||||||
if title is not None:
|
|
||||||
plot_hexagon(a, title=title)
|
|
||||||
model.step()
|
|
||||||
return model
|
|
||||||
|
|
||||||
#if __name__ == "__main__":
|
|
||||||
#plot_heatmap()
|
|
||||||
#res = run_model_no_objects()
|
|
||||||
for i in range(10):
|
|
||||||
res = run_model_objects(step=6000, seed=i+100, title=f"objects/blockings_run_{i}")
|
|
||||||
from plot import plot_alive_ants_vs_time, dead_ants_vs_time, plot_connectivity_vs_time
|
|
||||||
plot_alive_ants_vs_time(res, title=f"objects/run_{i}")
|
|
||||||
dead_ants_vs_time(res, title=f"objects/dead_ants_run_{i}")
|
|
||||||
plot_connectivity_vs_time(res, title=f"objects/conn_run_{i}")
|
|
||||||
|
|
||||||
|
|
||||||
#print("DISTANCE TEST VS SUCCESSFUL ANTS OBJECT INBETWEEN")
|
|
||||||
#res = fixed_distance_tests()
|
|
||||||
#res = fixed_distance_object_between()
|
|
||||||
# print("Test")
|
|
||||||
#from model import kwargs_paper_setup1 as kwargs
|
|
||||||
#kwargs["resistance_map_type"] = "perlin"
|
|
||||||
# print(kwargs)
|
|
||||||
#model = ActiveWalkerModel(**kwargs)
|
|
||||||
#model.step()
|
|
||||||
|
|
||||||
# a = np.zeros_like(model.grid.fields['food'])
|
|
||||||
# a[np.nonzero(model.grid.fields['food'])] = 1
|
|
||||||
# plot_hexagon(a, title="Nest locations")
|
|
||||||
# plot_hexagon(model.grid.fields['res'], title="Resistance Map")
|
|
||||||
|
|
||||||
|
|
||||||
# from tqdm import tqdm as progress_bar
|
|
||||||
# for _ in progress_bar(range(model.max_steps)):
|
|
||||||
# model.step()
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# Access the DataCollector
|
|
||||||
#datacollector = model.datacollector
|
|
||||||
## Get the data from the DataCollector
|
|
||||||
#model_data = datacollector.get_model_vars_dataframe()
|
|
||||||
#print(model_data.columns)
|
|
||||||
|
|
||||||
## Plot the number of alive ants over time
|
|
||||||
#plt.plot(model_data.index, model_data['alive_ants'])
|
|
||||||
#plt.xlabel('Time')
|
|
||||||
#plt.ylabel('Number of Alive Ants') #this should probably be "active" ants, since it is not considering those in the nest
|
|
||||||
#plt.title('Number of Alive Ants Over Time')
|
|
||||||
#plt.grid(True)
|
|
||||||
#plt.show()
|
|
||||||
|
|
||||||
## Plot the number of sucessful walkers over time
|
|
||||||
#plt.plot(model_data.index, model_data['sucessful_walkers'])
|
|
||||||
#plt.xlabel('Time')
|
|
||||||
#plt.ylabel('Number of Sucessful Walkers')
|
|
||||||
#plt.title('Number of Sucessful Walkers Over Time')
|
|
||||||
#plt.grid(True)
|
|
||||||
#plt.show()
|
|
||||||
|
|
||||||
|
|
||||||
## Calculate the cumulative sum
|
|
||||||
#model_data['cumulative_sucessful_walkers'] = model_data['sucessful_walkers'].cumsum()
|
|
||||||
|
|
||||||
## Plot the cumulative sum of sucessful walkers over time
|
|
||||||
#plt.plot(model_data.index, model_data['cumulative_sucessful_walkers'])
|
|
||||||
#plt.xlabel('Time')
|
|
||||||
#plt.ylabel('Cumulative Sucessful Walkers')
|
|
||||||
#plt.title('Cumulative Sucessful Walkers Over Time')
|
|
||||||
#plt.grid(True)
|
|
||||||
#plt.show()
|
|
||||||
|
|
||||||
## Values over 100 are to be interpreted as walkers being sucessfull several times since the total max number of ants is 100
|
|
||||||
|
|
||||||
# # Connectivity measure
|
|
||||||
#def check_food_source_connectivity(food_sources, paths): #food_sources = nodes.is_nest, paths=result from BFS
|
|
||||||
# connected_food_sources = set()
|
|
||||||
|
|
||||||
# for source in food_sources:
|
|
||||||
# if source in paths:
|
|
||||||
# connected_food_sources.add(source)
|
|
||||||
|
|
||||||
# connectivity = len(connected_food_sources)
|
|
||||||
|
|
||||||
|
|
||||||
# return connectivity
|
|
||||||
|
|
||||||
|
|
||||||
# # Calculate connectivity through BFS
|
|
||||||
|
|
||||||
# current_paths = bfs(self.grid, self.grid.fields["nests"], 0.000001)
|
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
|
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
|
||||||
|
|
||||||
|
178
model.py
178
model.py
@ -16,176 +16,46 @@ from mesa.time import SimultaneousActivation
|
|||||||
from mesa.datacollection import DataCollector
|
from mesa.datacollection import DataCollector
|
||||||
from agent import RandomWalkerAnt
|
from agent import RandomWalkerAnt
|
||||||
|
|
||||||
kwargs_paper_setup1 = {
|
|
||||||
"width": 100,
|
|
||||||
"height": 100,
|
|
||||||
"N_0": 20,
|
|
||||||
"N_m": 100,
|
|
||||||
"N_r": 5,
|
|
||||||
"alpha": 0.6,
|
|
||||||
"gamma": 0.001,
|
|
||||||
"beta": 0.0512,
|
|
||||||
"d_s": 0.001,
|
|
||||||
"d_e": 0.001,
|
|
||||||
"s_0": 0.99,
|
|
||||||
"e_0": 0.99,
|
|
||||||
"q_0": 80,
|
|
||||||
"q_tr": 1,
|
|
||||||
"e_min": 0,
|
|
||||||
"nest_position": (49,49),
|
|
||||||
"N_f": 5,
|
|
||||||
"food_size" : 55,
|
|
||||||
"max_steps": 8000,
|
|
||||||
"resistance_map_type" : None,
|
|
||||||
}
|
|
||||||
|
|
||||||
kwargs_paper_setup2 = {
|
|
||||||
"width": 100,
|
|
||||||
"height": 100,
|
|
||||||
"N_0": 20,
|
|
||||||
"N_m": 100,
|
|
||||||
"N_r": 5,
|
|
||||||
"alpha": 0.6,
|
|
||||||
"gamma": 0.01,
|
|
||||||
"beta": 0.0512,
|
|
||||||
"d_s": 0.001,
|
|
||||||
"d_e": 0.001,
|
|
||||||
"s_0": 0.99,
|
|
||||||
"e_0": 0.99,
|
|
||||||
"q_0": 80,
|
|
||||||
"q_tr": 1,
|
|
||||||
"e_min": 0,
|
|
||||||
"nest_position": (49,49),
|
|
||||||
"N_f": 5,
|
|
||||||
"food_size" : 550,
|
|
||||||
"max_steps": 8000,
|
|
||||||
"resistance_map_type" : None,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ActiveWalkerModel(Model):
|
class ActiveWalkerModel(Model):
|
||||||
def __init__(self, width : int, height : int,
|
def __init__(self, width : int, height : int , num_max_agents : int,
|
||||||
N_0 : int, # number of initial roamers
|
num_initial_roamers : int,
|
||||||
N_m : int, # max number of ants
|
|
||||||
N_r : int, # number of new recruits
|
|
||||||
alpha : float, #biased random walk
|
|
||||||
beta : float, # decay rate drop rate
|
|
||||||
gamma : float, # decay rate pheromone concentration fields
|
|
||||||
d_s : float, # decay rate sensitvity
|
|
||||||
d_e : float, # decay rate energy
|
|
||||||
s_0 : float, # sensitvity reset
|
|
||||||
e_0 : float, # energy reset
|
|
||||||
q_0 : float, # initial pheromone level
|
|
||||||
q_tr : float, # threshold under which ant cannot distinguish concentrations
|
|
||||||
e_min : float, # energy at which walker dies
|
|
||||||
nest_position : Coordinate,
|
nest_position : Coordinate,
|
||||||
N_f=5, #num food sources
|
num_food_sources=5,
|
||||||
food_size= 55,
|
food_size=10,
|
||||||
max_steps:int=1000,
|
max_steps:int=1000,
|
||||||
resistance_map_type=None,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
|
fields=["A", "B", "nests", "food"]
|
||||||
self.N_m : int = N_m # max number of ants
|
|
||||||
self.N_r : int = N_r # number of new recruits
|
|
||||||
self.alpha : float = alpha # biased random walk if no gradient
|
|
||||||
self.gamma : float = gamma # decay rate pheromone concentration fields
|
|
||||||
self.beta : float = beta # decay rate drop rate
|
|
||||||
self.d_s : float = d_s # decay rate sensitvity
|
|
||||||
self.d_e : float = d_e # decay rate energy (get's multiplied with resistance)
|
|
||||||
self.s_0 : float = s_0 # sensitvity reset
|
|
||||||
self.e_0 : float = e_0 # energy reset
|
|
||||||
self.q_0 : float = q_0 # pheromone drop rate reset
|
|
||||||
self.q_tr : float = q_tr # threshold under which ant cannot distinguish concentrations
|
|
||||||
self.e_min : float = e_min # energy at which walker dies
|
|
||||||
self.N_f : int = N_f #num food sources
|
|
||||||
self.successful_ants = 0 # for viviane's graph
|
|
||||||
self.connectivity = 0 # for viviane's persistence
|
|
||||||
self.dying_agents = 0
|
|
||||||
|
|
||||||
fields=["A", "B", "nests", "food", "res"]
|
|
||||||
self.schedule = SimultaneousActivation(self)
|
self.schedule = SimultaneousActivation(self)
|
||||||
self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields)
|
self.grid = MultiHexGridScalarFields(width=width, height=height, torus=True, fields=fields)
|
||||||
|
|
||||||
if resistance_map_type is None:
|
|
||||||
self.grid.fields["res"] = np.ones((width, height)).astype(float)
|
|
||||||
elif resistance_map_type == "perlin":
|
|
||||||
# perlin generates anisotropic noise which may or may not be a good choice
|
|
||||||
# pip3 install git+https://github.com/pvigier/perlin-numpy
|
|
||||||
from perlin_numpy import (
|
|
||||||
generate_fractal_noise_2d,
|
|
||||||
generate_perlin_noise_2d,
|
|
||||||
)
|
|
||||||
noise = generate_perlin_noise_2d(shape=(width,height), res=((10,10)))
|
|
||||||
# normalized to mean=1, min=0, and max=2
|
|
||||||
normalized_noise = (noise - np.min(noise))/(np.max(noise) - np.min(noise)) * 2
|
|
||||||
self.grid.fields["res"] = normalized_noise
|
|
||||||
else:
|
|
||||||
# possible other noise types: simplex or value
|
|
||||||
raise NotImplemented(f"{resistance_map_type=} is not implemented.")
|
|
||||||
|
|
||||||
|
|
||||||
self._unique_id_counter = -1
|
self._unique_id_counter = -1
|
||||||
|
|
||||||
self.max_steps = max_steps
|
self.max_steps = max_steps
|
||||||
self.grid.add_nest(nest_position)
|
self.grid.add_nest(nest_position)
|
||||||
|
self.num_max_agents = num_max_agents
|
||||||
|
self.num_new_recruits = 5
|
||||||
|
|
||||||
|
self.decay_rates : dict[str, float] = {"A" :0.01,
|
||||||
|
"B": 0.01,
|
||||||
|
}
|
||||||
|
|
||||||
for agent_id in self.get_unique_ids(N_0):
|
self.dead_agents = []
|
||||||
if self.schedule.get_agent_count() < self.N_m:
|
|
||||||
|
for agent_id in self.get_unique_ids(num_initial_roamers):
|
||||||
|
if self.schedule.get_agent_count() < self.num_max_agents:
|
||||||
agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_pheromone="A", drop_pheromone="A")
|
agent = RandomWalkerAnt(unique_id=agent_id, model=self, look_for_pheromone="A", drop_pheromone="A")
|
||||||
self.schedule.add(agent)
|
self.schedule.add(agent)
|
||||||
self.grid.place_agent(agent, pos=nest_position)
|
self.grid.place_agent(agent, pos=nest_position)
|
||||||
|
|
||||||
for _ in range(N_f):
|
for _ in range(num_food_sources):
|
||||||
self.grid.add_food(food_size)
|
self.grid.add_food(food_size)
|
||||||
|
|
||||||
self.datacollector = DataCollector(
|
self.datacollector = DataCollector(
|
||||||
# model_reporters={"agent_dens": lambda m: m.agent_density()},
|
model_reporters={},
|
||||||
model_reporters = {"pheromone_a": lambda m: m.grid.fields["A"],
|
|
||||||
"pheromone_b": lambda m: m.grid.fields["B"],
|
|
||||||
"alive_ants": lambda m: m.schedule.get_agent_count(),
|
|
||||||
"dying_ants": lambda m: m.dying_agents,
|
|
||||||
"successful_walkers": lambda m: m.successful_ants,
|
|
||||||
"connectivity": lambda m: m.connectivity,
|
|
||||||
},
|
|
||||||
agent_reporters={}
|
agent_reporters={}
|
||||||
)
|
)
|
||||||
self.datacollector.collect(self) # keep at end of __init___
|
self.datacollector.collect(self) # keep at end of __init___
|
||||||
|
|
||||||
# Breadth-first-search algorithm for connectivity
|
|
||||||
def bfs(self):
|
|
||||||
threshold = 0.5 # half of min sens
|
|
||||||
connectivity = 0 #initial value of connectivity
|
|
||||||
connected_food_sources = [] #empty list of connected food sources
|
|
||||||
visited = [] #empty list of visited (by the algorithm) nodes
|
|
||||||
|
|
||||||
nest = np.argwhere(self.grid.fields["nests"] == 1) #get nest location
|
|
||||||
nest = (nest[0][0], nest[0][1])
|
|
||||||
start_node = nest #rename
|
|
||||||
|
|
||||||
neighbours_to_check = [start_node] #start node gets checked first
|
|
||||||
neighbours_to_check = neighbours_to_check + self.grid.get_neighborhood(start_node) #start node neighbours get added to the to check list
|
|
||||||
|
|
||||||
while neighbours_to_check: #as long as there is something on the to check list
|
|
||||||
current_node = neighbours_to_check[0] #the first list entry is taken
|
|
||||||
del neighbours_to_check[0] #and deleted on the to check list
|
|
||||||
|
|
||||||
if current_node not in visited: #if it has not previously been checked
|
|
||||||
if np.max([self.grid.fields["B"][current_node], self.grid.fields["A"][current_node]]) >= threshold: #and its A value is above our threshold
|
|
||||||
new_neighbors = self.grid.get_neighborhood(current_node) #then we get its neighbours
|
|
||||||
for new_neighbor in new_neighbors:
|
|
||||||
if new_neighbor not in visited and new_neighbor not in neighbours_to_check:
|
|
||||||
neighbours_to_check.append(new_neighbor) #then they are also added to our to check list
|
|
||||||
visited.append(current_node) #and the current node has now been checked
|
|
||||||
|
|
||||||
if self.grid.fields["food"][current_node] > 0: #in case the node we check is food
|
|
||||||
connectivity += 1 #then we have found a connected path to a food source
|
|
||||||
connected_food_sources = connected_food_sources + list([current_node]) #and it is added to the list of connected food sources
|
|
||||||
|
|
||||||
# why not normalize to 0-1 ?
|
|
||||||
return connectivity #we want the connectivity (0-5)
|
|
||||||
|
|
||||||
def agent_density(self):
|
def agent_density(self):
|
||||||
a = np.zeros((self.grid.width, self.grid.height))
|
a = np.zeros((self.grid.width, self.grid.height))
|
||||||
for i in range(self.grid.width):
|
for i in range(self.grid.width):
|
||||||
@ -195,23 +65,26 @@ class ActiveWalkerModel(Model):
|
|||||||
|
|
||||||
|
|
||||||
def step(self):
|
def step(self):
|
||||||
self.dying_agents = 0
|
|
||||||
self.schedule.step() # step() and advance() all agents
|
self.schedule.step() # step() and advance() all agents
|
||||||
|
|
||||||
if self.schedule.steps % 100 == 0:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# apply decay rate on pheromone levels
|
# apply decay rate on pheromone levels
|
||||||
for key in ("A", "B"):
|
for key in ("A", "B"):
|
||||||
field = self.grid.fields[key]
|
field = self.grid.fields[key]
|
||||||
self.grid.fields[key] = field - self.gamma*field
|
self.grid.fields[key] = field - self.decay_rates[key]*field
|
||||||
|
|
||||||
|
|
||||||
self.datacollector.collect(self)
|
self.datacollector.collect(self)
|
||||||
|
|
||||||
if self.schedule.steps >= self.max_steps:
|
if self.schedule.steps >= self.max_steps:
|
||||||
self.running = False
|
self.running = False
|
||||||
|
|
||||||
|
#remove dead agents
|
||||||
|
for agent in self.dead_agents:
|
||||||
|
self.schedule.remove(agent)
|
||||||
|
self.grid.remove_agent(agent)
|
||||||
|
self.dead_agents.remove(agent)
|
||||||
|
self.dead_agents = []
|
||||||
|
#ToDo what happens when all agents die
|
||||||
|
|
||||||
def get_unique_id(self) -> int:
|
def get_unique_id(self) -> int:
|
||||||
self._unique_id_counter += 1
|
self._unique_id_counter += 1
|
||||||
return self._unique_id_counter
|
return self._unique_id_counter
|
||||||
@ -220,7 +93,6 @@ class ActiveWalkerModel(Model):
|
|||||||
for _ in range(num_ids):
|
for _ in range(num_ids):
|
||||||
yield self.get_unique_id()
|
yield self.get_unique_id()
|
||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
|
This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, version 3.
|
||||||
|
|
||||||
|
@ -107,8 +107,7 @@ class MultiHexGridScalarFields(MultiHexGrid):
|
|||||||
|
|
||||||
def is_food(self, pos):
|
def is_food(self, pos):
|
||||||
assert('food' in self.fields.keys())
|
assert('food' in self.fields.keys())
|
||||||
# account for potential float imprecision and use epsilon = 1e-3
|
return bool(self.fields['food'][pos])
|
||||||
return self.fields['food'][pos] > 1e-3
|
|
||||||
|
|
||||||
def add_food(self, size : int , pos=None):
|
def add_food(self, size : int , pos=None):
|
||||||
"""
|
"""
|
||||||
@ -128,7 +127,7 @@ class MultiHexGridScalarFields(MultiHexGrid):
|
|||||||
while(self.is_nest(pos) or self.is_food(pos)):
|
while(self.is_nest(pos) or self.is_food(pos)):
|
||||||
pos = select_random_place()
|
pos = select_random_place()
|
||||||
|
|
||||||
self.fields['food'][pos] = int(size)
|
self.fields['food'][pos] = size
|
||||||
|
|
||||||
def is_nest(self, pos : Coordinate) -> bool:
|
def is_nest(self, pos : Coordinate) -> bool:
|
||||||
assert('nests' in self.fields.keys())
|
assert('nests' in self.fields.keys())
|
||||||
|
71
plot.py
71
plot.py
@ -1,71 +0,0 @@
|
|||||||
#!/bin/python
|
|
||||||
|
|
||||||
import matplotlib.pyplot as plt
|
|
||||||
import numpy as np
|
|
||||||
from hexplot import plot_hexagon
|
|
||||||
|
|
||||||
def plot_alive_ants_vs_time(model, title=None):
|
|
||||||
y = model.datacollector.get_model_vars_dataframe()["alive_ants"]
|
|
||||||
plt.figure(figsize=(10,10), dpi=600)
|
|
||||||
plt.plot(y)
|
|
||||||
plt.xlabel("time step")
|
|
||||||
plt.ylabel("alive agents")
|
|
||||||
if title is None:
|
|
||||||
plt.savefig("alive_agents_over_time.eps")
|
|
||||||
else:
|
|
||||||
plt.savefig(f"{title}.png")
|
|
||||||
|
|
||||||
|
|
||||||
def plot_connectivity_vs_time(model, title=None):
|
|
||||||
y = model.datacollector.get_model_vars_dataframe()["connectivity"]
|
|
||||||
plt.figure(figsize=(10,10), dpi=600)
|
|
||||||
plt.plot(y)
|
|
||||||
plt.xlabel("time step")
|
|
||||||
plt.ylabel("No. of food sources connected to the nest")
|
|
||||||
if title is None:
|
|
||||||
plt.savefig("connectivity_over_time.eps")
|
|
||||||
else:
|
|
||||||
plt.savefig(f"{title}.png")
|
|
||||||
|
|
||||||
|
|
||||||
def dead_ants_vs_time(model, title=None):
|
|
||||||
y = np.cumsum(model.datacollector.get_model_vars_dataframe()["dying_ants"])
|
|
||||||
plt.figure(figsize=(10,10), dpi=600)
|
|
||||||
plt.plot(y)
|
|
||||||
plt.xlabel("time step")
|
|
||||||
plt.ylabel("dead agents")
|
|
||||||
if title is None:
|
|
||||||
plt.savefig("dead_agents_over_time.eps")
|
|
||||||
else:
|
|
||||||
plt.savefig(f"{title}.png")
|
|
||||||
|
|
||||||
|
|
||||||
def cum_successful_ants_vs_time(model, title=None):
|
|
||||||
y = model.datacollector.get_model_vars_dataframe()["successful_walkers"]
|
|
||||||
plt.figure(figsize=(10,10), dpi=600)
|
|
||||||
plt.plot(y)
|
|
||||||
plt.xlabel("time step")
|
|
||||||
plt.ylabel("cummulative successful agents")
|
|
||||||
if title is None:
|
|
||||||
plt.savefig("cumsum_successful_agents_over_time.eps")
|
|
||||||
else:
|
|
||||||
plt.savefig(f"{title}.png")
|
|
||||||
|
|
||||||
|
|
||||||
def plot_heatmap(model, low=10, high=200):
|
|
||||||
for time in np.arange(0, model.max_steps + 1, 1000):
|
|
||||||
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_a"][time]
|
|
||||||
a = pheromone_concentration
|
|
||||||
#plot_hexagon(a)
|
|
||||||
pheromone_concentration = model.datacollector.get_model_vars_dataframe()["pheromone_b"][time]
|
|
||||||
b = pheromone_concentration
|
|
||||||
#plot_hexagon(b)
|
|
||||||
c = np.max([a,b], axis=0)
|
|
||||||
c = a + b
|
|
||||||
c = np.clip(c, 1, 1000000000)
|
|
||||||
c = np.log(c)
|
|
||||||
c = c/np.max(c)
|
|
||||||
food_locations = np.nonzero(model.grid.fields['food'])
|
|
||||||
x_food = [ food[0] for food in food_locations ]
|
|
||||||
y_food = [ food[1] for food in food_locations ]
|
|
||||||
plot_hexagon(c, title=f"cummulative pheromone density at timestep {time}")
|
|
38
server.py
38
server.py
@ -22,12 +22,10 @@ def setup(params=None):
|
|||||||
# Set the model parameters
|
# Set the model parameters
|
||||||
if params is None:
|
if params is None:
|
||||||
params = {
|
params = {
|
||||||
"max_steps": 3000,
|
|
||||||
"width": 50, "height": 50,
|
"width": 50, "height": 50,
|
||||||
"N_m" : 100,
|
"num_max_agents" : 100,
|
||||||
"nest_position" : (25,25),
|
"nest_position" : (25,25),
|
||||||
"N_0" : 5,
|
"num_initial_roamers" : 5,
|
||||||
"resistance_map_type": "perlin",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -87,28 +85,12 @@ def setup(params=None):
|
|||||||
"Color": col,
|
"Color": col,
|
||||||
}
|
}
|
||||||
|
|
||||||
def portray_resistance_map(model, pos, norm=1):
|
|
||||||
col = get_color(level=model.grid.fields['res'][pos], normalization=norm)
|
|
||||||
col = f"rgb({col}, {col}, {col})"
|
|
||||||
return {
|
|
||||||
"Shape": "hex",
|
|
||||||
"r": 1,
|
|
||||||
"Filled": "true",
|
|
||||||
"Layer": 0,
|
|
||||||
"x": pos[0],
|
|
||||||
"y": pos[1],
|
|
||||||
"Color": col,
|
|
||||||
}
|
|
||||||
|
|
||||||
def get_max_grid_val(model, key):
|
def get_max_grid_val(model, key):
|
||||||
return np.max(model.grid.fields[key])
|
return np.max(model.grid.fields[key])
|
||||||
|
|
||||||
def portray_pheromone_density(model, pos, norm):
|
def portray_pheromone_density(model, pos, norm):
|
||||||
col_a = get_color(level=model.grid.fields["A"][pos], normalization=norm)
|
col_a = get_color(level=model.grid.fields["A"][pos], normalization=norm)
|
||||||
col_b = get_color(level=model.grid.fields["B"][pos], normalization=norm)
|
col_b = get_color(level=model.grid.fields["B"][pos], normalization=norm)
|
||||||
res_min, res_max = np.min(model.grid.fields['res']), np.max(model.grid.fields['res'])
|
|
||||||
ease = 1 - model.grid.fields['res'][pos]
|
|
||||||
col_ease = get_color(level=ease, normalization=np.max(model.grid.fields['res']))
|
|
||||||
return {
|
return {
|
||||||
"Shape": "hex",
|
"Shape": "hex",
|
||||||
"r": 1,
|
"r": 1,
|
||||||
@ -116,7 +98,7 @@ def setup(params=None):
|
|||||||
"Layer": 0,
|
"Layer": 0,
|
||||||
"x": pos[0],
|
"x": pos[0],
|
||||||
"y": pos[1],
|
"y": pos[1],
|
||||||
"Color": f"rgb({col_a}, {col_b}, {col_ease})"
|
"Color": f"rgb({col_a}, {col_b}, 255)"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -127,9 +109,6 @@ def setup(params=None):
|
|||||||
grid_ants = CanvasHexGridMultiAgents(portray_ant_density,
|
grid_ants = CanvasHexGridMultiAgents(portray_ant_density,
|
||||||
width, height, width*pixel_ratio, height*pixel_ratio,
|
width, height, width*pixel_ratio, height*pixel_ratio,
|
||||||
norm_method=lambda m: 5)
|
norm_method=lambda m: 5)
|
||||||
grid_resistance_map = CanvasHexGridMultiAgents(portray_resistance_map,
|
|
||||||
width, height, width*pixel_ratio, height*pixel_ratio,
|
|
||||||
norm_method=lambda m: 1)
|
|
||||||
|
|
||||||
def norm_ants(model):
|
def norm_ants(model):
|
||||||
return 5
|
return 5
|
||||||
@ -148,19 +127,14 @@ def setup(params=None):
|
|||||||
[lambda m: "<h3>Ant density</h3><h5>Nest: Red, Food: Green</h5>",
|
[lambda m: "<h3>Ant density</h3><h5>Nest: Red, Food: Green</h5>",
|
||||||
grid_ants,
|
grid_ants,
|
||||||
lambda m: f"<h5>Normalization Value: {norm_ants(m)}</h5>",
|
lambda m: f"<h5>Normalization Value: {norm_ants(m)}</h5>",
|
||||||
lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta, Resistance Map: Yellow</h5>",
|
lambda m: "<h3>Pheromone Density</h3><h5>Pheromone A: Cyan, Pheromone B: Magenta</h5>",
|
||||||
grid_pheromones,
|
grid_pheromones,
|
||||||
lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>",
|
lambda m: f"<h5>Normalization Value: {norm_pheromones(m)}</h5>"
|
||||||
],
|
],
|
||||||
"Active Random Walker Ants", params)
|
"Active Random Walker Ants", params)
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
from model import kwargs_paper_setup1
|
server = setup()
|
||||||
kwargs_paper1_perlin = kwargs_paper_setup1
|
|
||||||
kwargs_paper1_perlin["height"] = 50
|
|
||||||
kwargs_paper1_perlin["width"] = 50
|
|
||||||
kwargs_paper1_perlin["resistance_map_type"] = "perlin"
|
|
||||||
server = setup(params=kwargs_paper1_perlin)
|
|
||||||
server.launch()
|
server.launch()
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
Loading…
Reference in New Issue
Block a user