Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/linters/.jscpd.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"ignore": ["**/.github/**", "**/.git/**", "**/tests/**", "**/examples/**"],
"threshold": 3,
"threshold": 0,
"reporters": ["html", "markdown"]
}
75 changes: 45 additions & 30 deletions src/a2a/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,36 @@
from a2a.utils.telemetry import SpanKind, trace_class


async def _make_httpx_request(
client: httpx.AsyncClient,
method: str,
url: str,
json_payload: dict[str, Any] | None = None,
http_kwargs: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Makes an HTTP request and handles common errors, returning parsed JSON."""
try:
if method.upper() == 'GET':
response = await client.get(url, **(http_kwargs or {}))
elif method.upper() == 'POST':
response = await client.post(
url, json=json_payload, **(http_kwargs or {})
)
else:
raise ValueError(f'Unsupported HTTP method: {method}')

response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e


class A2ACardResolver:
"""Agent Card resolver."""

Expand All @@ -43,21 +73,13 @@ def __init__(
async def get_agent_card(
self, http_kwargs: dict[str, Any] | None = None
) -> AgentCard:
try:
response = await self.httpx_client.get(
f'{self.base_url}/{self.agent_card_path}',
**(http_kwargs or {}),
)
response.raise_for_status()
return AgentCard.model_validate(response.json())
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e
response_json = await _make_httpx_request(
client=self.httpx_client,
method='GET',
url=f'{self.base_url}/{self.agent_card_path}',
http_kwargs=http_kwargs,
)
return AgentCard.model_validate(response_json)


@trace_class(kind=SpanKind.CLIENT)
Expand Down Expand Up @@ -154,22 +176,15 @@ async def _send_request(

Args:
rpc_request_payload: JSON RPC payload for sending the request
**kwargs: Additional keyword arguments to pass to the httpx client.
http_kwargs: Additional keyword arguments to pass to the httpx client.
"""
try:
response = await self.httpx_client.post(
self.url, json=rpc_request_payload, **(http_kwargs or {})
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
raise A2AClientHTTPError(e.response.status_code, str(e)) from e
except json.JSONDecodeError as e:
raise A2AClientJSONError(str(e)) from e
except httpx.RequestError as e:
raise A2AClientHTTPError(
503, f'Network communication error: {e}'
) from e
return await _make_httpx_request(
client=self.httpx_client,
method='POST',
url=self.url,
json_payload=rpc_request_payload,
http_kwargs=http_kwargs,
)

async def get_task(
self,
Expand Down
54 changes: 19 additions & 35 deletions src/a2a/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,7 @@ def trace_function(
f'Start tracing for {actual_span_name}, is_async_func {is_async_func}'
)

@functools.wraps(func)
async def async_wrapper(*args, **kwargs) -> any:
"""Async Wrapper for the decorator."""
logger.debug('Start async tracer')
async def _invoke_with_tracing(*args, **kwargs):
tracer = trace.get_tracer(
INSTRUMENTING_MODULE_NAME, INSTRUMENTING_MODULE_VERSION
)
Expand All @@ -155,7 +152,11 @@ async def async_wrapper(*args, **kwargs) -> any:

try:
# Async wrapper, await for the function call to complete.
result = await func(*args, **kwargs)
if is_async_func:
result = await func(*args, **kwargs)
# Sync wrapper, execute the function call.
else:
result = func(*args, **kwargs)
span.set_status(StatusCode.OK)
return result

Expand All @@ -175,39 +176,22 @@ async def async_wrapper(*args, **kwargs) -> any:
f'attribute_extractor error in span {actual_span_name}: {attr_e}'
)

@functools.wraps(func)
async def async_wrapper(*args, **kwargs):
"""Async Wrapper for the decorator."""
logger.debug('Start async tracer')
return await _invoke_with_tracing(
*args,
**kwargs,
)

@functools.wraps(func)
def sync_wrapper(*args, **kwargs):
"""Sync Wrapper for the decorator."""
tracer = trace.get_tracer(INSTRUMENTING_MODULE_NAME)
with tracer.start_as_current_span(actual_span_name, kind=kind) as span:
if attributes:
for k, v in attributes.items():
span.set_attribute(k, v)

result = None
exception = None

try:
# Sync wrapper, execute the function call.
result = func(*args, **kwargs)
span.set_status(StatusCode.OK)
return result

except Exception as e:
exception = e
span.record_exception(e)
span.set_status(StatusCode.ERROR, description=str(e))
raise
finally:
if attribute_extractor:
try:
attribute_extractor(
span, args, kwargs, result, exception
)
except Exception as attr_e:
logger.error(
f'attribute_extractor error in span {actual_span_name}: {attr_e}'
)
return _invoke_with_tracing(
*args,
**kwargs,
)

return async_wrapper if is_async_func else sync_wrapper
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure what is the need for two wrappers now, since they internally call the same function.

Maybe directly return the _invoke_with_tracing.

@rajeshvelicheti could you confirm.


Expand Down