Commit 457416d

aorumbayev and claude authored
fix(core): restore asyncio.StreamReader inheritance on ACP stdio wrappers (#152)
* fix(core): restore asyncio.StreamReader inheritance on ACP stdio wrappers

ClientSideConnection.__init__ in the upstream ACP SDK enforces
isinstance(input_stream, asyncio.StreamWriter) and
isinstance(output_stream, asyncio.StreamReader) before constructing the
JSON-RPC connection. The earlier branch commit 1207f27 ("refactor(core):
P1+P2-A/B dead code") rewrote JsonRpcObjectStreamReader and
_ByteCountingStreamReader as plain composition classes on the assumption
that the inheritance was decorative. It wasn't: it was load-bearing for
that isinstance gate.

Released as 0.19.0b34, the wrappers no longer match the gate and the TUI
fails on first ACP turn with:

    Agent session failed: ClientSideConnection requires asyncio StreamWriter/StreamReader

Restore the inheritance on both wrappers without calling super().__init__
(state is fully delegated to the wrapped reader). Document the contract in
the class docstring so a future cleanup pass keeps the inheritance.

Lock the contract with a unit regression test in
tests/unit/test_acp_session.py that constructs a real ClientSideConnection
over each wrapper and asserts the isinstance gate accepts it. The previous
unit suite stubbed the connection out with _FakeConnection / _FakeProcess,
so the production isinstance check was never exercised; by testing.md this
is exactly the "contract acceptance tests don't reach" scenario where a
unit test earns its place.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test: add real-stdio ACP integration harness with hermetic echo agent

Adopts patterns from the ACP reference suite (anthropics/agent-protocol):

- tests/helpers/echo_agent.py: vendored examples/echo_agent.py. A real ACP
  agent in 80 lines that we spawn via sys.executable. No provider binary
  needed.
- tests/helpers/acp_loopback.py: TCP-loopback fixture that yields real
  asyncio.StreamReader/StreamWriter pairs (asyncio.start_server +
  asyncio.open_connection). Lets tests construct the real
  acp.client.connection.ClientSideConnection without a subprocess.
- tests/integration/acp_real/test_stream_wrappers.py: drives both wrappers
  through the real SDK and the full handshake → session → prompt →
  notification → teardown roundtrip via spawn_filtered_agent_process.

Fails with the production error message when inheritance is removed:

    TypeError: ClientSideConnection requires asyncio StreamWriter/StreamReader

The previous always-on suite stubbed the SDK out entirely (_FakeConnection,
_FakeProcess) so no test ever exercised the production isinstance gate.
That's why 0.19.0b34 shipped a runtime regression with a green CI bar.

docs/internal/testing.md gains a "Real-stdio integration tests" section
explaining the new layer and when to add to it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(core): call super().__init__() on ACP stream wrappers

Greptile review on PR 152 flagged that both wrappers subclass
asyncio.StreamReader but skip super().__init__(), leaving base attributes
(notably _exception) uninitialised: wrapper.exception() would raise
AttributeError on any error path. Today's SDK only calls readline() on the
wrapped reader, so the gap is unreachable, but it's a footgun for future
SDK upgrades and the parent constructor is cheap (allocates a small unused
buffer).

Call super().__init__() in both wrappers and update docstrings to reflect
the new design. Extend the integration regression test to assert
wrapper.exception() returns None (verified to fail with "AttributeError:
object has no attribute '_exception'" if super() is skipped).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* test(acp): assert wrapper.exception() in unit regression test too

Mirrors the assertion added to the integration test in the previous
commit. Greptile's review thread on tests/unit/test_acp_session.py:238
explicitly requested this: the unit test stopped at construction, so the
un-overridden exception() / set_exception() were never exercised.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix(core): delegate exception()/set_exception() to wrapped reader

Greptile review on PR 152 (iter 2) flagged that calling super().__init__()
gives the wrapper its own _exception field that is never written to. The
asyncio transport calls set_exception() on the *wrapped* reader when the
process dies; the wrapper's own _exception stays None, so
wrapper.exception() returns None even after a transport failure.

Override exception() and set_exception() on both wrappers to delegate to
self._reader, so:

- A transport failure on the underlying reader is visible via the wrapper.
- An SDK-level set_exception on the wrapper propagates to the underlying
  reader (and any blocked readline()/read() on it).

Add a unit regression test that calls underlying.set_exception(...) and
asserts wrapper.exception() returns the same exception, plus the inverse
direction. Verified the test fails (assert is boom) without the overrides.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0bc787e commit 457416d
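
The isinstance gate described in the commit message can be illustrated with a small sketch. `sdk_style_gate`, `CompositionWrapper`, and `InheritingWrapper` are hypothetical names invented for this illustration; the real check lives inside the ACP SDK's `ClientSideConnection.__init__` and may be written differently.

```python
import asyncio

GATE_ERROR = "ClientSideConnection requires asyncio StreamWriter/StreamReader"


def sdk_style_gate(stream: object) -> None:
    """Hypothetical stand-in for the SDK's constructor check (not SDK code)."""
    if not isinstance(stream, asyncio.StreamReader):
        raise TypeError(GATE_ERROR)


class CompositionWrapper:
    """Plain composition: duck-types readline() but is not a StreamReader."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        self._reader = reader

    async def readline(self) -> bytes:
        return await self._reader.readline()


class InheritingWrapper(asyncio.StreamReader):
    """Same delegation, but the subclass relationship satisfies isinstance()."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        super().__init__()
        self._reader = reader

    async def readline(self) -> bytes:
        return await self._reader.readline()


async def check_gate() -> tuple[bool, bool]:
    """Return (composition_accepted, inheritance_accepted)."""
    # Readers are created inside a running loop so construction is safe
    # on every supported Python version.
    inner = asyncio.StreamReader()
    results = []
    for wrapper in (CompositionWrapper(inner), InheritingWrapper(inner)):
        try:
            sdk_style_gate(wrapper)
            results.append(True)
        except TypeError:
            results.append(False)
    return results[0], results[1]
```

Both wrappers behave identically for `readline()`, which is why a stubbed connection cannot tell them apart; only a check on the type itself distinguishes them.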

11 files changed

Lines changed: 506 additions & 7 deletions

File tree

docs/internal/testing.md

Lines changed: 31 additions & 0 deletions
@@ -50,6 +50,32 @@ tests exercise but don't assert on directly, or when it tests **platform-depende
 
 ______________________________________________________________________
 
+## Real-stdio integration tests
+
+Tests in `tests/integration/` exercise the *real* third-party SDK constructors and stdio path
+without depending on a provider binary. They sit between unit and the `KAGAN_INTEGRATION_TESTS=1`
+end-to-end suite, and they catch contracts unit-stubs falsify by accepting too much.
+
+The current suite (`tests/integration/acp_real/`) drives the real `acp.client.connection.ClientSideConnection`
+over either:
+
+1. **TCP-loopback streams** (`tests/helpers/acp_loopback.py`) — yields real `asyncio.StreamReader`
+   / `asyncio.StreamWriter` pairs via `asyncio.start_server` + `asyncio.open_connection`. Catches
+   `isinstance(asyncio.StreamReader)` gates and read-method shape contracts in milliseconds.
+2. **A hermetic echo subprocess** (`tests/helpers/echo_agent.py`, vendored from the ACP SDK
+   examples) invoked via `sys.executable`. Speaks the real ACP wire protocol, exercises
+   handshake → session → prompt → notification → teardown end-to-end.
+
+Add a test here whenever a regression escapes through stubbed SDK fakes. Example: 0.19.0b34
+shipped a `ClientSideConnection requires asyncio StreamWriter/StreamReader` runtime error
+because the always-on suite stubbed `_FakeConnection` and never constructed the real SDK
+type. Both checks now live in `acp_real/test_stream_wrappers.py`.
+
+- Files use `pytestmark = [pytest.mark.integration]` and run on every PR (no env-var gate).
+- They must not require any agent binary on PATH; spawn-based tests use `sys.executable`.
+
+______________________________________________________________________
+
 ## Isolation
 
 Each test creates its own universe — fresh `tmp_path`, fresh SQLite, fresh git repo.
@@ -140,7 +166,12 @@ tests/
 │   └── test_web_ui.py          # SPA static file serving
 ├── integrations/               # kagan.core.integrations (behavioral)
 │   └── test_github.py          # GitHub sync: preflight, preview, create, skip, re-import, labels
+├── integration/                # real-stdio integration (real SDK, hermetic agent)
+│   └── acp_real/
+│       └── test_stream_wrappers.py  # Stream wrappers + spawn pipeline against the echo agent
 └── helpers/                    # DSL: KaganDriver, FakeAgent, fixtures
+    ├── acp_loopback.py         # TCP-loopback fixture yielding real asyncio.StreamReader/Writer
+    └── echo_agent.py           # Vendored ACP echo agent (run as subprocess via sys.executable)
 ```
 
 Name tests as specs: `test_<behavior>_<expected_outcome>`. Each file has 2-6 tests,

pyproject.toml

Lines changed: 3 additions & 0 deletions
@@ -133,6 +133,9 @@ max-complexity = 20
 "src/kagan/server/_system_routes.py" = [
     "C901",
 ] # Route registration nests all handlers inside one function
+"tests/helpers/echo_agent.py" = [
+    "TC002",
+] # Vendored ACP example script run as subprocess — imports must resolve at runtime
 
 [tool.ruff.format]
 docstring-code-format = true

src/kagan/core/_acp_streams.py

Lines changed: 16 additions & 5 deletions
@@ -2,25 +2,30 @@
 
 from __future__ import annotations
 
+import asyncio
 import json
-from typing import TYPE_CHECKING, Any
+from typing import Any
 
 from loguru import logger
 
-if TYPE_CHECKING:
-    import asyncio
 
-
-class JsonRpcObjectStreamReader:
+class JsonRpcObjectStreamReader(asyncio.StreamReader):
     """Drop non-JSON-RPC stdout lines before the ACP SDK parses them.
 
     ACP transports are line-delimited JSON-RPC. Some Windows agent launchers can
     emit terminal/control output on stdout before or between JSON-RPC frames.
     The upstream ACP SDK logs a full traceback for every such line, so we filter
     non-object JSON here and leave valid frames untouched.
+
+    Subclasses ``asyncio.StreamReader`` because ``ClientSideConnection.__init__``
+    enforces ``isinstance(output_stream, asyncio.StreamReader)``. ``super().__init__``
+    *is* called so base attributes (``_exception``, ``_buffer``, ``_loop``) are
+    populated — methods like ``exception()`` keep working even though we delegate
+    every read to ``self._reader``.
     """
 
     def __init__(self, reader: asyncio.StreamReader, *, backend_name: str) -> None:
+        super().__init__()
         self._reader = reader
         self._backend_name = backend_name
         self._dropped = 0
@@ -59,6 +64,12 @@ async def readexactly(self, n: int) -> bytes:
     def at_eof(self) -> bool:
         return self._reader.at_eof()
 
+    def exception(self) -> BaseException | None:
+        return self._reader.exception()
+
+    def set_exception(self, exc: BaseException) -> None:
+        self._reader.set_exception(exc)
+
     def _record_drop(self, line: bytes, *, reason: str) -> None:
         self._dropped += 1
         if self._dropped > 1:
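
The effect of the `exception()` / `set_exception()` overrides above can be sketched in isolation. The class names `NonDelegatingReader` and `DelegatingReader` and the helper `observe_transport_failure` are made up for this illustration and are not part of the commit; the sketch only assumes the standard `asyncio.StreamReader` behaviour of storing the exception on whichever instance `set_exception()` is called on.

```python
import asyncio
from typing import Optional


class NonDelegatingReader(asyncio.StreamReader):
    """Wrapper that calls super().__init__() but keeps its own _exception."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        super().__init__()
        self._reader = reader


class DelegatingReader(asyncio.StreamReader):
    """Wrapper that forwards exception state to the wrapped reader."""

    def __init__(self, reader: asyncio.StreamReader) -> None:
        super().__init__()
        self._reader = reader

    def exception(self) -> Optional[BaseException]:
        return self._reader.exception()

    def set_exception(self, exc: BaseException) -> None:
        self._reader.set_exception(exc)


async def observe_transport_failure() -> tuple[bool, bool, bool]:
    """Return (naive_sees_failure, delegating_sees_failure, inverse_propagates)."""
    boom = ConnectionResetError("agent process died")
    inner = asyncio.StreamReader()
    naive = NonDelegatingReader(inner)
    delegating = DelegatingReader(inner)

    # A transport reports failures on the reader it owns, i.e. the wrapped one.
    inner.set_exception(boom)
    naive_sees = naive.exception() is boom
    delegating_sees = delegating.exception() is boom

    # Inverse direction: an exception set on the wrapper reaches the inner reader.
    other = RuntimeError("set via wrapper")
    delegating.set_exception(other)
    inverse = inner.exception() is other
    return naive_sees, delegating_sees, inverse
```

Without the overrides the wrapper reports `None` after a failure on the wrapped reader, which matches the situation the second review iteration flagged.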

src/kagan/core/_agent.py

Lines changed: 14 additions & 1 deletion
@@ -866,13 +866,19 @@ async def spawn_agent(
 _MAX_CUMULATIVE_BYTES: Final[int] = 500 * 1024 * 1024  # 500 MB total per session
 
 
-class _ByteCountingStreamReader:
+class _ByteCountingStreamReader(asyncio.StreamReader):
     """Composition wrapper that enforces a cumulative byte cap on reads.
 
     The ACP JSON-RPC read loop calls ``readline()`` or ``read()`` on the
     underlying reader. This wrapper counts every byte returned and terminates
     the associated process when the cumulative limit is exceeded, preventing
     unbounded memory growth.
+
+    Subclasses ``asyncio.StreamReader`` because ``ClientSideConnection.__init__``
+    enforces ``isinstance(output_stream, asyncio.StreamReader)``. ``super().__init__``
+    *is* called so base attributes (``_exception``, ``_buffer``, ``_loop``) are
+    populated — methods like ``exception()`` keep working even though we delegate
+    every read to ``self._reader``.
     """
 
     def __init__(
@@ -881,6 +887,7 @@ def __init__(
         process: asyncio.subprocess.Process,
         cumulative_limit: int = _MAX_CUMULATIVE_BYTES,
     ) -> None:
+        super().__init__()
         self._reader = reader
         self._process = process
         self._cumulative_bytes = 0
@@ -924,6 +931,12 @@ async def readexactly(self, n: int) -> bytes:
     def at_eof(self) -> bool:
         return self._reader.at_eof()
 
+    def exception(self) -> BaseException | None:
+        return self._reader.exception()
+
+    def set_exception(self, exc: BaseException) -> None:
+        self._reader.set_exception(exc)
+
 
 async def spawn_agent_via_acp(
     backend_name: str,
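
The read methods of `_ByteCountingStreamReader` are outside the hunks shown here, so the following is only a rough sketch of the cumulative-cap idea from its docstring. `ByteCapReader`, its `on_exceeded` callback (standing in for terminating the process), and the choice to return `b""` once the cap trips are all assumptions for illustration, not the project's implementation.

```python
import asyncio
from typing import Callable


class ByteCapReader(asyncio.StreamReader):
    """Counts bytes returned by readline() and trips once a cap is exceeded."""

    def __init__(
        self,
        reader: asyncio.StreamReader,
        *,
        limit: int,
        on_exceeded: Callable[[], None],
    ) -> None:
        super().__init__()
        self._reader = reader
        self._limit = limit
        self._on_exceeded = on_exceeded  # hypothetical hook, e.g. kill the process
        self._seen = 0
        self._tripped = False

    async def readline(self) -> bytes:
        if self._tripped:
            return b""  # assumption: behave like EOF after the cap is hit
        line = await self._reader.readline()
        self._seen += len(line)
        if self._seen > self._limit:
            self._tripped = True
            self._on_exceeded()
            return b""
        return line


async def demo_cap() -> tuple[list[bytes], int]:
    """Feed 18 bytes through a 10-byte cap and report what came out."""
    inner = asyncio.StreamReader()
    inner.feed_data(b"12345\n67890\nabcde\n")
    inner.feed_eof()
    calls: list[bool] = []
    reader = ByteCapReader(inner, limit=10, on_exceeded=lambda: calls.append(True))
    lines = [await reader.readline() for _ in range(3)]
    return lines, len(calls)
```

In this sketch the first line (6 bytes) passes, the second pushes the total to 12 and trips the cap, and the callback fires exactly once.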

tests/helpers/acp_loopback.py

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
+"""TCP-loopback fixture yielding real ``asyncio`` stream pairs.
+
+Ported from ``references/acp/tests/conftest.py::_Server`` (anthropics/agent-protocol).
+Lets tests construct the real ``acp.client.connection.ClientSideConnection`` over
+honest ``asyncio.StreamReader``/``asyncio.StreamWriter`` instances without
+spawning a subprocess. Catches contracts the SDK enforces at construction time
+(``isinstance`` gates) and at runtime (read-method shape) that hand-rolled
+fakes routinely under-specify.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+
+
+class AcpLoopback:
+    """Async context manager that yields two end-to-end connected stream pairs.
+
+    On enter, opens a TCP server bound to ``127.0.0.1:0`` and a client
+    connection to it. The "server" side acts as the agent (writes responses,
+    reads requests); the "client" side is what Kagan would normally hand to
+    ``ClientSideConnection``.
+    """
+
+    def __init__(self) -> None:
+        self._server: asyncio.AbstractServer | None = None
+        self._server_reader: asyncio.StreamReader | None = None
+        self._server_writer: asyncio.StreamWriter | None = None
+        self._client_reader: asyncio.StreamReader | None = None
+        self._client_writer: asyncio.StreamWriter | None = None
+
+    async def __aenter__(self) -> AcpLoopback:
+        ready = asyncio.Event()
+
+        async def handle(
+            reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+        ) -> None:
+            self._server_reader = reader
+            self._server_writer = writer
+            ready.set()
+
+        self._server = await asyncio.start_server(handle, host="127.0.0.1", port=0)
+        host, port = self._server.sockets[0].getsockname()[:2]
+        self._client_reader, self._client_writer = await asyncio.open_connection(
+            host, port
+        )
+        await asyncio.wait_for(ready.wait(), timeout=2.0)
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb) -> None:
+        del exc_type, exc, tb
+        for writer in (self._client_writer, self._server_writer):
+            if writer is None:
+                continue
+            writer.close()
+            with contextlib.suppress(Exception):
+                await writer.wait_closed()
+        if self._server is not None:
+            self._server.close()
+            await self._server.wait_closed()
+
+    @property
+    def client_reader(self) -> asyncio.StreamReader:
+        assert self._client_reader is not None
+        return self._client_reader
+
+    @property
+    def client_writer(self) -> asyncio.StreamWriter:
+        assert self._client_writer is not None
+        return self._client_writer
+
+    @property
+    def server_reader(self) -> asyncio.StreamReader:
+        assert self._server_reader is not None
+        return self._server_reader
+
+    @property
+    def server_writer(self) -> asyncio.StreamWriter:
+        assert self._server_writer is not None
+        return self._server_writer
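
To show the loopback technique on its own, here is a condensed sketch that does not import the fixture above. The function name `loopback_roundtrip` is invented for this illustration, and it assumes binding to `127.0.0.1` is permitted in the test environment. It uses only `asyncio.start_server` and `asyncio.open_connection`, the same two calls the fixture relies on.

```python
import asyncio
import contextlib


async def loopback_roundtrip(payload: bytes) -> tuple[bytes, bool]:
    """Send one line from the server side and read it on the client side."""
    accepted: asyncio.Future = asyncio.get_running_loop().create_future()

    async def handle(
        reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        # Hand the server-side pair back to the caller.
        accepted.set_result((reader, writer))

    # Port 0 asks the OS for a free ephemeral port.
    server = await asyncio.start_server(handle, host="127.0.0.1", port=0)
    host, port = server.sockets[0].getsockname()[:2]
    client_reader, client_writer = await asyncio.open_connection(host, port)
    _server_reader, server_writer = await asyncio.wait_for(accepted, timeout=2.0)
    try:
        server_writer.write(payload + b"\n")
        await server_writer.drain()
        line = await asyncio.wait_for(client_reader.readline(), timeout=2.0)
        # The client reader is a genuine StreamReader, so type gates accept it.
        return line, isinstance(client_reader, asyncio.StreamReader)
    finally:
        for writer in (client_writer, server_writer):
            writer.close()
            with contextlib.suppress(Exception):
                await writer.wait_closed()
        server.close()
        await server.wait_closed()
```

Closing both writers before `server.wait_closed()` mirrors the teardown order in the fixture and avoids waiting on connections that are still open.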

tests/helpers/echo_agent.py

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
+"""Hermetic ACP echo agent for stdio integration tests.
+
+Vendored from ``references/acp/examples/echo_agent.py`` (anthropics/agent-protocol).
+The agent speaks the real ACP wire protocol and only depends on the ``acp``
+package we already pin. Tests spawn this script as a subprocess via
+``spawn_filtered_agent_process`` to exercise the full handshake → prompt →
+notification → teardown path with no provider binary on PATH.
+"""
+
+# /// script
+# requires-python = ">=3.10,<3.15"
+# dependencies = [
+#     "agent-client-protocol",
+# ]
+# ///
+from __future__ import annotations
+
+import asyncio
+from typing import Any
+from uuid import uuid4
+
+from acp import (
+    Agent,
+    InitializeResponse,
+    NewSessionResponse,
+    PromptResponse,
+    run_agent,
+    text_block,
+    update_agent_message,
+)
+from acp.interfaces import Client
+from acp.schema import (
+    AudioContentBlock,
+    ClientCapabilities,
+    EmbeddedResourceContentBlock,
+    HttpMcpServer,
+    ImageContentBlock,
+    Implementation,
+    McpServerStdio,
+    ResourceContentBlock,
+    SseMcpServer,
+    TextContentBlock,
+)
+
+
+class EchoAgent(Agent):
+    _conn: Client
+
+    def on_connect(self, conn: Client) -> None:
+        self._conn = conn
+
+    async def initialize(
+        self,
+        protocol_version: int,
+        client_capabilities: ClientCapabilities | None = None,
+        client_info: Implementation | None = None,
+        **kwargs: Any,
+    ) -> InitializeResponse:
+        del client_capabilities, client_info, kwargs
+        return InitializeResponse(protocol_version=protocol_version)
+
+    async def new_session(
+        self,
+        cwd: str,
+        mcp_servers: list[HttpMcpServer | SseMcpServer | McpServerStdio],
+        **kwargs: Any,
+    ) -> NewSessionResponse:
+        del cwd, mcp_servers, kwargs
+        return NewSessionResponse(session_id=uuid4().hex)
+
+    async def prompt(
+        self,
+        prompt: list[
+            TextContentBlock
+            | ImageContentBlock
+            | AudioContentBlock
+            | ResourceContentBlock
+            | EmbeddedResourceContentBlock
+        ],
+        session_id: str,
+        **kwargs: Any,
+    ) -> PromptResponse:
+        del kwargs
+        for block in prompt:
+            text = (
+                block.get("text", "")
+                if isinstance(block, dict)
+                else getattr(block, "text", "")
+            )
+            chunk = update_agent_message(text_block(text))
+            chunk.field_meta = {"echo": True}
+            chunk.content.field_meta = {"echo": True}
+
+            await self._conn.session_update(
+                session_id=session_id, update=chunk, source="echo_agent"
+            )
+        return PromptResponse(stop_reason="end_turn")
+
+
+async def main() -> None:
+    await run_agent(EchoAgent())
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
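
The hermetic-subprocess pattern (spawn a child via `sys.executable`, talk line-delimited JSON over stdio, skip stray non-JSON output) can be sketched without the `acp` package. Everything here is a toy: `CHILD_SOURCE` is an invented JSON echo script and not the vendored echo agent, and `is_json_object` and `echo_roundtrip` are hypothetical helpers, not project or SDK functions.

```python
import asyncio
import json
import sys

# Toy child: prints one noise line, then echoes each request's params.
CHILD_SOURCE = r"""
import json, sys
print("launcher noise, not JSON", flush=True)
for raw in sys.stdin:
    msg = json.loads(raw)
    reply = {"jsonrpc": "2.0", "id": msg["id"], "result": msg["params"]}
    print(json.dumps(reply), flush=True)
"""


def is_json_object(line: bytes) -> bool:
    """True only for lines that parse as a JSON object."""
    try:
        return isinstance(json.loads(line), dict)
    except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
        return False


async def echo_roundtrip(params: dict) -> dict:
    """Spawn the toy child with the current interpreter and do one exchange."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable,
        "-c",
        CHILD_SOURCE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )
    assert proc.stdin is not None and proc.stdout is not None
    request = {"jsonrpc": "2.0", "id": 1, "method": "echo", "params": params}
    proc.stdin.write(json.dumps(request).encode() + b"\n")
    await proc.stdin.drain()
    try:
        while True:
            line = await asyncio.wait_for(proc.stdout.readline(), timeout=10.0)
            if not line:
                raise RuntimeError("child exited before replying")
            if is_json_object(line):  # drop the noise line, keep the frame
                return json.loads(line)
    finally:
        proc.stdin.close()  # EOF on stdin ends the child's read loop
        await proc.wait()
```

Using `sys.executable` keeps the test independent of anything on PATH, which is the property the vendored echo agent is meant to provide for the real ACP handshake.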

tests/integration/__init__.py

Whitespace-only changes.

tests/integration/acp_real/__init__.py

Whitespace-only changes.
