-
Notifications
You must be signed in to change notification settings - Fork 7.9k
Description
CancellationToken doesn't stop run_stream() iteration when signal handler cancels token
Summary
When using run_stream() with a CancellationToken that gets cancelled by a signal handler (e.g., SIGINT/Ctrl+C), the async iteration loop continues processing buffered messages instead of stopping immediately.
Environment
- AutoGen Version: 0.7.5
- Python Version: 3.13.9
- OS: macOS Darwin 25.0.0
Expected Behavior
After cancelling the CancellationToken (via signal handler or programmatically), the run_stream() async iteration should stop immediately.
Actual Behavior
The loop continues processing all buffered messages before checking cancellation status. The application appears to hang until all messages are drained from the queue.
Root Cause
In autogen_agentchat/teams/_group_chat/_base_group_chat.py, the run_stream() implementation has a while True: loop that never checks cancellation_token.is_cancelled() before yielding:
while True: # ← Never checks token status
message_future = asyncio.ensure_future(self._output_message_queue.get())
if cancellation_token is not None:
cancellation_token.link_future(message_future)
# Wait for the next message, this will raise an exception if the task is cancelled.
message = await message_future # ← Exception only raised HERE
# ... process message ...
yield message # ← Messages yielded even if token was just cancelledProblem: If the queue has buffered messages, they get yielded before the next await message_future can raise the cancellation exception.
Reproduction
import asyncio
import signal
import sys
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.conditions import MaxMessageTermination
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_core import CancellationToken
from autogen_ext.models.openai import OpenAIChatCompletionClient
async def main():
model_client = OpenAIChatCompletionClient(model="gpt-4o-mini")
agent = AssistantAgent("Assistant", model_client=model_client)
team = RoundRobinGroupChat([agent], termination_condition=MaxMessageTermination(10))
token = CancellationToken()
def signal_handler(signum, frame):
print("Cancelling token...")
token.cancel()
# Without raising KeyboardInterrupt, loop continues
signal.signal(signal.SIGINT, signal_handler)
print("Press Ctrl+C during streaming...")
event_count = 0
async for event in team.run_stream(task="Count 1 to 20", cancellation_token=token):
event_count += 1
print(f"Event {event_count}")
print(f"Processed {event_count} events") # Processes ALL events despite Ctrl+C
asyncio.run(main())Result: After pressing Ctrl+C, all remaining events are still processed instead of stopping immediately.
Workaround
Raise KeyboardInterrupt in the signal handler to forcefully break the async iteration loop:
def signal_handler(signum, frame):
token.cancel()
raise KeyboardInterrupt() # Force breakSide effect: This causes asyncio.CancelledError and ValueError: task_done() called too many times during cleanup (see issue #7007).
Suggested Fix
Check cancellation status before yielding in the loop:
while True:
message_future = asyncio.ensure_future(self._output_message_queue.get())
if cancellation_token is not None:
cancellation_token.link_future(message_future)
message = await message_future
# Add this check:
if cancellation_token is not None and cancellation_token.is_cancelled():
break
if isinstance(message, GroupChatTermination):
# ... existing code ...
break
yield messageRelated Issues
- Respect Cancellation Token in AgentChat
runandrun_stream, and add example to show how to use it #4029 - Claimed to add cancellation token support, but only tested async cancellation (not SIGINT) - ValueError: task_done() called too many times #7007 - Reports
task_done() called too many timeserror (symptom of cancellation bugs) - SequentialRoutedAgent should handle cancellation of current task as an event handler #4776 - Mentions
SequentialRoutedAgentshould handle cancellation as event handler
Additional Context
The issue is particularly problematic for interactive CLI applications where users expect Ctrl+C to stop execution immediately. The current behavior makes applications appear unresponsive to user interruption.
Your own documentation acknowledges this limitation:
"Setting the cancellation token potentially put the team in an inconsistent state"
A proper cancellation mechanism should gracefully stop iteration without requiring KeyboardInterrupt.