-
Notifications
You must be signed in to change notification settings - Fork 40
Description
Dear Developers:
Hello, first I want to thank you for creating such a fantastic package! It has helped me immensely in my research! Recently, I've had an issue accessing the predecessor nodes in the network. I've created a scale-free network (directed) using a similar code as in the tutorial (https://agentpy.readthedocs.io/en/latest/agentpy_virus_spread.html); however, my network needs to be directed so I can access the predecessors as well as the successors of each node. However, I am having trouble accessing both as an error reads: "The node Person (Obj 9) is not in the digraph." when my code is for n in self.network.graph.predecessors(self) I've also attached my full code below. Thank you so much for your help and I apologize if this seems to be a dumb question!
Best,
Huizi
# Model design
import agentpy as ap
import networkx as nx
import random
import pandas as pd
import numpy as np
from scipy.stats import truncnorm
# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import IPython
class Person(ap.Agent):
def setup(self):
""" Initialize a new variable at agent creation. """
self.condition = 0 # Susceptible = 0, Infected = 1, Recovered = 2
def being_sick(self):
""" Spread disease to peers in the network. """
rng = self.model.random
for n in self.network.graph.predecessors(self):
if n.condition == 0 and self.p.infection_chance > rng.random():
n.condition = 1 # Infect susceptible peer
if self.p.recovery_chance > rng.random():
self.condition = 2 # Recover from infection
class VirusModel(ap.Model):
def setup(self):
""" Initialize the agents and network of the model. """
# Prepare a scale free network
graph = nx.scale_free_graph(
self.p.population)
# Create agents and network
self.agents = ap.AgentList(self, self.p.population, Person)
self.network = self.agents.network = ap.Network(self, graph)
self.network.add_agents(self.agents, self.network.nodes)
# Infect a random share of the population
I0 = int(self.p.initial_infection_share * self.p.population)
self.agents.random(I0).condition = 1
def update(self):
""" Record variables after setup and each step. """
# Record share of agents with each condition
for i, c in enumerate(('S', 'I', 'R')):
n_agents = len(self.agents.select(self.agents.condition == i))
self[c] = n_agents / self.p.population
self.record(c)
# Stop simulation if disease is gone
if self.I == 0:
self.stop()
def step(self):
""" Define the models' events per simulation step. """
# Call 'being_sick' for infected agents
self.agents.select(self.agents.condition == 1).being_sick()
def end(self):
""" Record evaluation measures at the end of the simulation. """
# Record final evaluation measures
self.report('Total share infected', self.I + self.R)
self.report('Peak share infected', max(self.log['I']))
parameters = {
'population': 1000,
'infection_chance': 0.3,
'recovery_chance': 0.1,
'initial_infection_share': 0.1,
'number_of_neighbors': 20,
'network_randomness': 0.5
#'steps':10
}
model = VirusModel(parameters)
results = model.run()