feat: Add Kafka as a transport #434
Conversation
Summary of Changes
Hello @bigxin0124, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces comprehensive support for Apache Kafka as a new transport protocol within the a2a-sdk. This feature enables agents to communicate using Kafka's robust, scalable, and asynchronous messaging capabilities, expanding the existing transport options (JSONRPC, gRPC, HTTP+JSON). The changes include new client and server implementations for Kafka transport, dependency updates, and corresponding documentation.
Highlights
- New Kafka Transport Implementation: Added KafkaClientTransport for client-side communication and KafkaServerApp for server-side handling, allowing agents to send and receive messages via Kafka topics.
- Request-Response Correlation: Introduced a CorrelationManager to handle the asynchronous request-response pattern over Kafka, ensuring proper message correlation for both single and streaming interactions.
- Dependency Management: Updated pyproject.toml to include aiokafka as an optional dependency, enabling users to install Kafka support when needed.
- Documentation Updates: Modified README.md to provide clear instructions for installing the SDK with Kafka transport support.
- Seamless Integration: Integrated Kafka transport into the existing client factory and server request handler mechanisms, ensuring it can be used alongside other supported protocols.
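To make the shape of the feature concrete, here is a hypothetical usage sketch. The import path, `create()` signature, and topic parameters are assumptions for illustration only; the type names (`MessageSendParams`, `Message`, `Part`, `Role`, `TextPart`) are taken from the test code quoted later in this review. It assumes the SDK is installed with the Kafka extra (e.g. `pip install 'a2a-sdk[kafka]'`, if the extra is named `kafka`).

```python
import asyncio

# Assumed import path for the new transport; the PR may place it elsewhere.
from a2a.client.transports.kafka import KafkaClientTransport
from a2a.types import Message, MessageSendParams, Part, Role, TextPart


async def main() -> None:
    # Assumed create() parameters: broker address plus request/reply topics.
    transport = await KafkaClientTransport.create(
        bootstrap_servers='localhost:9092',
        request_topic='agent-requests',
        reply_topic='agent-replies',
    )
    request = MessageSendParams(
        message=Message(
            message_id='msg-1',
            role=Role.user,
            parts=[Part(root=TextPart(text='hello over Kafka'))],
        )
    )
    # Request/response correlation over the reply topic is handled by the
    # CorrelationManager inside the transport.
    response = await transport.send_message(request)
    print(response)


asyncio.run(main())
```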
Code Review
This pull request introduces Kafka transport support, which is a significant and well-implemented feature. The code is generally well-structured, with clear separation of concerns between transport-specific logic and business logic handlers. The addition of comprehensive tests for the new components is also commendable.
My review includes suggestions to improve the API design by avoiding modifications to private attributes, enhance timeout handling in streaming methods for better robustness, and fix a few issues in the tests to ensure they are correct and effective. Overall, this is a great contribution.
```python
        return transport

    def register(self, label: str, generator: TransportProducer) -> None:
```
Modifying a "private" attribute `_auto_start` from outside the class is a leaky abstraction. It makes the `KafkaClientTransport` class harder to understand and use correctly on its own.

Consider making `auto_start` a public parameter in `KafkaClientTransport`'s `__init__` or `create` method. This would make the contract between the factory and the transport explicit. For example:

- Add `auto_start: bool = False` to `KafkaClientTransport.__init__` and set `self._auto_start = auto_start`.
- In `KafkaClientTransport.create`, accept an `auto_start` parameter and pass it to the constructor.
- Here in `_create_kafka_transport`, call `KafkaClientTransport.create(..., auto_start=True)`.
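A minimal sketch of that shape (constructor parameters beyond the flag are omitted for brevity):

```python
class KafkaClientTransport:
    def __init__(self, *, auto_start: bool = False) -> None:
        # The flag is set through the public constructor instead of being
        # mutated from outside the class.
        self._auto_start = auto_start

    @classmethod
    async def create(cls, *, auto_start: bool = False) -> 'KafkaClientTransport':
        # Forward the flag explicitly so the contract between the factory
        # and the transport is visible in the signature.
        return cls(auto_start=auto_start)
```

`_create_kafka_transport` would then call `await KafkaClientTransport.create(..., auto_start=True)` and drop the private-attribute assignment.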
```python
async def send_message_streaming(
    self,
    request: MessageSendParams,
    *,
    context: ClientCallContext | None = None,
) -> AsyncGenerator[Message | Task | TaskStatusUpdateEvent | TaskArtifactUpdateEvent]:
    """Send a streaming message request to the agent and yield responses as they arrive."""
    await self._ensure_started()
    correlation_id = await self._send_request('message_send', request, context, streaming=True)

    # Register streaming request
    streaming_future = await self.correlation_manager.register_streaming(correlation_id)

    try:
        timeout = 30.0
        if context and context.timeout:
            timeout = context.timeout

        # Yield responses as they arrive
        while not streaming_future.is_done():
            try:
                # Wait for next response with timeout
                result = await asyncio.wait_for(streaming_future.get(), timeout=5.0)
                yield result
            except asyncio.TimeoutError:
                # Check if stream is done or if we've exceeded total timeout
                if streaming_future.is_done():
                    break
                # Continue waiting for more responses
                continue

    except Exception as e:
        await self.correlation_manager.complete_with_exception(
            correlation_id,
            A2AClientError(f"Streaming request failed: {e}")
        )
        raise A2AClientError(f"Streaming request failed: {e}") from e
```
The streaming methods (`send_message_streaming` and `resubscribe`) do not seem to respect the total timeout specified in the context. The `timeout` variable is read from the context, but a hardcoded `timeout=5.0` is then used for polling the next item from the stream. This could lead to the method waiting indefinitely if the server stops sending messages without closing the stream.

The total duration of the streaming operation should be limited by the context's timeout. You could achieve this by wrapping the entire read loop in `asyncio.wait_for` or by manually checking the deadline within the loop.
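One way to do the latter, sketched against the loop above (`streaming_future` behavior is assumed from the quoted code, and the 5-second poll interval is kept):

```python
import asyncio
import time

POLL_INTERVAL = 5.0  # keep the short poll so is_done() is re-checked regularly


async def _yield_with_deadline(streaming_future, timeout: float):
    # Absolute deadline derived from the context's total timeout.
    deadline = time.monotonic() + timeout
    while not streaming_future.is_done():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise asyncio.TimeoutError('Streaming request exceeded total timeout')
        try:
            # Never wait past the overall deadline, even when polling.
            result = await asyncio.wait_for(
                streaming_future.get(), timeout=min(POLL_INTERVAL, remaining)
            )
            yield result
        except asyncio.TimeoutError:
            if streaming_future.is_done():
                break
            # Otherwise loop and re-check the deadline before waiting again.
            continue
```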
```
@@ -0,0 +1,60 @@
#!/usr/bin/env python3
```
This file appears to be a temporary test script rather than a formal unit or integration test. It uses `sys.path` manipulation to import from `src` and tries to import a module `kafka_example` which is not included in the pull request, causing it to fail. Such scripts should either be removed or moved to a dedicated examples directory and updated to be runnable. The same applies to `test_simple_kafka.py`.
```python
async def test_send_message_timeout(self, kafka_transport):
    """Test send message with timeout."""
    with patch.object(kafka_transport, '_send_request') as mock_send, \
         patch.object(kafka_transport.correlation_manager, 'register') as mock_register:

        # Create a future that never resolves
        future = asyncio.Future()
        mock_register.return_value = future
        mock_send.return_value = "test-correlation-id"

        request = MessageSendParams(
            message=Message(
                message_id="msg-1",
                role=Role.user,
                parts=[Part(root=TextPart(text="test message"))],
            )
        )

        # Should timeout
        with pytest.raises(A2AClientError, match="Request timed out"):
            await asyncio.wait_for(
                kafka_transport.send_message(request),
                timeout=0.1
            )
```
This test for message timeout is flawed. The outer `asyncio.wait_for` will cancel the `kafka_transport.send_message` call before its internal timeout logic can be triggered. This means you are not testing the timeout handling within `send_message` itself.

A better approach is to pass a `ClientCallContext` with a short timeout to the `send_message` method. This will correctly trigger the internal timeout logic and allow you to assert that the correct `A2AClientError` is raised.
```python
@pytest.mark.asyncio
async def test_send_message_timeout(self, kafka_transport):
    """Test send message with timeout."""
    from a2a.client.middleware import ClientCallContext

    with patch.object(kafka_transport, '_send_request') as mock_send, \
         patch.object(kafka_transport.correlation_manager, 'register') as mock_register:
        # Create a future that never resolves
        future = asyncio.Future()
        mock_register.return_value = future
        mock_send.return_value = "test-correlation-id"

        request = MessageSendParams(
            message=Message(
                message_id="msg-1",
                role=Role.user,
                parts=[Part(root=TextPart(text="test message"))],
            )
        )

        context = ClientCallContext(timeout=0.01)

        # Should timeout
        with pytest.raises(A2AClientError, match="Request timed out"):
            await kafka_transport.send_message(request, context=context)
```
```python
try:
    # Wait for response with timeout
    timeout = 30.0  # Default timeout
```
The default timeout value `30.0` is hardcoded and repeated in several methods (`send_message`, `get_task`, `cancel_task`, etc.). To improve maintainability and consistency, it would be better to define it as a module-level constant, for example `DEFAULT_REQUEST_TIMEOUT = 30.0`, and use it in all request methods.
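For instance (the constant and helper names here are suggestions, not code from the PR):

```python
DEFAULT_REQUEST_TIMEOUT = 30.0  # seconds; single source of truth for all request methods


def _resolve_timeout(context) -> float:
    # One place to decide the effective timeout, so send_message,
    # get_task, cancel_task, etc. cannot drift apart.
    if context is not None and context.timeout:
        return context.timeout
    return DEFAULT_REQUEST_TIMEOUT
```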
```python
import asyncio
import uuid
from typing import Any, Dict, Optional, Set
```
```python
    self,
    reply_topic: str,
    correlation_id: str,
    result: any,
```
The type annotation `result: any` refers to the built-in `any()` function, which is not a type; it should be `result: Any` from `typing`.
@pstephengoogle @kthota-g Is Kafka support something we should consider adding to the SDK as a transport?

Kafka as a transport is not something we are currently looking to add to this SDK. If you would like, you could create another library that builds on the a2a-sdk and adds this support.
Description

Thank you for opening a Pull Request!

Before submitting your PR, there are a few things you can do to make sure it goes smoothly:

- Follow the CONTRIBUTING Guide.
- Title your Pull Request using the Conventional Commits prefixes:
  - `fix:`, which represents bug fixes, and correlates to a SemVer patch.
  - `feat:`, which represents a new feature, and correlates to a SemVer minor.
  - `feat!:`, `fix!:`, `refactor!:`, etc., which represent a breaking change (indicated by the `!`) and will result in a SemVer major.
- Ensure the tests and linter pass (run `bash scripts/format.sh` from the repository root to format).

Fixes #<issue_number_goes_here> 🦕