Skip to content

Commit a371461

Browse files
ovidiutaralescataralesc
andauthored
fix: make event_consumer tolerant to closed queues on py3.13 (#407)
# Issue On Python 3.13, the closed-queue signal is `asyncio.QueueShutDown`. `event_consumer.py` aliased `asyncio.QueueShutDown` to `ClosedQueue`, but `asyncio.QueueEmpty` can still arise on py3.13 and it's not handled properly in `consume_all()`'s except blocks. # How it's reproduced In any `AgentExecutor` class: ``` # (...) async def execute( self, request: RequestContext, event_queue: EventQueue, ) -> None: await event_queue.enqueue_event( new_task(request.message) ) ``` In python < 3.13: - Sending a `message/send` request works, a task is added to the `InMemoryTaskStore`. In python >= 3.13: - Sending a `message/send` request crashes with exception: ``` File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 219, in _handle_requests return await self._process_non_streaming_request( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ request_id, a2a_request, call_context ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "venv/lib/python3.13/site-packages/a2a/server/apps/jsonrpc/jsonrpc_app.py", line 306, in _process_non_streaming_request handler_result = await self.handler.on_message_send( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ request_obj, context ^^^^^^^^^^^^^^^^^^^^ ) ^ File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper result = await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/jsonrpc_handler.py", line 87, in on_message_send task_or_message = await self.request_handler.on_message_send( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ request.params, context ^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper result = await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "venv/lib/python3.13/site-packages/a2a/server/request_handlers/default_request_handler.py", line 282, in on_message_send ) = await result_aggregator.consume_and_break_on_interrupt(consumer) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "venv/lib/python3.13/site-packages/a2a/server/tasks/result_aggregator.py", line 115, in consume_and_break_on_interrupt async for event in event_stream: ...<20 lines>... break File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 87, in consume_all raise self._exception File "venv/lib/python3.13/site-packages/a2a/server/events/event_consumer.py", line 94, in consume_all event = await asyncio.wait_for( ^^^^^^^^^^^^^^^^^^^^^^^ self.queue.dequeue_event(), timeout=self._timeout ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ) ^ File "/opt/homebrew/Cellar/[email protected]/3.13.5/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/tasks.py", line 507, in wait_for return await fut ^^^^^^^^^ File "venv/lib/python3.13/site-packages/a2a/utils/telemetry.py", line 162, in async_wrapper result = await func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "venv/lib/python3.13/site-packages/a2a/server/events/event_queue.py", line 95, in dequeue_event raise asyncio.QueueEmpty('Queue is closed.') asyncio.queues.QueueEmpty: Queue is closed. ``` # Fix ## Code - `event_consumer.consume_all`: - Catch `(QueueClosed, asyncio.QueueEmpty)` and break only when `queue.is_closed()` is True; otherwise continue polling. - `event_queue.dequeue_event`: - Version-guard the early-raise: on <3.13 keep raising `QueueEmpty` when closed+empty; on ≥3.13 skip the early raise and rely on `queue.shutdown()` to surface `QueueShutDown` exceptions. ## Tests Added 2 tests which fail on current implementation, but pass after the fix. --------- Co-authored-by: taralesc <[email protected]>
1 parent da14cea commit a371461

File tree

4 files changed

+69
-8
lines changed

4 files changed

+69
-8
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
135135
except asyncio.TimeoutError: # pyright: ignore [reportUnusedExcept]
136136
# This class was made an alias of build-in TimeoutError after 3.11
137137
continue
138-
except QueueClosed:
138+
except (QueueClosed, asyncio.QueueEmpty):
139139
# Confirm that the queue is closed, e.g. we aren't on
140140
# python 3.12 and get a queue empty error on an open queue
141141
if self.queue.is_closed():

src/a2a/server/events/event_queue.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,12 @@ async def dequeue_event(self, no_wait: bool = False) -> Event:
9090
asyncio.QueueShutDown: If the queue has been closed and is empty.
9191
"""
9292
async with self._lock:
93-
if self._is_closed and self.queue.empty():
93+
if (
94+
sys.version_info < (3, 13)
95+
and self._is_closed
96+
and self.queue.empty()
97+
):
98+
# On 3.13+, skip early raise; await self.queue.get() will raise QueueShutDown after shutdown()
9499
logger.warning('Queue is closed. Event will not be dequeued.')
95100
raise asyncio.QueueEmpty('Queue is closed.')
96101

tests/server/events/test_event_consumer.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,59 @@ async def test_consume_all_continues_on_queue_empty_if_not_really_closed(
324324
assert mock_event_queue.is_closed.call_count == 1
325325

326326

327+
@pytest.mark.asyncio
328+
async def test_consume_all_handles_queue_empty_when_closed_python_version_agnostic(
329+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
330+
):
331+
"""Ensure consume_all stops with no events when queue is closed and dequeue_event raises asyncio.QueueEmpty (Python version-agnostic)."""
332+
# Make QueueClosed a distinct exception (not QueueEmpty) to emulate py3.13 semantics
333+
from a2a.server.events import event_consumer as ec
334+
335+
class QueueShutDown(Exception):
336+
pass
337+
338+
monkeypatch.setattr(ec, 'QueueClosed', QueueShutDown, raising=True)
339+
340+
# Simulate queue reporting closed while dequeue raises QueueEmpty
341+
mock_event_queue.dequeue_event.side_effect = asyncio.QueueEmpty(
342+
'closed/empty'
343+
)
344+
mock_event_queue.is_closed.return_value = True
345+
346+
consumed_events = []
347+
async for event in event_consumer.consume_all():
348+
consumed_events.append(event)
349+
350+
assert consumed_events == []
351+
mock_event_queue.dequeue_event.assert_called_once()
352+
mock_event_queue.is_closed.assert_called_once()
353+
354+
355+
@pytest.mark.asyncio
356+
async def test_consume_all_continues_on_queue_empty_when_not_closed(
357+
event_consumer: EventConsumer, mock_event_queue: AsyncMock, monkeypatch
358+
):
359+
"""Ensure consume_all continues after asyncio.QueueEmpty when queue is open, yielding the next (final) event."""
360+
# First dequeue raises QueueEmpty (transient empty), then a final Message arrives
361+
final = Message(role='agent', parts=[{'text': 'done'}], message_id='final')
362+
mock_event_queue.dequeue_event.side_effect = [
363+
asyncio.QueueEmpty('temporarily empty'),
364+
final,
365+
]
366+
mock_event_queue.is_closed.return_value = False
367+
368+
# Make the polling responsive in tests
369+
event_consumer._timeout = 0.001
370+
371+
consumed = []
372+
async for ev in event_consumer.consume_all():
373+
consumed.append(ev)
374+
375+
assert consumed == [final]
376+
assert mock_event_queue.dequeue_event.call_count == 2
377+
mock_event_queue.is_closed.assert_called_once()
378+
379+
327380
def test_agent_task_callback_sets_exception(event_consumer: EventConsumer):
328381
"""Test that agent_task_callback sets _exception if the task had one."""
329382
mock_task = MagicMock(spec=asyncio.Task)

tests/server/events/test_event_queue.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,9 @@ async def test_enqueue_event_propagates_to_children(
169169

170170

171171
@pytest.mark.asyncio
172-
async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
172+
async def test_enqueue_event_when_closed(
173+
event_queue: EventQueue, expected_queue_closed_exception
174+
) -> None:
173175
"""Test that no event is enqueued if the parent queue is closed."""
174176
await event_queue.close() # Close the queue first
175177

@@ -178,7 +180,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
178180
await event_queue.enqueue_event(event)
179181

180182
# Verify the queue is still empty
181-
with pytest.raises(asyncio.QueueEmpty):
183+
with pytest.raises(expected_queue_closed_exception):
182184
await event_queue.dequeue_event(no_wait=True)
183185

184186
# Also verify child queues are not affected directly by parent's enqueue attempt when closed
@@ -192,7 +194,7 @@ async def test_enqueue_event_when_closed(event_queue: EventQueue) -> None:
192194
await (
193195
child_queue.close()
194196
) # ensure child is also seen as closed for this test's purpose
195-
with pytest.raises(asyncio.QueueEmpty):
197+
with pytest.raises(expected_queue_closed_exception):
196198
await child_queue.dequeue_event(no_wait=True)
197199

198200

@@ -214,7 +216,7 @@ async def test_dequeue_event_closed_and_empty_no_wait(
214216
with pytest.raises(expected_queue_closed_exception):
215217
event_queue.queue.get_nowait()
216218

217-
with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
219+
with pytest.raises(expected_queue_closed_exception):
218220
await event_queue.dequeue_event(no_wait=True)
219221

220222

@@ -230,7 +232,8 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(
230232

231233
# This test is tricky because await event_queue.dequeue_event() would hang if not for the close check.
232234
# The current implementation's dequeue_event checks `is_closed` first.
233-
# If closed and empty, it raises QueueEmpty immediately.
235+
# If closed and empty, it raises QueueEmpty immediately (on Python <= 3.12).
236+
# On Python 3.13+, this check is skipped and asyncio.Queue.get() raises QueueShutDown instead.
234237
# The "waits_then_raises" scenario described in the subtask implies the `get()` might wait.
235238
# However, the current code:
236239
# async with self._lock:
@@ -240,7 +243,7 @@ async def test_dequeue_event_closed_and_empty_waits_then_raises(
240243
# event = await self.queue.get() -> this line is not reached if closed and empty.
241244

242245
# So, for the current implementation, it will raise QueueEmpty immediately.
243-
with pytest.raises(asyncio.QueueEmpty, match='Queue is closed.'):
246+
with pytest.raises(expected_queue_closed_exception):
244247
await event_queue.dequeue_event(no_wait=False)
245248

246249
# If the implementation were to change to allow `await self.queue.get()`

0 commit comments

Comments
 (0)