|
1 | 1 | # Requirements: live run streaming for `agent run get --watch` |
2 | 2 |
|
3 | 3 | **Status:** proposed — not yet implemented. |
4 | | -**Audience:** the engineer/agent implementing WebSocket streaming, server + client. |
| 4 | +**Audience:** the engineer/agent implementing WebSocket streaming for the CLI. |
5 | 5 |
|
6 | 6 | ## 1. Background |
7 | 7 |
|
8 | 8 | The CLI can start runs and read their state over the public `/v1` REST API, but |
9 | 9 | it cannot stream a run's output. `agent run get --watch` exists today and gives a |
10 | 10 | **status-level** live view by polling `GET /v1/agents/runs/{id}` until the run |
11 | 11 | reaches a terminal status (`completed`/`error`/`cancelled`/`stopped`). It shows |
12 | | -status transitions and the final summary — not the step-by-step stdout/stderr or |
13 | | -token stream. |
14 | | - |
15 | | -Token-level streaming over `/v1` is explicitly deferred in the backend design |
16 | | -doc (`documents/eng/ELLIPSIS_API_AND_CLI.md` §7, "Deferred → Streaming run steps |
17 | | -through `/v1` for CLI clients"). This spec defines the work to close that gap. |
18 | | - |
19 | | -Scaffolding already in this repo (currently unused, kept as a starting point): |
20 | | -- `src/lib/ws.ts` — a `streamRun()` WebSocket client and a `StreamFrame` type |
21 | | - (`stdout`/`stderr`/`status`/`done`/`error`). It connects to |
22 | | - `${wsBase}/v1/runs/{id}/stream` with a bearer token. No reconnect/resume/heartbeat. |
23 | | -- `src/ui/RunView.tsx` — an Ink component that renders frames from `streamRun()`. |
24 | | -- `DEFAULT_WS_BASE` in `src/lib/constants.ts` (`wss://api.ellipsis.dev`). |
25 | | - |
26 | | -The reported `error: [object ErrorEvent]` came from this scaffolding connecting to |
27 | | -a non-existent server endpoint; `ws.ts` stringifies the raw `ErrorEvent`. Fix the |
28 | | -error rendering as part of this work (surface `err.message`). |
| 12 | +status transitions and the final summary — not the step-by-step output. |
| 13 | + |
| 14 | +**Crucially, the backend already streams steps live — just not over `/v1`.** The |
| 15 | +dashboard consumes a WebSocket stream; this work is about re-exposing that same |
| 16 | +stream under `/v1` for bearer-authenticated CLI clients. The bulk of the |
| 17 | +machinery (step model, persistence, event bus) already exists and must be reused, |
| 18 | +not reinvented. |
| 19 | + |
| 20 | +### How the existing dashboard stream works (source of truth to mirror) |
| 21 | + |
| 22 | +- **Endpoint:** `@router.websocket("/agents/runs/{run_id}/stream")` — |
| 23 | + `agents_router.py:1200`, loop `_stream_run_loop` at `:1147`. |
| 24 | +- **Frames:** an initial `{"type":"snapshot","run":...,"steps":[...]}`, then |
| 25 | + `{"type":"steps_append","steps":[...]}`, `{"type":"run","run":...}`, and |
| 26 | + `{"type":"heartbeat"}`. Socket closes when the run status is terminal. |
| 27 | + `STREAM_RECONCILE_SECONDS = 10` forces a DB re-read even with no event. |
| 28 | +- **Step model:** `AgentStep` (`src/models/agents/agent_step.py`) with a |
| 29 | + monotonic `step_index: int` and `data: CCStep`. `CCStep` |
| 30 | + (`src/models/agents/claude/cc_step.py`) is the typed Claude Code stream-json |
| 31 | + message — a discriminated union of assistant (text/tool_use/thinking blocks + |
| 32 | + usage), user (tool results incl. stdout/stderr), system-init, and result |
| 33 | + (cost/duration). Stored in the `agent_steps` table, indexed on |
| 34 | + `(agent_session_id, step_index)`. |
| 35 | +- **Source + signal:** Claude Code runs with `--output-format stream-json`; the |
| 36 | + sandbox runner parses each stdout line into a `CCStep`, assigns `step_index` |
| 37 | + (`run_in_sandbox.py:221-257`), inserts an `AgentStep` row, and fires |
| 38 | + `pg_notify('agent_events', {run_id, kind})` — **a pointer, never the data** |
| 39 | + (`realtime/agent_events.py`). Each `public_api` process runs an `AgentEventBus` |
| 40 | + (`realtime/agent_event_bus.py`) doing `LISTEN agent_events` on a dedicated |
| 41 | + session-mode connection, fanning notifies to per-run `asyncio.Queue`s. The WS |
| 42 | + handler wakes on a notify and **re-reads run+steps from the DB** (the DB is the |
| 43 | + source of truth; NOTIFY is only a wake-up). |
| 44 | +- **Cursor:** `step_index` is monotonic per session, but the protocol exposes
| 45 | +  **no client cursor**: each (re)connect gets a full `snapshot`, then deltas
| 46 | +  tracked by a handler-local `sent_steps` counter. Dropped notifies self-heal via the 10s reconcile.
| 47 | +- **Auth:** browsers can't set WS handshake headers, so an authenticated REST |
| 48 | + call (`GET /agents/runs/{id}/stream-ticket`, `:1090`) mints a 60s HMAC ticket |
| 49 | + (`realtime/stream_tickets.py`, signed with `FRONTEND_API_KEY`) passed as |
| 50 | + `?ticket=`. The handler verifies signature+expiry, then re-checks run ownership |
| 51 | + against the DB (close `4401`/`4403` on failure). |
29 | 52 |
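The frame shapes listed above can be mirrored on the client as a discriminated union. The following is a minimal sketch, not the backend's schema: the `type` values and the `run`/`steps` keys come from the description above, while `parseFrame`, `isRecord`, `isSteps`, and the opaque treatment of the run row and of `data` are assumptions made for illustration.

```typescript
// Frame `type` values and the `run` / `steps` keys follow the spec.
// The inner shape of a run row and of `data` (a CCStep) is kept opaque here,
// which is an assumption of this sketch.
type AgentStep = { step_index: number; data: unknown };
type RunRow = Record<string, unknown>;

type StreamFrame =
  | { type: "snapshot"; run: RunRow; steps: AgentStep[] }
  | { type: "steps_append"; steps: AgentStep[] }
  | { type: "run"; run: RunRow }
  | { type: "heartbeat" };

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function isSteps(value: unknown): value is AgentStep[] {
  return (
    Array.isArray(value) &&
    value.every(
      (s) => isRecord(s) && typeof s.step_index === "number" && "data" in s,
    )
  );
}

// Hypothetical helper: parse one WS text message into a typed frame.
// Returns null for invalid JSON or for any frame type not listed in the spec.
function parseFrame(raw: string): StreamFrame | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    return null;
  }
  if (!isRecord(value)) return null;
  const { type, run, steps } = value;
  switch (type) {
    case "snapshot":
      return isRecord(run) && isSteps(steps)
        ? { type: "snapshot", run, steps }
        : null;
    case "steps_append":
      return isSteps(steps) ? { type: "steps_append", steps } : null;
    case "run":
      return isRecord(run) ? { type: "run", run } : null;
    case "heartbeat":
      return { type: "heartbeat" };
    default:
      return null;
  }
}
```

Returning `null` for unknown frame types lets the caller decide whether to ignore or log them, which keeps an older CLI tolerant of frames added later.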
|
30 | 53 | ## 2. Goal |
31 | 54 |
|
32 | | -`agent run get <id> --watch` streams a run's output live, in real time, and |
33 | | -falls back to REST polling when streaming is unavailable. The same flag covers |
| 55 | +`agent run get <id> --watch` streams a run's steps live, in real time, and falls |
| 56 | +back to REST status-polling when streaming is unavailable. The same flag covers |
34 | 57 | both modes — no new top-level command. |
35 | 58 |
|
36 | 59 | ## 3. Server-side requirements (`/v1`) |
37 | 60 |
|
38 | | -1. **Endpoint:** `GET /v1/runs/{run_id}/stream`, upgraded to WebSocket. |
39 | | -2. **Auth:** `Authorization: Bearer <token>`, resolved by the same `V1Auth` |
40 | | - path as the REST API (user/API/sandbox tokens), authorizing the run's |
41 | | - customer. Reject with a close code on auth failure (see §5). |
42 | | -3. **Frame protocol (server → client), one JSON object per WS message:** |
43 | | - - `{ "type": "status", "status": "<AgentRunStatus>", "ts": "<iso8601>" }` |
44 | | - - `{ "type": "stdout", "data": "<chunk>", "seq": <int>, "ts": "<iso8601>" }` |
45 | | - - `{ "type": "stderr", "data": "<chunk>", "seq": <int>, "ts": "<iso8601>" }` |
46 | | - - `{ "type": "done", "status": "<terminal status>", "exit_status": "<...>" }` |
47 | | - - `{ "type": "error", "message": "<human-readable>" }` |
48 | | - - `seq` is a monotonic per-run cursor used for resume. |
49 | | -4. **Backfill + resume:** accept `?after_seq=<int>` (query or first client |
50 | | - message). On connect, replay buffered frames with `seq > after_seq`, then |
51 | | - stream live. This makes reconnects lossless. |
52 | | -5. **Heartbeat:** server sends WS ping (or a `status` keepalive) at a fixed |
53 | | - interval (e.g. 20s) so dead connections are detectable. |
54 | | -6. **Termination:** send a final `done` frame, then close with a normal code. |
55 | | - For an already-terminal run, replay buffered output then `done` immediately. |
56 | | -7. **Retention:** define how long run output is buffered for backfill (at least |
57 | | - the run's lifetime + a grace window). Document the limit. |
| 61 | +**Re-export the existing stream; do not build a parallel one.** Reuse |
| 62 | +`agent_steps`, `CCStep`, and the `AgentEventBus` exactly as the frontend stream |
| 63 | +does. The `/v1` endpoint is a thin re-auth + re-encode of `_stream_run_loop`. |
| 64 | + |
| 65 | +1. **Endpoint:** `GET /v1/agents/runs/{run_id}/stream`, upgraded to WebSocket. |
| 66 | +2. **Auth — bearer, not ticket.** The CLI holds a `/v1` bearer token, so resolve |
| 67 | + it with the same `V1Auth` path as the REST API (Authorization header on the |
| 68 | + handshake, which non-browser clients *can* set), authorizing the run's |
| 69 | + customer. The 60s `?ticket=` dance is a browser workaround the CLI doesn't |
| 70 | + need; don't require it. |
| 71 | +3. **Frame protocol — reuse the existing shape:** `snapshot` (run + steps), |
| 72 | + `steps_append` (new `AgentStep`s), `run` (run-row change), `heartbeat`, and a |
| 73 | + terminal close. Each step serializes as the existing `AgentStep`/`CCStep` |
| 74 | + JSON so CLI and dashboard share one schema. |
| 75 | +4. **Add a real resume cursor.** Accept `?since=<step_index>` as a query
| 76 | +   parameter (and/or in a first client message). On connect, send only steps
| 77 | +   with `step_index > since` instead of the full snapshot, so a CLI reconnect
| 78 | +   resumes incrementally. Absent `since`, behave like today (full snapshot).
| 79 | + Consider also adding `?since=` to the REST `GET /agents/runs/{id}/steps` |
| 80 | + (`agents_router.py:1073`), which currently takes no cursor. |
| 81 | +5. **Heartbeat / reconcile:** keep the existing heartbeat + 10s DB reconcile so |
| 82 | + dropped connections are detectable and dropped notifies self-heal. |
| 83 | +6. **Termination:** send the final `run`/terminal frame, then close normally. |
| 84 | + For an already-terminal run, replay steps (honoring `since`) then close. |
58 | 85 |
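From the client's side, requirements 1, 2, and 4 amount to a URL plus one handshake header. The sketch below shows how a request could be assembled and what the `since` filter means. `buildStreamRequest` and `stepsAfter` are hypothetical names, and the base URL is passed in by the caller rather than assumed.

```typescript
type AgentStep = { step_index: number; data: unknown };

interface StreamRequest {
  url: string;
  headers: Record<string, string>;
}

// Hypothetical helper: build the /v1 stream URL and bearer header.
// `since` is omitted on a first connect, which asks for the full snapshot.
function buildStreamRequest(
  wsBase: string,
  runId: string,
  token: string,
  since?: number,
): StreamRequest {
  const base = wsBase.replace(/\/+$/, ""); // tolerate a trailing slash
  const path = `/v1/agents/runs/${encodeURIComponent(runId)}/stream`;
  const query = since === undefined ? "" : `?since=${since}`;
  return {
    url: `${base}${path}${query}`,
    headers: { Authorization: `Bearer ${token}` },
  };
}

// The resume rule from requirement 4: only steps strictly after the cursor.
// Without a cursor, every step is returned (equivalent to a full snapshot).
function stepsAfter(steps: AgentStep[], since?: number): AgentStep[] {
  return since === undefined
    ? steps
    : steps.filter((step) => step.step_index > since);
}
```

The comparison is strict (`>`), so a client that reconnects with the highest `step_index` it has rendered never receives that step twice.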
|
59 | 86 | ## 4. Client-side requirements (this repo) |
60 | 87 |
|
61 | | -1. `agent run get <id> --watch` connects to the stream and renders frames: |
62 | | - `stdout`/`stderr` as output, `status` as transition lines, `done`/`error` |
63 | | - to finish. Exit 0 on `done` with a successful terminal status, non-zero on |
64 | | - `error` or a failed terminal status. |
65 | | -2. **Reconnect with backoff** and resume from the last seen `seq` via |
66 | | - `after_seq`, so a dropped socket doesn't lose or duplicate output. |
67 | | -3. **Fallback:** if the WebSocket can't connect (e.g. server without streaming, |
68 | | - or a `1003`/unsupported close), fall back to the existing REST polling |
69 | | - `watchRun()` automatically, with a one-line notice. `--watch` must keep |
70 | | - working against a backend that lacks the endpoint. |
71 | | -4. **Heartbeat:** respond to/expect pings; treat a missed heartbeat as a dropped |
72 | | - connection and reconnect. |
73 | | -5. `--json` with `--watch`: emit one JSON object per frame (NDJSON) for piping. |
74 | | -6. Fix `ws.ts` error handling to surface a readable message, not |
75 | | - `[object ErrorEvent]`. |
76 | | - |
77 | | -## 5. WebSocket close codes (suggested) |
| 88 | +1. `agent run get <id> --watch` opens the `/v1` stream and renders frames: |
| 89 | + `snapshot`/`steps_append` → render each `CCStep` (assistant text + tool calls, |
| 90 | + tool stdout/stderr, thinking if `--verbose`); `run` → status transitions; |
| 91 | + terminal close → final summary. Exit 0 on a successful terminal status, |
| 92 | + non-zero on error/failed terminal status. |
| 93 | +2. **Reconnect with backoff**, resuming from the highest `step_index` seen via |
| 94 | + `?since=`, so a dropped socket loses/duplicates nothing. |
| 95 | +3. **Fallback:** if the WS can't connect (server without the endpoint, or a |
| 96 | + `1003`/unsupported close), fall back to the existing REST status-poll |
| 97 | + `watchRun()` with a one-line notice. `--watch` must keep working against a |
| 98 | + backend that lacks streaming. |
| 99 | +4. **`--json` with `--watch`:** emit one JSON object per frame (NDJSON). |
| 100 | +5. Reuse/replace the `src/lib/ws.ts` + `src/ui/RunView.tsx` scaffolding. Fix |
| 101 | + `ws.ts` error handling to surface a readable message, not `[object ErrorEvent]` |
| 102 | + (the original bug came from connecting to a then-nonexistent endpoint). |
| 103 | +6. Define a TS mirror of `CCStep` (or a pragmatic subset) in `src/lib/types.ts`, |
| 104 | + matching the backend union. |
| 105 | + |
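Items 2 and 4 reduce to a small amount of client state. The sketch below tracks the highest `step_index` seen, drops steps that a reconnect re-delivers, computes a capped exponential backoff, and serializes a frame as one NDJSON line. `ResumeCursor`, `backoffDelayMs`, `toNdjsonLine`, and the 500 ms / 15 s timing values are illustrative assumptions, not part of the spec. The cursor also assumes steps arrive in ascending `step_index` order within a batch.

```typescript
type AgentStep = { step_index: number; data: unknown };

// Hypothetical cursor: remembers the highest step_index rendered so far.
class ResumeCursor {
  private highest: number | undefined;

  // Returns only steps not seen before and advances the cursor.
  // Assumes each batch is sorted by ascending step_index.
  accept(steps: AgentStep[]): AgentStep[] {
    const fresh: AgentStep[] = [];
    for (const step of steps) {
      if (this.highest !== undefined && step.step_index <= this.highest) {
        continue; // duplicate from a re-snapshot or overlapping delta
      }
      this.highest = step.step_index;
      fresh.push(step);
    }
    return fresh;
  }

  // Value to send as `?since=` on reconnect; undefined before any step.
  get since(): number | undefined {
    return this.highest;
  }
}

// Capped exponential backoff. Base and cap are assumed defaults.
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 15_000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// One frame per line for `--json --watch`.
function toNdjsonLine(frame: unknown): string {
  return JSON.stringify(frame) + "\n";
}
```

Deduplicating on the client as well as resuming with `?since=` keeps output correct even against a server that ignores the cursor and sends a full snapshot.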
| 106 | +## 5. WebSocket close codes |
| 107 | + |
| 108 | +Mirror the backend's existing codes where possible: |
78 | 109 |
|
79 | 110 | | Code | Meaning | |
80 | 111 | |------|---------| |
81 | 112 | | 1000 | normal — run reached a terminal state | |
82 | | -| 1008 | auth failed / not authorized for this run | |
83 | | -| 1003 | streaming unsupported (client should fall back to polling) | |
| 113 | +| 4401 | auth failed (bad/expired credential) | |
| 114 | +| 4403 | authenticated but not authorized for this run | |
| 115 | +| 1003 | streaming unsupported (client falls back to polling) | |
84 | 116 | | 1011 | server error | |
85 | 117 |
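One way the client could act on these codes is sketched below, together with an error formatter that avoids printing `[object ErrorEvent]`. `actionForClose` and `describeError` are hypothetical names. Treating `1011` and any unlisted code as retryable is an assumption of this sketch, not something the table specifies.

```typescript
type CloseAction =
  | "done"
  | "fallback_to_polling"
  | "auth_failed"
  | "forbidden"
  | "reconnect";

// Map a WebSocket close code to the client's next move.
function actionForClose(code: number): CloseAction {
  switch (code) {
    case 1000:
      return "done"; // run reached a terminal state
    case 1003:
      return "fallback_to_polling"; // streaming unsupported
    case 4401:
      return "auth_failed"; // bad or expired credential: do not retry
    case 4403:
      return "forbidden"; // not authorized for this run: do not retry
    default:
      return "reconnect"; // assumption: 1011 and abnormal closes are retried
  }
}

// Produce a readable message from whatever the socket's error handler receives.
function describeError(err: unknown): string {
  if (err instanceof Error && err.message.length > 0) return err.message;
  if (typeof err === "object" && err !== null && "message" in err) {
    const message = (err as { message: unknown }).message;
    if (typeof message === "string" && message.length > 0) return message;
  }
  return "WebSocket connection failed";
}
```

The second branch in `describeError` covers event-like objects that carry a `message` field without being `Error` instances. The final fallback string guarantees the user never sees a raw object tag.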
|
86 | 118 | ## 6. Acceptance criteria |
87 | 119 |
|
88 | | -- Streaming a live run shows stdout/stderr in near real time end to end. |
89 | | -- Killing the socket mid-run and reconnecting resumes with no lost or duplicated |
90 | | - frames (verified via `seq`/`after_seq`). |
91 | | -- `--watch` against a backend without the endpoint transparently falls back to |
92 | | - REST polling and still completes. |
| 120 | +- Streaming a live run shows assistant output + tool stdout/stderr in near real |
| 121 | + time, sharing the `AgentStep`/`CCStep` schema with the dashboard. |
| 122 | +- Killing the socket mid-run and reconnecting with `?since=<last step_index>` |
| 123 | + resumes with no lost or duplicated steps and without a full re-snapshot. |
| 124 | +- `--watch` against a backend without the `/v1` endpoint transparently falls back |
| 125 | + to REST status-polling and still completes. |
93 | 126 | - `--json --watch` emits valid NDJSON, one frame per line. |
94 | | -- Unit tests for the client frame handler, reconnect/resume cursor, and fallback |
95 | | - trigger (mirror the fake-timer style in `test/auth.test.ts` / `test/run.test.ts`). |
| 127 | +- Unit tests for the client frame handler, the `since` resume cursor, and the |
| 128 | + fallback trigger (mirror the fake-timer style in `test/auth.test.ts` / |
| 129 | + `test/run.test.ts`). |
96 | 130 | - No `[object ErrorEvent]`; connection errors print a real message. |
97 | 131 |
|
98 | 132 | ## 7. Out of scope |
99 | 133 |
|
100 | | -- Bidirectional control over the stream (stop/input). `run stop` is tracked |
101 | | - separately and also has no `/v1` endpoint yet. |
| 134 | +- Bidirectional control (stop/input). `run stop` is tracked separately and also |
| 135 | + has no `/v1` endpoint yet. |
| 136 | +- Re-architecting the transport (NOTIFY + DB-as-source-of-truth stays). |
102 | 137 | - Multiplexing multiple runs over one socket. |
| 138 | + |
| 139 | +## 8. Key backend references |
| 140 | + |
| 141 | +- `src/public_api/routers/agents/agents_router.py` — WS `:1200`/`_stream_run_loop :1147`, REST steps `:1073`, stream-ticket `:1090`. |
| 142 | +- `src/public_api/realtime/agent_events.py`, `agent_event_bus.py`, `postgres_listen_bus.py`, `stream_tickets.py`. |
| 143 | +- `src/agents/engine/claude/run_in_sandbox.py:221-257`, `shared.py:45-94`. |
| 144 | +- `src/models/agents/agent_step.py`, `src/models/agents/claude/cc_step.py`, `src/models/database/agents/agent_steps_row.py`. |