Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions src/a2a/server/agent_execution/base_agent_executor.py

This file was deleted.

22 changes: 21 additions & 1 deletion src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class EventConsumer:

def __init__(self, queue: EventQueue):
self.queue = queue
self._timeout = 0.5
self._exception: BaseException | None = None
logger.debug('EventConsumer initialized')

async def consume_one(self) -> Event:
Expand All @@ -45,8 +47,15 @@ async def consume_all(self) -> AsyncGenerator[Event]:
"""Consume all the generated streaming events from the agent."""
logger.debug('Starting to consume all events from the queue.')
while True:
if self._exception:
raise self._exception
try:
event = await self.queue.dequeue_event()
# We use a timeout when waiting for an event from the queue.
# This is required because it allows the loop to check if
# `self._exception` has been set by the `agent_task_callback`.
# Without the timeout, loop might hang indefinitely if no events are
# enqueued by the agent and the agent simply threw an exception
event = await asyncio.wait_for(self.queue.dequeue_event(), timeout=self._timeout)
logger.debug(
f'Dequeued event of type: {type(event)} in consume_all.'
)
Expand Down Expand Up @@ -74,5 +83,16 @@ async def consume_all(self) -> AsyncGenerator[Event]:
logger.debug('Stopping event consumption in consume_all.')
self.queue.close()
break
except asyncio.TimeoutError:
# continue polling until there is a final event
continue
except asyncio.QueueShutDown:
break





def agent_task_callback(self, agent_task: asyncio.Task[None]):
if agent_task.exception() is not None:
self._exception = agent_task.exception()
2 changes: 2 additions & 0 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ async def on_message_send(
await self._register_producer(task_id, producer_task)

consumer = EventConsumer(queue)
producer_task.add_done_callback(consumer.agent_task_callback)

interrupted = False
try:
Expand Down Expand Up @@ -192,6 +193,7 @@ async def on_message_send_stream(

try:
consumer = EventConsumer(queue)
producer_task.add_done_callback(consumer.agent_task_callback)
async for event in result_aggregator.consume_and_emit(consumer):
# Now we know we have a Task, register the queue
if isinstance(event, Task):
Expand Down