Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,11 @@ async def stop_runtime() -> None:
cancellation_token.link_future(message_future)
# Wait for the next message, this will raise an exception if the task is cancelled.
message = await message_future
# Check if cancellation token was cancelled before processing/yielding the message.
# This ensures that signal handlers (e.g., SIGINT/Ctrl+C) can stop iteration immediately
# even if there are buffered messages in the queue.
if cancellation_token is not None and cancellation_token.is_cancelled():
break
if isinstance(message, GroupChatTermination):
# If the message contains an error, we need to raise it here.
# This will stop the team and propagate the error.
Expand Down
60 changes: 60 additions & 0 deletions python/packages/autogen-agentchat/tests/test_group_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,66 @@ async def test_round_robin_group_chat_cancellation(runtime: AgentRuntime | None)
assert result.stop_reason is not None and result.stop_reason == "Maximum number of turns 1000 reached."


@pytest.mark.asyncio
async def test_round_robin_group_chat_run_stream_cancellation(runtime: AgentRuntime | None) -> None:
"""Test that run_stream() stops immediately when CancellationToken is cancelled.

This test verifies the fix for issue #7100 where run_stream() would continue
processing buffered messages even after the cancellation token was cancelled.
"""
agent_1 = _EchoAgent("agent_1", description="echo agent 1")
agent_2 = _EchoAgent("agent_2", description="echo agent 2")
agent_3 = _EchoAgent("agent_3", description="echo agent 3")
agent_4 = _EchoAgent("agent_4", description="echo agent 4")
# Set max_turns to a large number to avoid stopping due to max_turns before cancellation.
team = RoundRobinGroupChat(participants=[agent_1, agent_2, agent_3, agent_4], max_turns=1000, runtime=runtime)
cancellation_token = CancellationToken()

events_processed = []
cancellation_occurred = False

async def stream_and_cancel() -> None:
nonlocal cancellation_occurred, events_processed
try:
async for event in team.run_stream(
task="Write a program that prints 'Hello, world!'",
cancellation_token=cancellation_token,
):
events_processed.append(type(event).__name__)
# Cancel after processing a few events to test immediate stopping
if len(events_processed) == 3 and not cancellation_occurred:
cancellation_token.cancel()
cancellation_occurred = True
except asyncio.CancelledError:
# CancelledError is expected when cancellation token is cancelled
# This is the correct behavior - the stream should stop
pass

# Run the stream task
stream_task = asyncio.create_task(stream_and_cancel())

# Wait a bit for events to process and cancellation to occur
await asyncio.sleep(0.5)

# Verify that cancellation occurred
assert cancellation_occurred, "Cancellation should have occurred"

# Wait for the task to complete (it should have stopped due to cancellation)
try:
await asyncio.wait_for(stream_task, timeout=2.0)
except asyncio.TimeoutError:
pytest.fail("Stream task did not complete after cancellation - this indicates the bug still exists")

# With the fix, we should stop immediately after cancellation
# Without the fix, all events would be processed (could be 10+ events)
# With the fix, we should have processed only a few events before stopping
assert len(events_processed) <= 5, (
f"Stream should stop soon after cancellation. "
f"Processed {len(events_processed)} events, expected <= 5. "
f"This indicates the cancellation check is not working properly."
)


@pytest.mark.asyncio
async def test_selector_group_chat(runtime: AgentRuntime | None) -> None:
model_client = ReplayChatCompletionClient(
Expand Down