Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
run: |
set -o pipefail
uv run poe db-migrations-check
uv run pytest tests/core/unit/ -m "core and unit" -n auto -q
uv run pytest tests/unit/core/ -m "core and unit" -n auto -q

- name: Run Semantic Release
id: release
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ jobs:
pyproject.toml
- run: uv sync --frozen --dev
- run: uv run poe db-migrations-check
- run: uv run pytest tests/core/unit/ -m "core and unit" -n auto -q
- run: uv run pytest tests/unit/core/ -m "core and unit" -n auto -q

test-pr:
if: github.event_name == 'pull_request' && github.event.pull_request.draft == false
Expand Down
13 changes: 4 additions & 9 deletions src/kagan/cli/chat/acp.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,6 @@ def get_lock(self, backend: str) -> asyncio.Lock:
_WARMUP_STATE = OrchestratorWarmupState()


_acp_handshake_timeout_seconds = acp_handshake_timeout_seconds
_acp_process_exit_hint = acp_process_exit_hint
_friendly_acp_error_message = friendly_acp_error_message


def _is_mcp_server_unsupported_error(error: object) -> bool:
raw = str(error).lower()
return "mcp" in raw and "server" in raw and ("not implemented" in raw or "unsupported" in raw)
Expand Down Expand Up @@ -252,7 +247,7 @@ async def _acp_process_exit_message(agent_backend: str, process: Any, *, during:
if details:
compact = " ".join(line.strip() for line in details.splitlines() if line.strip())
message = f"{message} {compact[:500]}"
hint = _acp_process_exit_hint(agent_backend=agent_backend, details=details)
hint = acp_process_exit_hint(agent_backend=agent_backend, details=details)
if hint:
message = f"{message} {hint}"
return message
Expand Down Expand Up @@ -304,7 +299,7 @@ async def run_orchestrator_turn(
)

capture_client = _CaptureACPClient(on_update=on_update, permission_resolver=permission_resolver)
timeout_s = _acp_handshake_timeout_seconds(agent_backend)
timeout_s = acp_handshake_timeout_seconds(agent_backend)
try:
async with spawn_filtered_agent_process(
capture_client,
Expand Down Expand Up @@ -420,15 +415,15 @@ async def run_orchestrator_turn(
await conn.prompt(session_id=sess.session_id, prompt=prompt_blocks)
except (acp.RequestError, OSError, RuntimeError, ValueError, AttributeError) as exc:
raise AgentError(
_friendly_acp_error_message(
friendly_acp_error_message(
error=exc,
agent_backend=agent_backend,
during="prompt delivery",
)
) from exc
except (acp.RequestError, OSError, RuntimeError, ValueError, AttributeError) as exc:
raise AgentError(
_friendly_acp_error_message(error=exc, agent_backend=agent_backend, during="handshake")
friendly_acp_error_message(error=exc, agent_backend=agent_backend, during="handshake")
) from exc
finally:
if not lightweight and mcp_path.exists():
Expand Down
5 changes: 2 additions & 3 deletions src/kagan/cli/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,10 +186,9 @@ def _resolve_backend(agent_backend: str | None, tool: str | None) -> str:


def _get_backend_executable(backend_name: str) -> str:
from kagan.core import get_backend
from kagan.core import get_backend_spec

backend = get_backend(backend_name)
executable = backend.get("executable")
executable = get_backend_spec(backend_name).executable
if not isinstance(executable, str) or not executable:
raise click.ClickException(f"Invalid backend configuration for: {backend_name}")
return executable
Expand Down
2 changes: 0 additions & 2 deletions src/kagan/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
BackendSpec,
build_agent_environment,
build_mcp_manifest,
get_backend,
get_backend_spec,
list_available_backends,
list_backend_specs,
Expand Down Expand Up @@ -345,7 +344,6 @@
"format_duration",
"format_percentage",
"friendly_acp_error_message",
"get_backend",
"get_backend_spec",
"get_conflicts",
"get_latest_session",
Expand Down
38 changes: 5 additions & 33 deletions src/kagan/core/_acp.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
OPENCODE_BACKEND,
get_backend_spec,
)
from kagan.core._subprocess import terminate_process
from kagan.core.errors import AgentError

_ACP_STARTUP_TIMEOUT_ENV_KEY = "KAGAN_ACP_STARTUP_TIMEOUT_SECONDS"
Expand Down Expand Up @@ -118,10 +119,6 @@ def acp_startup_timeout_seconds(agent_backend: str) -> float:
return _default_acp_timeout_seconds(agent_backend)


def _acp_startup_timeout_seconds(agent_backend: str) -> float:
return acp_startup_timeout_seconds(agent_backend)


def _infer_backend_name_from_process(process: asyncio.subprocess.Process) -> str:
args = [str(part).lower() for part in (getattr(process, "args", []) or [])]
joined = " ".join(args)
Expand Down Expand Up @@ -192,14 +189,6 @@ def friendly_acp_error_message(*, error: object, agent_backend: str, during: str
return f"{prefix} {raw}"


def _friendly_startup_error_message(*, error: object, agent_backend: str, during: str) -> str:
return friendly_acp_error_message(
error=error,
agent_backend=agent_backend,
during=during,
)


class KaganACPClient(acp.Client):
"""ACP client implementation forwarding session updates to a callback."""

Expand Down Expand Up @@ -437,10 +426,6 @@ def acp_process_exit_hint(*, agent_backend: str, details: str) -> str | None:
return None


def _acp_process_exit_hint(*, agent_backend: str, details: str) -> str | None:
return acp_process_exit_hint(agent_backend=agent_backend, details=details)


async def _acp_process_exit_message(
process: asyncio.subprocess.Process, *, during: str, agent_backend: str
) -> str | None:
Expand All @@ -462,19 +447,6 @@ async def _acp_process_exit_message(
return message


async def _terminate_acp_process(process: asyncio.subprocess.Process) -> None:
if process.returncode is not None:
return
with contextlib.suppress(ProcessLookupError):
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5.0)
except TimeoutError:
with contextlib.suppress(ProcessLookupError):
process.kill()
await process.wait()


async def run_acp_session(
process: asyncio.subprocess.Process,
client: KaganACPClient,
Expand All @@ -492,7 +464,7 @@ async def run_acp_session(

stdout = JsonRpcObjectStreamReader(process.stdout, backend_name=resolved_backend)
conn = acp.connect_to_agent(client, process.stdin, stdout)
timeout_s = _acp_startup_timeout_seconds(resolved_backend)
timeout_s = acp_startup_timeout_seconds(resolved_backend)
try:
try:
await asyncio.wait_for(
Expand Down Expand Up @@ -534,13 +506,13 @@ async def run_acp_session(
)
raise RuntimeError(timeout_message) from exc
await conn.prompt(session_id=session.session_id, prompt=[acp.text_block(prompt)])
await _terminate_acp_process(process)
await terminate_process(process)
logger.info(
"ACP one-shot prompt completed for pid={} rc={}", process.pid, process.returncode
)
except (RequestError, OSError, RuntimeError, ValueError, AttributeError) as exc:
logger.exception("ACP session failed for pid={} cwd={}", process.pid, worktree_path)
await _terminate_acp_process(process)
await terminate_process(process)
raise RuntimeError(
friendly_acp_error_message(
error=exc,
Expand All @@ -553,4 +525,4 @@ async def run_acp_session(
with contextlib.suppress(RequestError, OSError, RuntimeError):
await conn.close()
finally:
await _terminate_acp_process(process)
await terminate_process(process)
32 changes: 4 additions & 28 deletions src/kagan/core/_acp_spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,22 @@

from __future__ import annotations

import asyncio
import contextlib
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, Any

from acp.client.connection import ClientSideConnection
from acp.transports import spawn_stdio_transport

from kagan.core._acp_streams import JsonRpcObjectStreamReader
from kagan.core._subprocess import terminate_process

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
from collections.abc import AsyncIterator, Callable, Mapping
from pathlib import Path

import acp


async def _terminate_stdio_process(process: Any) -> None:
if getattr(process, "returncode", None) is not None:
return
terminate = getattr(process, "terminate", None)
if callable(terminate):
with contextlib.suppress(ProcessLookupError):
terminate()
wait = getattr(process, "wait", None)
if not callable(wait):
return
try:
wait_result = cast("Awaitable[Any]", wait())
await asyncio.wait_for(wait_result, timeout=5.0)
except TimeoutError:
kill = getattr(process, "kill", None)
if callable(kill):
with contextlib.suppress(ProcessLookupError):
kill()
wait_result = cast("Awaitable[Any]", wait())
with contextlib.suppress(TimeoutError):
await asyncio.wait_for(wait_result, timeout=5.0)


@asynccontextmanager
async def spawn_filtered_agent_process(
to_client: Callable[[acp.Agent], acp.Client] | acp.Client,
Expand Down Expand Up @@ -69,4 +45,4 @@ async def spawn_filtered_agent_process(
try:
await conn.close()
finally:
await _terminate_stdio_process(process)
await terminate_process(process)
20 changes: 14 additions & 6 deletions src/kagan/core/_acp_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

from __future__ import annotations

import asyncio
import json
from typing import Any
from typing import TYPE_CHECKING, Any

from loguru import logger

if TYPE_CHECKING:
import asyncio

class JsonRpcObjectStreamReader(asyncio.StreamReader):

class JsonRpcObjectStreamReader:
"""Drop non-JSON-RPC stdout lines before the ACP SDK parses them.

ACP transports are line-delimited JSON-RPC. Some Windows agent launchers can
Expand Down Expand Up @@ -45,12 +47,18 @@ async def readline(self) -> bytes:

return line

async def readuntil(self, separator: bytes = b"\n") -> bytes:
return await self._reader.readuntil(separator)

async def read(self, n: int = -1) -> bytes:
return await self._reader.read(n)

async def readexactly(self, n: int) -> bytes:
return await self._reader.readexactly(n)

def at_eof(self) -> bool:
return self._reader.at_eof()
Comment thread
aorumbayev marked this conversation as resolved.

def __getattr__(self, name: str) -> Any:
return getattr(self._reader, name)

def _record_drop(self, line: bytes, *, reason: str) -> None:
self._dropped += 1
if self._dropped > 1:
Expand Down
55 changes: 6 additions & 49 deletions src/kagan/core/_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,19 +179,6 @@ class BackendSpec:
install: Mapping[OS, BackendCommand] | None = None
auth: Mapping[OS, BackendCommand] | None = None

def to_legacy_config(self) -> dict[str, Any]:
"""Project the typed spec into a dict mapping (legacy compat)."""
return {
"capabilities": tuple(sorted(cap.value for cap in self.capabilities)),
"executable": self.executable,
"prompt_flag": self.prompt_flag,
"workdir_flag": self.workdir_flag,
"env_vars": dict(self.env_vars),
"supports_acp": self.has_capability(BackendCapability.ACP_STREAMING),
"acp_command": list(self.acp_command),
"acp_args": list(self.acp_args),
}

def has_capability(self, capability: BackendCapability) -> bool:
"""Return whether the backend declares *capability*."""
return capability in self.capabilities
Expand Down Expand Up @@ -229,21 +216,6 @@ def resolve_command(
return exact
return mapping.get("*")

def guidance_hints(self) -> tuple[str, ...]:
"""Return explicit setup hint strings for this backend (legacy shim).

Derives human-readable hints from the structured ``install`` / ``auth``
mappings so that existing callers continue to work during the transition.
Each hint is formatted as ``"{description} {command}"`` so callers that
previously searched hint text for the command string continue to work.
"""
hints: list[str] = []
for action in ("install", "auth"):
# type: ignore[arg-type] — looping over known-valid Literal strings.
cmd = self.resolve_command(action) # type: ignore[arg-type]
if cmd is not None:
hints.append(f"{cmd.description} {cmd.command}")
return tuple(hints)


CLAUDE_CODE_BACKEND: Final = "claude-code"
Expand Down Expand Up @@ -681,11 +653,6 @@ def list_backend_specs() -> dict[str, BackendSpec]:
return dict(_BACKEND_SPECS)


def get_backend(name: str) -> dict[str, Any]:
"""Return the legacy config dict for *name*, raising AgentError if unknown."""
return get_backend_spec(name).to_legacy_config()


def list_backends() -> list[str]:
"""Return all registered backend names."""
return list(_BACKEND_SPECS)
Expand Down Expand Up @@ -899,17 +866,13 @@ async def spawn_agent(
_MAX_CUMULATIVE_BYTES: Final[int] = 500 * 1024 * 1024 # 500 MB total per session


class _ByteCountingStreamReader(asyncio.StreamReader):
"""StreamReader subclass that enforces a cumulative byte cap.
class _ByteCountingStreamReader:
"""Composition wrapper that enforces a cumulative byte cap on reads.

The ACP JSON-RPC read loop (inside the ``acp`` library) calls ``readline()``
or ``read()`` on the underlying reader. This subclass wraps another reader,
counts every byte returned, and terminates the associated process when the
cumulative limit is exceeded, preventing unbounded memory growth.

Inherits from ``asyncio.StreamReader`` so ``isinstance()`` checks pass when
the ACP SDK validates stream types in ``ClientSideConnection.__init__``.
Delegates all reads to the wrapped reader rather than using inherited state.
The ACP JSON-RPC read loop calls ``readline()`` or ``read()`` on the
underlying reader. This wrapper counts every byte returned and terminates
the associated process when the cumulative limit is exceeded, preventing
unbounded memory growth.
"""

def __init__(
Expand All @@ -918,9 +881,6 @@ def __init__(
process: asyncio.subprocess.Process,
cumulative_limit: int = _MAX_CUMULATIVE_BYTES,
) -> None:
# Note: We skip calling super().__init__() because we delegate all
# operations to self._reader. The inheritance is only to pass
# isinstance() checks in the ACP SDK.
self._reader = reader
self._process = process
self._cumulative_bytes = 0
Expand Down Expand Up @@ -964,9 +924,6 @@ async def readexactly(self, n: int) -> bytes:
def at_eof(self) -> bool:
return self._reader.at_eof()

def __getattr__(self, name: str) -> Any:
return getattr(self._reader, name)


async def spawn_agent_via_acp(
backend_name: str,
Expand Down
Loading
Loading