Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ To install with gRPC support:
uv add "a2a-sdk[grpc]"
```

To install with Kafka transport support:

```bash
uv add "a2a-sdk[kafka]"
```

To install with OpenTelemetry tracing support:

```bash
Expand Down Expand Up @@ -87,6 +93,12 @@ To install with gRPC support:
pip install "a2a-sdk[grpc]"
```

To install with Kafka transport support:

```bash
pip install "a2a-sdk[kafka]"
```

To install with OpenTelemetry tracing support:

```bash
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ sqlite = ["sqlalchemy[asyncio,aiosqlite]>=2.0.0"]
sql = ["sqlalchemy[asyncio,postgresql-asyncpg,aiomysql,aiosqlite]>=2.0.0"]
encryption = ["cryptography>=43.0.0"]
grpc = ["grpcio>=1.60", "grpcio-tools>=1.60", "grpcio_reflection>=1.7.0"]
kafka = ["aiokafka>=0.11.0"]
telemetry = ["opentelemetry-api>=1.33.0", "opentelemetry-sdk>=1.33.0"]

[project.urls]
Expand Down
31 changes: 31 additions & 0 deletions src/a2a/client/client_factory.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from __future__ import annotations

import logging
Expand Down Expand Up @@ -25,6 +25,11 @@
except ImportError:
GrpcTransport = None # type: ignore # pyright: ignore

try:
from a2a.client.transports.kafka import KafkaClientTransport
except ImportError:
KafkaClientTransport = None # type: ignore # pyright: ignore


logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -97,6 +102,32 @@
TransportProtocol.grpc,
GrpcTransport.create,
)
if TransportProtocol.kafka in supported:
if KafkaClientTransport is None:
raise ImportError(
'To use KafkaClient, its dependencies must be installed. '
'You can install them with \'pip install "a2a-sdk[kafka]"\''
)
self.register(
TransportProtocol.kafka,
self._create_kafka_transport,
)

def _create_kafka_transport(
self,
card: AgentCard,
url: str,
config: ClientConfig,
interceptors: list[ClientCallInterceptor],
) -> ClientTransport:
"""Create a Kafka transport that will auto-start when first used."""
# Create the transport using the existing create method
transport = KafkaClientTransport.create(card, url, config, interceptors)

Check failure on line 126 in src/a2a/client/client_factory.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (W293)

src/a2a/client/client_factory.py:126:1: W293 Blank line contains whitespace
# Mark the transport for auto-start when first used
transport._auto_start = True

Check failure on line 128 in src/a2a/client/client_factory.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (SLF001)

src/a2a/client/client_factory.py:128:9: SLF001 Private member accessed: `_auto_start`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Modifying a "private" attribute _auto_start from outside the class is a leaky abstraction. It makes the KafkaClientTransport class harder to understand and use correctly on its own.

Consider making auto_start a public parameter in the KafkaClientTransport's __init__ or create method. This would make the contract between the factory and the transport explicit.

For example:

  1. Add auto_start: bool = False to KafkaClientTransport.__init__ and set self._auto_start = auto_start.
  2. In KafkaClientTransport.create, accept an auto_start parameter and pass it to the constructor.
  3. Here in _create_kafka_transport, you would call KafkaClientTransport.create(..., auto_start=True).


Check failure on line 129 in src/a2a/client/client_factory.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Ruff (W293)

src/a2a/client/client_factory.py:129:1: W293 Blank line contains whitespace
return transport

def register(self, label: str, generator: TransportProducer) -> None:
"""Register a new transport producer for a given transport label."""
Expand Down
6 changes: 6 additions & 0 deletions src/a2a/client/transports/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@
except ImportError:
GrpcTransport = None # type: ignore

try:
from a2a.client.transports.kafka import KafkaClientTransport
except ImportError:
KafkaClientTransport = None # type: ignore


__all__ = [
'ClientTransport',
'GrpcTransport',
'JsonRpcTransport',
'KafkaClientTransport',
'RestTransport',
]
Loading
Loading