feat: add stdio transport binding#46
Conversation
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Add server-side STDIO transport that reads JSON-RPC requests from stdin, dispatches to a RequestHandler, and writes responses to stdout. - StdioServer<H> wraps an Arc<H: RequestHandler> and runs the main loop - Handshake: server sends handshake, reads client ack before entering loop - dispatch_unary! macro for request/response methods (9 methods) - dispatch_streaming! macro for streaming methods (2 methods) - write_error helper for JSON-RPC error responses - serve() convenience function for actual stdin/stdout - Session ID from A2A_SESSION_ID env var or generated UUIDv7 Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Add 14 e2e tests using in-process tokio::io::duplex channels to wire StdioServer to a TestClient without spawning a real subprocess. Tests cover: - Unary methods: send_message, get_task, list_tasks, cancel_task - Push config CRUD: create, get, list, delete - Extended agent card retrieval - Streaming: message/stream and tasks/subscribe - Error paths: unknown method, invalid params, task not found - Handshake reject returns HandshakeFailed error - Server exits cleanly on EOF (client disconnect) - Multiple sequential requests on same connection Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
- Use char array pattern instead of closure in trim_end_matches - Use assert!/assert!(!..) instead of assert_eq! for booleans - Remove unnecessary borrows in serde_json::to_value calls Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Code Review
This pull request introduces the a2a-stdio crate, which implements an A2A v1 STDIO transport binding for client and server communication using LSP-style framing and JSON-RPC. The implementation includes a handshake protocol and support for both unary and streaming methods. Review feedback suggests enhancing the robustness of the client's streaming response handling and ensuring the server loop persists when encountering malformed JSON frames.
- Server: replace ? on serde_json::from_slice with match that sends a PARSE_ERROR response and continues the loop instead of terminating - Client: explicitly handle non-null malformed final stream response by sending an error through the channel instead of silently dropping Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
There was a problem hiding this comment.
Great job @Ketankhunti! I reviewed the PR and found few issues, but for the rest the PR looks OK.
The major one is that StdioTransport doesn't guard against concurrent requests. Since the server processes requests sequentially and the client doesn't match response ids back to requests, concurrent calls to call() / call_streaming() would race on the reader. This might not be an issue as we have only one client by design, but the API does not enforce issuing 1 request at a time, as nothing stops the caller from calling send_message and get_task concurrently.
A few options to address this:
- Document the constraint (weak) — add a doc comment on
StdioTransportwarning that callers must serialize requests. - Add a request-level lock — wrap the full write+read cycle in a single
Mutexso concurrent calls block rather than race. Cheap and prevents misuse. - Support true multiplexing — add a background reader task that routes responses by
idto pending request futures. More complex but enables concurrent requests.
|
@Ketankhunti code coverage needs to be improved from current 64% to 95% |
- framing: add MAX_FRAME_SIZE (64 MiB) cap on Content-Length to bound memory allocated from a peer-supplied header - client: drop unused _handshake field on StdioTransport - client: document parse_stdio_url limitation (args cannot contain & or ?) - crate: add README.md referenced by Cargo.toml Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
- Add a request-level Mutex held across each call/call_streaming write+read cycle so concurrent callers block instead of racing on the underlying pipes (responses are not multiplexed by id). The streaming variant moves an OwnedMutexGuard into the background reader task and drops it when the stream terminates. - Wrap the handshake read in HANDSHAKE_TIMEOUT (10s); on timeout, kill the child process and return HandshakeFailed instead of hanging spawn. - Wrap child.wait() in destroy() in CLOSE_TIMEOUT (5s); on timeout, kill and reap the child to avoid blocking forever or leaving a zombie. - Document the concurrency model on StdioTransport. Tests: - Add tests/bin/stdio_test_helper.rs with hang-no-handshake, handshake-then-hang, and slow-echo modes (declared as a [[bin]] in Cargo.toml under tests/bin/). - Add tests/spawned.rs covering: handshake timeout fires and reports HandshakeFailed; destroy() kills a hung child; concurrent calls are serialized so each future receives the response paired with its own request task_id. Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
- Extend tests/bin/stdio_test_helper.rs with full-echo (an EchoHandler that implements all 11 RequestHandler methods) and bad-response (writes a JSON-RPC response whose result cannot be deserialized into the expected type). - tests/spawned.rs now covers every Transport method against a real subprocess (send_message, send_streaming_message, get_task, list_tasks, cancel_task, subscribe_to_task, all four push_config methods, get_extended_agent_card), the deserialize-error branch in call(), and StdioTransportFactory (protocol id, successful create, spawn failure for a missing program). - client::call: treat a missing result field as Value::Null so methods returning () (e.g. delete_push_config) round-trip correctly. The server emits null but serde decodes it into None for Option<Value>; the previous code rejected that as 'response missing result'. Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
…rrors handshake.rs unit tests: - read_handshake rejects invalid JSON - read_handshake rejects empty supported_variants - read_handshake_ack EOF, invalid JSON, wrong type, missing fields e2e.rs integration tests against the in-process server: - PARSE_ERROR branch on malformed JSON - PARSE_ERROR branch on JSON that is not a JSON-RPC request - server stays alive after a malformed-JSON parse error (recovers and serves the next request) - INVALID_PARAMS branch in dispatch_streaming! (message/stream and tasks/subscribe with bad params) - handler-error branch in both dispatch_unary! and dispatch_streaming! (tasks/get and tasks/subscribe with id == 'missing'). Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Rust 1.95's clippy::useless_conversion flags the explicit .into_iter() call when extending into a type that already accepts IntoIterator. Remove it. Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
Pure formatting: multi-line match in client.rs, struct-arg formatting and timeout-while-let formatting in spawned.rs and e2e.rs. Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
- full-echo helper now drives the server through the public serve() convenience function (was StdioServer::new + run), so every spawned test using full-echo also covers serve(). - New helper modes stream-bad-frame, stream-bad-notification, stream-bad-final write crafted frames after the handshake to drive the three error branches in the client streaming background reader: parse frame, parse streaming event, and failed to parse final stream response. - New tests in spawned.rs assert each branch surfaces the expected A2AError to the consumer of the stream. Signed-off-by: Ketan Khunti <khuntiketan50@gmail.com>
|
@muscariello Coverage is now at 96.07% on the patch (up from 64% on the original PR). Pushed three follow-up commits:
Test suite is now 26 unit + 21 e2e + 19 spawned = 66 tests, all passing; clippy and fmt clean across the workspace. The remaining uncovered lines (21 in client.rs, 6 in server.rs) are error arms of |
|
Hi @msardara , Did you get a chance to review the PR? |
|
Hey @Ketankhunti, not yet but I'll have a look soon ✅ Sorry for the delay. |
|
I need to check this when I am back. No hurry. Like I said we need a custom protocol binding proposal. |
Summary
Adds a new
a2a-stdiocrate that implements the A2A STDIO transport binding - a subprocess-based transport for local agent communication using LSP-style Content-Length framing over stdin/stdout.Fixes: #15
Motivation
The A2A spec defines multiple transport bindings. The existing crate supports HTTP/REST, JSON-RPC, gRPC, and SLIMRPC. This PR adds STDIO as a Custom Protocol Binding (CPB) for local, single-connection scenarios where spawning a subprocess is simpler than standing up a network server.
Design
Based on the design agreed in #15 (Option A):
Content-Length: N\r\n\r\n<body>framingmessage/send,tasks/get, etc.)handshake→ client replies withhandshakeAck(accept/reject + variant selection)A2A_SESSION_IDenvironment variable or auto-generated UUIDv7id), followed by a final response with the original requestidChanges
New crate:
a2a-stdioerrors.rsStdioErrorenum:Io,Json,InvalidHeader,HandshakeFailed,Closedframing.rsread_frame()/write_frame()for Content-Length framinghandshake.rsHandshake,HandshakeAck,HandshakeFeaturestypes + read/write I/Oclient.rsStdioTransport(spawns subprocess, implementsTransporttrait),StdioTransportFactoryserver.rsStdioServer<H: RequestHandler>withdispatch_unary!/dispatch_streaming!macros,serve()convenience functionOther changes
a2a/src/types.rs: AddedTRANSPORT_PROTOCOL_STDIOconstantCargo.toml(workspace root): Addeda2a-stdioto members and workspace dependenciesCargo.lock: Updated with new crate entryTesting
tokio::io::duplexto wire server ↔ client in-process without spawning a subprocess. Covers all 11RequestHandlermethods, streaming, error paths, handshake reject, EOF handling, and sequential requests.cargo test --workspacepasses with zero failurescargo clippy --workspace --all-targetsclean