Description
Thanks again for your framework and the 1.6.0 update! I have just discovered the update and will look into it later. I started using your framework last year, before 1.6.0, and implemented several decorators that I attach to the nodes, not the queues. I built a benchmarking framework on top of your library and noticed that sometimes the last few items were not getting processed. In my framework the add_node/add_edge calls are automated: the graph is built from the nodes' dependencies, which are stored in a set, so the order in which nodes and edges are added varies from run to run.
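To make that nondeterminism concrete, here is a simplified, purely hypothetical sketch of what my framework does (the names are placeholders, not my actual code). Since function objects are hashed by their memory addresses, iterating a set of them does not follow insertion order, so the resulting add_node/add_edge order can differ between runs:

```python
# Hypothetical sketch, not my real framework code.
def build_graph_from_dependencies(graph, edge_set):
    # edge_set: a set of (upstream_fn, downstream_fn) tuples
    nodes = {fn for edge in edge_set for fn in edge}  # also an unordered set
    for fn in nodes:  # iteration order can vary from run to run
        graph.add_node(fn, unpack_input=False)
    for upstream, downstream in edge_set:  # same here
        graph.add_edge(upstream, downstream)
```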
I built a simple example with my library and was able to reproduce the bug on the newest version, 1.6.0. I recorded the add_node/add_edge calls for the success and the failure case. I then built a simple example using just your library and was again able to reproduce the bug. Below you will find the script. Note: some things (such as packing the dict values for first_number and second_number into arrays in generate_numbers) don't make sense outside my framework; please ignore that.
The following script performs a simple calculation on two numbers: some noise is added to the first, the second is squared, the two results are added up, and the sum is finally printed to the console:
```python
import random
from asyncio import Lock
from collections import defaultdict
from typing import List, Dict

from async_graph_data_flow import AsyncGraph, AsyncExecutor

rng = random.Random()
def multi_incoming_node(generator, incoming_nodes_count):
    """
    Waits for a specific number of items with the same `index` from multiple
    nodes before calling the generator.

    Args:
        generator: The async function to wrap.
        incoming_nodes_count: The number of items required for each `index`.

    Returns:
        A wrapped function.
    """
    count_per_index = defaultdict(int)
    lock = Lock()  # Ensures safe access to the buffer across multiple tasks

    async def wrapped(item):
        print(f"Seeing item with id {item['id']} for the {count_per_index[item['id']] + 1} st/nd/th time!")
        if (count_per_index[item["id"]] + 1) == incoming_nodes_count:
            del count_per_index[item["id"]]
            async for result in generator(item):
                yield result
        else:
            async with lock:
                count_per_index[item["id"]] += 1

    return wrapped
async def generate_numbers():
    numbers = [(1, 1), (5, 3), (2, 4), (9, 1)]
    for idx, row in enumerate(numbers):
        print(f"[generate_numbers] Yielding item with id {idx}")
        yield {
            "id": idx,
            "first_number": [row[0]],
            "second_number": [row[1]],
        }


async def add_noise(item_stats: Dict[str, List]):
    print(f"[add_noise] Processing item with id {item_stats['id']}")
    first_numbers = item_stats["first_number"]
    noise = [rng.gauss(0.0, 1.0) for _ in range(len(first_numbers))]
    noisy_first_number = [x + n for x, n in zip(first_numbers, noise)]
    item_stats["noisy_first_number"] = noisy_first_number
    yield item_stats


async def square_second(item_stats: Dict[str, List]):
    print(f"[square_second] Processing item with id {item_stats['id']}")
    second_numbers = item_stats["second_number"]
    second_number_squared = [pow(number, 2) for number in second_numbers]
    item_stats["second_number_squared"] = second_number_squared
    yield item_stats


async def add(item_stats: Dict[str, List]):
    print(f"[add] Processing item with id {item_stats['id']}")
    first_summand = item_stats["noisy_first_number"]
    second_summand = item_stats["second_number_squared"]
    addition = [a + b for a, b in zip(first_summand, second_summand)]
    item_stats["addition"] = addition
    yield item_stats


async def log_result(item_stats: Dict[str, List]):
    print(f"[log_result] Processing item with id {item_stats['id']}")
    addition = item_stats["addition"]
    print("addition result:", addition)
    yield item_stats
if __name__ == "__main__":
    graph = AsyncGraph()
    multi_incoming_add = multi_incoming_node(add, 2)

    # This works: the items get successfully processed by all nodes, including log_result
    # graph.add_node(generate_numbers, unpack_input=False)
    # graph.add_node(add_noise, unpack_input=False)
    # graph.add_node(square_second, unpack_input=False)
    # graph.add_node(multi_incoming_add, unpack_input=False)
    # graph.add_node(log_result, unpack_input=False)
    #
    # graph.add_edge(generate_numbers, add_noise)
    # graph.add_edge(generate_numbers, square_second)
    # graph.add_edge(add_noise, multi_incoming_add)
    # graph.add_edge(square_second, multi_incoming_add)
    # graph.add_edge(multi_incoming_add, log_result)

    # This doesn't work: only the order of the add_node/add_edge calls is changed,
    # yet not every item reaches every node; log_result in particular misses some.
    graph.add_node(log_result, unpack_input=False)
    graph.add_node(generate_numbers, unpack_input=False)
    graph.add_node(square_second, unpack_input=False)
    graph.add_node(add_noise, unpack_input=False)
    graph.add_node(multi_incoming_add, unpack_input=False)

    graph.add_edge(generate_numbers, add_noise)
    graph.add_edge(generate_numbers, square_second)
    graph.add_edge(square_second, multi_incoming_add)
    graph.add_edge(add_noise, multi_incoming_add)
    graph.add_edge(multi_incoming_add, log_result)
    AsyncExecutor(graph).execute()
```

I suppose this is not intended. My guess is that there is a loop over the queues somewhere that checks whether any items still need to be processed, and if all queues are empty the execution finishes. However, a node later in that loop can push items into a "previous" queue that was already checked and marked finished, so those items never get processed. This is just an educated guess, though.
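To make that guess concrete, here is a purely hypothetical sketch of the kind of termination check I have in mind. I have not read the executor source, so the function and its structure are invented, not async_graph_data_flow's actual code:

```python
import asyncio

# Hypothetical sketch only: a single fixed-order scan over the queues.
async def suspected_drain_check(queues: list[asyncio.Queue]) -> bool:
    for queue in queues:
        if not queue.empty():
            return False  # still work to do
    # If a node later in the scan enqueues into a queue that was already seen
    # as empty, this check can still report "done" and those items are lost.
    return True
```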
Any help would be highly appreciated! Thanks in advance!