diff --git a/src/google/adk/a2a/converters/request_converter.py b/src/google/adk/a2a/converters/request_converter.py index 1746ec0bca..29c6372c8d 100644 --- a/src/google/adk/a2a/converters/request_converter.py +++ b/src/google/adk/a2a/converters/request_converter.py @@ -22,6 +22,7 @@ from google.genai import types as genai_types from pydantic import BaseModel +from ...agents.run_config import StreamingMode from ...runners import RunConfig from ..experimental import a2a_experimental from .part_converter import A2APartToGenAIPartConverter @@ -44,6 +45,7 @@ class AgentRunRequest(BaseModel): [ RequestContext, A2APartToGenAIPartConverter, + Optional[StreamingMode], ], AgentRunRequest, ] @@ -55,6 +57,8 @@ class AgentRunRequest(BaseModel): Args: request: The incoming request context from the A2A server. part_converter: A function to convert A2A content parts to GenAI parts. + streaming_mode: Optional streaming mode for the agent execution. If None, + defaults to StreamingMode.NONE. Returns: An RunnerRequest object containing the keyword arguments for ADK runner's run_async method. @@ -78,12 +82,15 @@ def _get_user_id(request: RequestContext) -> str: def convert_a2a_request_to_agent_run_request( request: RequestContext, part_converter: A2APartToGenAIPartConverter = convert_a2a_part_to_genai_part, + streaming_mode: Optional[StreamingMode] = None, ) -> AgentRunRequest: """Converts an A2A RequestContext to an AgentRunRequest model. Args: request: The incoming request context from the A2A server. part_converter: A function to convert A2A content parts to GenAI parts. + streaming_mode: Optional streaming mode for the agent execution. If None, + defaults to StreamingMode.NONE. Returns: A AgentRunRequest object ready to be used as arguments for the ADK runner. @@ -113,5 +120,8 @@ def convert_a2a_request_to_agent_run_request( role='user', parts=output_parts, ), - run_config=RunConfig(custom_metadata=custom_metadata), + run_config=RunConfig( + custom_metadata=custom_metadata, + streaming_mode=streaming_mode or StreamingMode.NONE, + ), ) diff --git a/src/google/adk/a2a/executor/a2a_agent_executor.py b/src/google/adk/a2a/executor/a2a_agent_executor.py index b6880aaa5c..a7a8c714ed 100644 --- a/src/google/adk/a2a/executor/a2a_agent_executor.py +++ b/src/google/adk/a2a/executor/a2a_agent_executor.py @@ -38,6 +38,7 @@ from pydantic import BaseModel from typing_extensions import override +from ...agents.run_config import StreamingMode from ...utils.context_utils import Aclosing from ..converters.event_converter import AdkEventToA2AEventsConverter from ..converters.event_converter import convert_event_to_a2a_events @@ -69,6 +70,8 @@ class A2aAgentExecutorConfig(BaseModel): convert_a2a_request_to_agent_run_request ) event_converter: AdkEventToA2AEventsConverter = convert_event_to_a2a_events + streaming_mode: Optional[StreamingMode] = None + """Optional streaming mode for agent execution. If None, defaults to StreamingMode.NONE.""" @a2a_experimental @@ -190,6 +193,7 @@ async def _handle_request( run_request = self._config.request_converter( context, self._config.a2a_part_converter, + self._config.streaming_mode, ) # ensure the session exists diff --git a/src/google/adk/a2a/utils/agent_to_a2a.py b/src/google/adk/a2a/utils/agent_to_a2a.py index 1a1ba35618..e6328d239a 100644 --- a/src/google/adk/a2a/utils/agent_to_a2a.py +++ b/src/google/adk/a2a/utils/agent_to_a2a.py @@ -25,12 +25,14 @@ from starlette.applications import Starlette from ...agents.base_agent import BaseAgent +from ...agents.run_config import StreamingMode from ...artifacts.in_memory_artifact_service import InMemoryArtifactService from ...auth.credential_service.in_memory_credential_service import InMemoryCredentialService from ...memory.in_memory_memory_service import InMemoryMemoryService from ...runners import Runner from ...sessions.in_memory_session_service import InMemorySessionService from ..executor.a2a_agent_executor import A2aAgentExecutor +from ..executor.a2a_agent_executor import A2aAgentExecutorConfig from ..experimental import a2a_experimental from .agent_card_builder import AgentCardBuilder @@ -79,6 +81,7 @@ def to_a2a( protocol: str = "http", agent_card: Optional[Union[AgentCard, str]] = None, runner: Optional[Runner] = None, + streaming_mode: Optional[StreamingMode] = None, ) -> Starlette: """Convert an ADK agent to a A2A Starlette application. @@ -92,6 +95,10 @@ def to_a2a( agent. runner: Optional pre-built Runner object. If not provided, a default runner will be created using in-memory services. + streaming_mode: Optional streaming mode for agent execution. If None, + defaults to StreamingMode.NONE. Set to StreamingMode.SSE + to enable Server-Sent Events streaming for real-time + responses. Returns: A Starlette application that can be run with uvicorn @@ -103,6 +110,9 @@ def to_a2a( # Or with custom agent card: app = to_a2a(agent, agent_card=my_custom_agent_card) + + # With SSE streaming enabled: + app = to_a2a(agent, streaming_mode=StreamingMode.SSE) """ # Set up ADK logging to ensure logs are visible when using uvicorn directly adk_logger = logging.getLogger("google_adk") @@ -123,8 +133,14 @@ async def create_runner() -> Runner: # Create A2A components task_store = InMemoryTaskStore() + # Create executor config with streaming mode + executor_config = A2aAgentExecutorConfig( + streaming_mode=streaming_mode, + ) + agent_executor = A2aAgentExecutor( runner=runner or create_runner, + config=executor_config, ) request_handler = DefaultRequestHandler(