52 changes: 46 additions & 6 deletions autogpt_platform/backend/backend/data/execution.py
@@ -1,9 +1,8 @@
 import logging
+import queue
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from enum import Enum
-from multiprocessing import Manager
-from queue import Empty
 from typing import (
     TYPE_CHECKING,
     Annotated,
@@ -1164,27 +1163,68 @@ class NodeExecutionEntry(BaseModel):

 class ExecutionQueue(Generic[T]):
     """
-    Queue for managing the execution of agents.
-    This will be shared between different processes
+    Thread-safe queue for managing node execution within a single graph execution.
+
+    BUG FIX EXPLANATION:
+    ====================
+    This class previously used `multiprocessing.Manager().Queue()`, which spawns a
+    separate subprocess for inter-process communication (IPC). However, analysis of
+    the codebase revealed that ExecutionQueue is:
+
+    1. Created per graph execution in the `_on_graph_execution()` method
+    2. Only accessed from threads within the SAME process:
+       - Main execution thread (adds/gets items, checks if empty)
+       - Async coroutines in the `node_evaluation_loop` thread (add items via
+         `_process_node_output`)
+
+    Since all access is within a single process, we only need THREAD-SAFETY, not
+    PROCESS-SAFETY. Using multiprocessing.Manager() was:
+
+    - Spawning an unnecessary subprocess for each graph execution
+    - Adding significant IPC overhead for every queue operation
+    - Potentially causing resource leaks if Manager processes weren't properly
+      cleaned up
+    - Limiting scalability when many graphs execute concurrently
+
+    THE FIX:
+    ========
+    Replace `multiprocessing.Manager().Queue()` with `queue.Queue()`, which is:
+    - Thread-safe (uses internal locks for synchronization)
+    - Much faster (no IPC overhead)
+    - Free of subprocess spawning
+    - Cleaned up properly by Python's garbage collector
+
+    This is a minimal, high-impact fix that improves performance and resource usage
+    without changing any external API or behavior.
     """
 
     def __init__(self):
-        self.queue = Manager().Queue()
+        # Use a thread-safe queue instead of a multiprocessing Manager queue.
+        # queue.Queue is thread-safe and sufficient since ExecutionQueue is only
+        # accessed from multiple threads within the same process, not across processes.
+        self.queue: queue.Queue[T] = queue.Queue()
 
     def add(self, execution: T) -> T:
+        """Add an execution entry to the queue. Thread-safe."""
        self.queue.put(execution)
        return execution
 
     def get(self) -> T:
+        """Get the next execution entry from the queue. Blocks if empty. Thread-safe."""
        return self.queue.get()
 
     def empty(self) -> bool:
+        """Check if the queue is empty. Thread-safe (approximate check)."""
        return self.queue.empty()
 
     def get_or_none(self) -> T | None:
+        """
+        Non-blocking get: returns the next item or None if the queue is empty.
+        Thread-safe.
+        """
        try:
            return self.queue.get_nowait()
-        except Empty:
+        except queue.Empty:
            return None


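For context on the overhead claim in the docstring above, a minimal standalone micro-benchmark along these lines can illustrate the gap between the two queue types. This is an illustrative sketch, not part of the PR; the file name and iteration count are arbitrary.

# benchmark_queue_overhead.py
# Illustrative sketch (not part of this PR): compares the put/get round-trip
# cost of an in-process queue.Queue against a Manager().Queue(), where every
# operation is an IPC round-trip to the manager subprocess.
import queue
import time
from multiprocessing import Manager

N = 10_000  # number of put/get pairs; arbitrary


def bench(q) -> float:
    start = time.perf_counter()
    for i in range(N):
        q.put(i)
        q.get()
    return time.perf_counter() - start


if __name__ == "__main__":
    local = bench(queue.Queue())  # in-process, lock-based
    manager = Manager()           # spawns a manager subprocess
    ipc = bench(manager.Queue())
    manager.shutdown()
    print(f"queue.Queue:       {local:.3f}s for {N} put/get pairs")
    print(f"Manager().Queue(): {ipc:.3f}s for {N} put/get pairs")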
241 changes: 241 additions & 0 deletions autogpt_platform/backend/test_execution_queue.py
@@ -0,0 +1,241 @@
"""
Test script to verify the ExecutionQueue fix in execution.py

This script tests:
1. That ExecutionQueue uses queue.Queue (not multiprocessing.Manager().Queue())
2. That all queue operations work correctly
3. That thread-safety works as expected
"""

import sys
import threading
import time

# Make the `backend` package importable when run from autogpt_platform/backend
sys.path.insert(0, ".")

import queue


def test_queue_type():
    """Test that ExecutionQueue uses the correct queue type."""
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()

    # Verify it's using queue.Queue, not a multiprocessing queue
    assert isinstance(
        q.queue, queue.Queue
    ), f"FAIL: Expected queue.Queue, got {type(q.queue)}"
    print("✓ ExecutionQueue uses queue.Queue (not multiprocessing.Manager().Queue())")


def test_basic_operations():
    """Test basic queue operations."""
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()

    # Test add
    result = q.add("item1")
    assert result == "item1", f"FAIL: add() should return the item, got {result}"
    print("✓ add() works correctly")

    # Test empty() when not empty
    assert q.empty() is False, "FAIL: empty() should return False when queue has items"
    print("✓ empty() returns False when queue has items")

    # Test get()
    item = q.get()
    assert item == "item1", f"FAIL: get() returned {item}, expected 'item1'"
    print("✓ get() works correctly")

    # Test empty() when empty
    assert q.empty() is True, "FAIL: empty() should return True when queue is empty"
    print("✓ empty() returns True when queue is empty")

    # Test get_or_none() when empty
    result = q.get_or_none()
    assert result is None, f"FAIL: get_or_none() should return None, got {result}"
    print("✓ get_or_none() returns None when queue is empty")

    # Test get_or_none() with items
    q.add("item2")
    result = q.get_or_none()
    assert result == "item2", f"FAIL: get_or_none() returned {result}, expected 'item2'"
    print("✓ get_or_none() returns item when queue has items")


def test_thread_safety():
    """Test that the queue is thread-safe."""
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()
    results = []
    errors = []
    num_items = 100

    def producer():
        try:
            for i in range(num_items):
                q.add(f"item_{i}")
        except Exception as e:
            errors.append(f"Producer error: {e}")

    def consumer():
        try:
            count = 0
            while count < num_items:
                item = q.get_or_none()
                if item is not None:
                    results.append(item)
                    count += 1
                else:
                    time.sleep(0.001)  # Small delay to avoid busy waiting
        except Exception as e:
            errors.append(f"Consumer error: {e}")

    # Start threads
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    producer_thread.start()
    consumer_thread.start()

    producer_thread.join(timeout=5)
    consumer_thread.join(timeout=5)

    assert len(errors) == 0, f"FAIL: Thread errors occurred: {errors}"
    assert (
        len(results) == num_items
    ), f"FAIL: Expected {num_items} items, got {len(results)}"
    print(
        f"✓ Thread-safety test passed ({num_items} items transferred between threads)"
    )


def test_multiple_producers_consumers():
    """Test with multiple producer and consumer threads."""
    from backend.data.execution import ExecutionQueue

    q = ExecutionQueue()
    results = []
    results_lock = threading.Lock()
    errors = []
    items_per_producer = 50
    num_producers = 3
    total_items = items_per_producer * num_producers

    def producer(producer_id):
        try:
            for i in range(items_per_producer):
                q.add(f"producer_{producer_id}_item_{i}")
        except Exception as e:
            errors.append(f"Producer {producer_id} error: {e}")

    def consumer(consumer_id, target_count):
        try:
            count = 0
            max_attempts = target_count * 100
            attempts = 0
            while count < target_count and attempts < max_attempts:
                item = q.get_or_none()
                if item is not None:
                    with results_lock:
                        results.append(item)
                    count += 1
                else:
                    time.sleep(0.001)
                attempts += 1
        except Exception as e:
            errors.append(f"Consumer {consumer_id} error: {e}")

    # Start multiple producers
    producer_threads = [
        threading.Thread(target=producer, args=(i,)) for i in range(num_producers)
    ]

    # Start multiple consumers (each consumes half of the total)
    consumer_threads = [
        threading.Thread(target=consumer, args=(i, total_items // 2)) for i in range(2)
    ]

    for t in producer_threads:
        t.start()
    for t in consumer_threads:
        t.start()

    for t in producer_threads:
        t.join(timeout=10)
    for t in consumer_threads:
        t.join(timeout=10)

    assert len(errors) == 0, f"FAIL: Thread errors occurred: {errors}"
    assert (
        len(results) == total_items
    ), f"FAIL: Expected {total_items} items, got {len(results)}"
    print(
        f"✓ Multi-producer/consumer test passed ({num_producers} producers, 2 consumers, {total_items} items)"
    )


def test_no_subprocess_spawned():
    """Verify that no subprocess is spawned (unlike multiprocessing.Manager())."""
    import multiprocessing

    from backend.data.execution import ExecutionQueue

    children_before = len(multiprocessing.active_children())

    # Create multiple queues; with Manager().Queue(), each of these would spawn
    # a manager subprocess visible via multiprocessing.active_children().
    queues = [ExecutionQueue() for _ in range(5)]

    for q in queues:
        q.add("test")
        assert q.get() == "test"

    children_after = len(multiprocessing.active_children())
    assert children_after == children_before, (
        f"FAIL: {children_after - children_before} unexpected child process(es) spawned"
    )

    print(
        "✓ No subprocess spawning (5 queues created without spawning manager processes)"
    )


def main():
    print("=" * 60)
    print("ExecutionQueue Fix Verification Tests")
    print("=" * 60)
    print()

    tests = [
        ("Queue Type Check", test_queue_type),
        ("Basic Operations", test_basic_operations),
        ("Thread Safety", test_thread_safety),
        ("Multiple Producers/Consumers", test_multiple_producers_consumers),
        ("No Subprocess Spawning", test_no_subprocess_spawned),
    ]

    passed = 0
    failed = 0

    for name, test_func in tests:
        print(f"\n--- {name} ---")
        try:
            test_func()
            passed += 1
        except AssertionError as e:
            print(f"✗ {e}")
            failed += 1
        except Exception as e:
            print(f"✗ Unexpected error: {e}")
            failed += 1

    print()
    print("=" * 60)
    if failed == 0:
        print(f"✅ ALL TESTS PASSED ({passed}/{passed})")
        print("The ExecutionQueue fix is working correctly!")
    else:
        print(f"❌ TESTS FAILED: {failed} failed, {passed} passed")
    print("=" * 60)

    return 0 if failed == 0 else 1


if __name__ == "__main__":
    sys.exit(main())
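To try this script locally, one option (assuming the repository layout above, since the `sys.path.insert(0, ".")` call expects the working directory to be `autogpt_platform/backend`) is to run `python test_execution_queue.py` from that directory. The `test_*` functions use plain `assert` statements, so pytest should also be able to collect and run them directly.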