Skip to content

Commit c147a83

Browse files
meoow113严骏驰
andauthored
fix: Client hangs when implementing AgentExecutor and awaiting twice in execute method (#379)
Fixes #367 - Add clear_events method for immediate queue cleanup - Integrate with event_consumer to prevent hanging - Support child queue clearing for complete cleanup --------- Co-authored-by: 严骏驰 <[email protected]>
1 parent db82a65 commit c147a83

File tree

3 files changed

+199
-9
lines changed

3 files changed

+199
-9
lines changed

src/a2a/server/events/event_consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def consume_all(self) -> AsyncGenerator[Event]:
125125
# other part is waiting for an event or a closed queue.
126126
if is_final_event:
127127
logger.debug('Stopping event consumption in consume_all.')
128-
await self.queue.close()
128+
await self.queue.close(True)
129129
yield event
130130
break
131131
yield event

src/a2a/server/events/event_queue.py

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -132,25 +132,38 @@ def tap(self) -> 'EventQueue':
132132
self._children.append(queue)
133133
return queue
134134

135-
async def close(self) -> None:
136-
"""Closes the queue for future push events.
135+
async def close(self, immediate: bool = False) -> None:
136+
"""Closes the queue for future push events and also closes all child queues.
137+
138+
Once closed, no new events can be enqueued. For Python 3.13+, this will trigger
139+
`asyncio.QueueShutDown` when the queue is empty and a consumer tries to dequeue.
140+
For lower versions, the queue will be marked as closed and optionally cleared.
141+
142+
Args:
143+
immediate (bool):
144+
- True: Immediately closes the queue and clears all unprocessed events without waiting for them to be consumed. This is suitable for scenarios where you need to forcefully interrupt and quickly release resources.
145+
- False (default): Gracefully closes the queue, waiting for all queued events to be processed (i.e., the queue is drained) before closing. This is suitable when you want to ensure all events are handled.
137146
138-
Once closed, `dequeue_event` will eventually raise `asyncio.QueueShutDown`
139-
when the queue is empty. Also closes all child queues.
140147
"""
141148
logger.debug('Closing EventQueue.')
142149
async with self._lock:
143150
# If already closed, just return.
144-
if self._is_closed:
151+
if self._is_closed and not immediate:
145152
return
146-
self._is_closed = True
153+
if not self._is_closed:
154+
self._is_closed = True
147155
# If using python 3.13 or higher, use the shutdown method
148156
if sys.version_info >= (3, 13):
149-
self.queue.shutdown()
157+
self.queue.shutdown(immediate)
150158
for child in self._children:
151-
await child.close()
159+
await child.close(immediate)
152160
# Otherwise, join the queue
153161
else:
162+
if immediate:
163+
await self.clear_events(True)
164+
for child in self._children:
165+
await child.close(immediate)
166+
return
154167
tasks = [asyncio.create_task(self.queue.join())]
155168
tasks.extend(
156169
asyncio.create_task(child.close()) for child in self._children
@@ -160,3 +173,53 @@ async def close(self) -> None:
160173
def is_closed(self) -> bool:
161174
"""Checks if the queue is closed."""
162175
return self._is_closed
176+
177+
async def clear_events(self, clear_child_queues: bool = True) -> None:
178+
"""Clears all events from the current queue and optionally all child queues.
179+
180+
This method removes all pending events from the queue without processing them.
181+
Child queues can be optionally cleared based on the clear_child_queues parameter.
182+
183+
Args:
184+
clear_child_queues: If True (default), clear all child queues as well.
185+
If False, only clear the current queue, leaving child queues untouched.
186+
"""
187+
logger.debug('Clearing all events from EventQueue and child queues.')
188+
189+
# Clear all events from the queue, even if closed
190+
cleared_count = 0
191+
async with self._lock:
192+
try:
193+
while True:
194+
event = self.queue.get_nowait()
195+
logger.debug(
196+
f'Discarding unprocessed event of type: {type(event)}, content: {event}'
197+
)
198+
self.queue.task_done()
199+
cleared_count += 1
200+
except asyncio.QueueEmpty:
201+
pass
202+
except Exception as e:
203+
# Handle Python 3.13+ QueueShutDown
204+
if (
205+
sys.version_info >= (3, 13)
206+
and type(e).__name__ == 'QueueShutDown'
207+
):
208+
pass
209+
else:
210+
raise
211+
212+
if cleared_count > 0:
213+
logger.debug(
214+
f'Cleared {cleared_count} unprocessed events from EventQueue.'
215+
)
216+
217+
# Clear all child queues (lock released before awaiting child tasks)
218+
if clear_child_queues and self._children:
219+
child_tasks = [
220+
asyncio.create_task(child.clear_events())
221+
for child in self._children
222+
]
223+
224+
if child_tasks:
225+
await asyncio.gather(*child_tasks, return_exceptions=True)

tests/server/events/test_event_queue.py

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,3 +364,130 @@ async def test_is_closed_reflects_state(event_queue: EventQueue) -> None:
364364
await event_queue.close()
365365

366366
assert event_queue.is_closed() is True # Closed after calling close()
367+
368+
369+
@pytest.mark.asyncio
370+
async def test_close_with_immediate_true(event_queue: EventQueue) -> None:
371+
"""Test close with immediate=True clears events immediately."""
372+
# Add some events to the queue
373+
event1 = Message(**MESSAGE_PAYLOAD)
374+
event2 = Task(**MINIMAL_TASK)
375+
await event_queue.enqueue_event(event1)
376+
await event_queue.enqueue_event(event2)
377+
378+
# Verify events are in queue
379+
assert not event_queue.queue.empty()
380+
381+
# Close with immediate=True
382+
await event_queue.close(immediate=True)
383+
384+
# Verify queue is closed and empty
385+
assert event_queue.is_closed() is True
386+
assert event_queue.queue.empty()
387+
388+
389+
@pytest.mark.asyncio
390+
async def test_close_immediate_propagates_to_children(
391+
event_queue: EventQueue,
392+
) -> None:
393+
"""Test that immediate parameter is propagated to child queues."""
394+
395+
child_queue = event_queue.tap()
396+
397+
# Add events to both parent and child
398+
event = Message(**MESSAGE_PAYLOAD)
399+
await event_queue.enqueue_event(event)
400+
401+
assert child_queue.is_closed() is False
402+
assert child_queue.queue.empty() is False
403+
404+
# close event queue
405+
await event_queue.close(immediate=True)
406+
407+
# Verify child queue was called and empty with immediate=True
408+
assert child_queue.is_closed() is True
409+
assert child_queue.queue.empty()
410+
411+
412+
@pytest.mark.asyncio
413+
async def test_clear_events_current_queue_only(event_queue: EventQueue) -> None:
414+
"""Test clear_events clears only the current queue when clear_child_queues=False."""
415+
416+
child_queue = event_queue.tap()
417+
event1 = Message(**MESSAGE_PAYLOAD)
418+
event2 = Task(**MINIMAL_TASK)
419+
await event_queue.enqueue_event(event1)
420+
await event_queue.enqueue_event(event2)
421+
422+
# Clear only parent queue
423+
await event_queue.clear_events(clear_child_queues=False)
424+
425+
# Verify parent queue is empty
426+
assert event_queue.queue.empty()
427+
428+
# Verify child queue still has its event
429+
assert not child_queue.queue.empty()
430+
assert child_queue.is_closed() is False
431+
432+
dequeued_child_event = await child_queue.dequeue_event(no_wait=True)
433+
assert dequeued_child_event == event1
434+
435+
436+
@pytest.mark.asyncio
437+
async def test_clear_events_with_children(event_queue: EventQueue) -> None:
438+
"""Test clear_events clears both current queue and child queues."""
439+
440+
# Create child queues and add events
441+
child_queue1 = event_queue.tap()
442+
child_queue2 = event_queue.tap()
443+
444+
# Add events to parent queue
445+
event1 = Message(**MESSAGE_PAYLOAD)
446+
event2 = Task(**MINIMAL_TASK)
447+
await event_queue.enqueue_event(event1)
448+
await event_queue.enqueue_event(event2)
449+
450+
# Clear all queues
451+
await event_queue.clear_events(clear_child_queues=True)
452+
453+
# Verify all queues are empty
454+
assert event_queue.queue.empty()
455+
assert child_queue1.queue.empty()
456+
assert child_queue2.queue.empty()
457+
458+
459+
@pytest.mark.asyncio
460+
async def test_clear_events_empty_queue(event_queue: EventQueue) -> None:
461+
"""Test clear_events works correctly with empty queue."""
462+
# Verify queue is initially empty
463+
assert event_queue.queue.empty()
464+
465+
# Clear events from empty queue
466+
await event_queue.clear_events()
467+
468+
# Verify queue remains empty
469+
assert event_queue.queue.empty()
470+
471+
472+
@pytest.mark.asyncio
473+
async def test_clear_events_closed_queue(event_queue: EventQueue) -> None:
474+
"""Test clear_events works correctly with closed queue."""
475+
# Add events and close queue
476+
477+
with patch('sys.version_info', (3, 12, 0)): # Simulate older Python
478+
# Mock queue.join as it's called in older versions
479+
event_queue.queue.join = AsyncMock()
480+
481+
event = Message(**MESSAGE_PAYLOAD)
482+
await event_queue.enqueue_event(event)
483+
await event_queue.close()
484+
485+
# Verify queue is closed but not empty
486+
assert event_queue.is_closed() is True
487+
assert not event_queue.queue.empty()
488+
489+
# Clear events from closed queue
490+
await event_queue.clear_events()
491+
492+
# Verify queue is now empty
493+
assert event_queue.queue.empty()

0 commit comments

Comments
 (0)