Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ requires-python = ">=3.13"
dependencies = [
"httpx>=0.28.1",
"httpx-sse>=0.4.0",
"mypy>=1.15.0",
"opentelemetry-api>=1.33.0",
"opentelemetry-sdk>=1.33.0",
"pydantic>=2.11.3",
"sse-starlette>=2.3.3",
"starlette>=0.46.2",
"typing-extensions>=4.13.2",
"uvicorn>=0.34.2",
]

[tool.pytest.ini_options]
Expand All @@ -35,6 +35,7 @@ members = [
[dependency-groups]
dev = [
"datamodel-code-generator>=0.30.0",
"mypy>=1.15.0",
"pytest>=8.3.5",
"pytest-asyncio>=0.26.0",
"pytest-cov>=6.1.1",
Expand Down
2 changes: 2 additions & 0 deletions src/a2a/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
SetTaskPushNotificationConfigRequest,
SetTaskPushNotificationConfigResponse,
)
from a2a.utils.telemetry import SpanKind, trace_class


class A2ACardResolver:
Expand Down Expand Up @@ -59,6 +60,7 @@ async def get_agent_card(
) from e


@trace_class(kind=SpanKind.CLIENT)
class A2AClient:
"""A2A Client."""

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
TaskStatusUpdateEvent,
)
from a2a.utils.errors import ServerError
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class EventConsumer:
"""Consumer to read events from the agent event queue."""

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/event_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
TaskArtifactUpdateEvent,
TaskStatusUpdateEvent,
)
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)
Expand All @@ -26,6 +27,7 @@
)


@trace_class(kind=SpanKind.SERVER)
class EventQueue:
"""Event queue for A2A responses from agent."""

Expand Down
2 changes: 2 additions & 0 deletions src/a2a/server/events/in_memory_queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
QueueManager,
TaskQueueExists,
)
from a2a.utils.telemetry import SpanKind, trace_class


@trace_class(kind=SpanKind.SERVER)
class InMemoryQueueManager(QueueManager):
"""InMemoryQueueManager is used for a single binary management.

Expand Down
7 changes: 5 additions & 2 deletions src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
UnsupportedOperationError,
)
from a2a.utils.errors import ServerError
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class DefaultRequestHandler(RequestHandler):
"""Default request handler for all incoming requests."""

Expand Down Expand Up @@ -112,7 +114,7 @@ async def on_message_send(
else:
queue = EventQueue()
result_aggregator = ResultAggregator(task_manager)
# TODO to manage the non-blocking flows.
# TODO: to manage the non-blocking flows.
producer_task = asyncio.create_task(
self._run_event_stream(
RequestContext(
Expand All @@ -128,7 +130,8 @@ async def on_message_send(
try:
consumer = EventConsumer(queue)

# TODO - register the queue for the task upon the first sign it is a task.
# TODO: - register the queue for the task upon
# the first sign it is a task.
result = await result_aggregator.consume_all(consumer)
if not result:
raise ServerError(error=InternalError())
Expand Down
6 changes: 4 additions & 2 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@
)
from a2a.utils.errors import ServerError
from a2a.utils.helpers import validate
from a2a.utils.telemetry import SpanKind, trace_class


logger = logging.getLogger(__name__)


@trace_class(kind=SpanKind.SERVER)
class JSONRPCHandler:
"""A handler that maps the JSONRPC Objects to the request handler and back."""
"""Maps the JSONRPC Objects to the request handler and back."""

def __init__(
self,
Expand All @@ -53,7 +55,7 @@ def __init__(

Args:
agent_card: The AgentCard describing the agent's capabilities.
request_handler: The handler instance responsible for processing A2A requests.
request_handler: The handler instance to process A2A requests.
"""
self.agent_card = agent_card
self.request_handler = request_handler
Expand Down
5 changes: 5 additions & 0 deletions src/a2a/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# type: ignore
from opentelemetry.trace import SpanKind as OTelSpanKind

from a2a.utils.artifact import new_text_artifact
from a2a.utils.helpers import (
append_artifact_to_task,
Expand All @@ -12,7 +15,9 @@
from a2a.utils.task import new_task


SpanKind = OTelSpanKind
__all__ = [
'SpanKind',
'append_artifact_to_task',
'build_text_artifact',
'create_task_obj',
Expand Down
3 changes: 3 additions & 0 deletions src/a2a/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
TextPart,
)
from a2a.utils.errors import ServerError, UnsupportedOperationError
from a2a.utils.telemetry import trace_function


logger = logging.getLogger(__name__)


@trace_function()
def create_task_obj(message_send_params: MessageSendParams) -> Task:
"""Create a new task object from message send params."""
if not message_send_params.message.contextId:
Expand All @@ -31,6 +33,7 @@ def create_task_obj(message_send_params: MessageSendParams) -> Task:
)


@trace_function()
def append_artifact_to_task(task: Task, event: TaskArtifactUpdateEvent) -> None:
"""Helper method for updating Task with new artifact data."""
if not task.artifacts:
Expand Down
Loading