Skip to content

Commit ffec9ac

Browse files
cteclaude
andauthored
feat(cli): NDJSON stdin protocol, list subcommands, modularize run.ts (#11597)
* feat(cli): add NDJSON stdin protocol, list subcommands, and modularize run.ts Overhaul the stdin prompt stream from raw text lines to a structured NDJSON command protocol (start/message/cancel/ping/shutdown) with requestId correlation, ack/done/error lifecycle events, and queue telemetry. Add list subcommands (commands, modes, models) for programmatic discovery. Extract stdin stream logic from run.ts into stdin-stream.ts and add shared isRecord guard utility. Includes unit tests for all new modules. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix(core): fix Task.ts bug affecting CLI operation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c974443 commit ffec9ac

File tree

14 files changed

+1637
-368
lines changed

14 files changed

+1637
-368
lines changed

apps/cli/scripts/test-stdin-stream.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,16 @@ async function main() {
2727
console.log("[wrapper] Type a message and press Enter to send it.")
2828
console.log("[wrapper] Type /exit to close stdin and let the CLI finish.")
2929

30+
let requestCounter = 0
31+
let hasStartedTask = false
32+
33+
const sendCommand = (payload: Record<string, unknown>) => {
34+
if (child.stdin?.destroyed) {
35+
return
36+
}
37+
child.stdin?.write(JSON.stringify(payload) + "\n")
38+
}
39+
3040
const rl = readline.createInterface({
3141
input: process.stdin,
3242
output: process.stdout,
@@ -36,14 +46,22 @@ async function main() {
3646
rl.on("line", (line) => {
3747
if (line.trim() === "/exit") {
3848
console.log("[wrapper] Closing stdin...")
49+
sendCommand({
50+
command: "shutdown",
51+
requestId: `shutdown-${Date.now()}-${++requestCounter}`,
52+
})
3953
child.stdin?.end()
4054
rl.close()
4155
return
4256
}
4357

44-
if (!child.stdin?.destroyed) {
45-
child.stdin?.write(`${line}\n`)
46-
}
58+
const command = hasStartedTask ? "message" : "start"
59+
sendCommand({
60+
command,
61+
requestId: `${command}-${Date.now()}-${++requestCounter}`,
62+
prompt: line,
63+
})
64+
hasStartedTask = true
4765
})
4866

4967
const onSignal = (signal: NodeJS.Signals) => {
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import { Writable } from "stream"
2+
3+
import { JsonEventEmitter } from "../json-event-emitter.js"
4+
5+
function createMockStdout(): { stdout: NodeJS.WriteStream; lines: () => Record<string, unknown>[] } {
6+
const chunks: string[] = []
7+
8+
const writable = new Writable({
9+
write(chunk, _encoding, callback) {
10+
chunks.push(chunk.toString())
11+
callback()
12+
},
13+
}) as unknown as NodeJS.WriteStream
14+
15+
// Each write is a JSON line terminated by \n
16+
const lines = () =>
17+
chunks
18+
.join("")
19+
.split("\n")
20+
.filter((l) => l.length > 0)
21+
.map((l) => JSON.parse(l) as Record<string, unknown>)
22+
23+
return { stdout: writable, lines }
24+
}
25+
26+
describe("JsonEventEmitter control events", () => {
27+
describe("emitControl", () => {
28+
it("emits an ack event with type control", () => {
29+
const { stdout, lines } = createMockStdout()
30+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
31+
32+
emitter.emitControl({
33+
subtype: "ack",
34+
requestId: "req-1",
35+
command: "start",
36+
content: "starting task",
37+
code: "accepted",
38+
success: true,
39+
})
40+
41+
const output = lines()
42+
expect(output).toHaveLength(1)
43+
expect(output[0]!).toMatchObject({
44+
type: "control",
45+
subtype: "ack",
46+
requestId: "req-1",
47+
command: "start",
48+
content: "starting task",
49+
code: "accepted",
50+
success: true,
51+
})
52+
expect(output[0]!.done).toBeUndefined()
53+
})
54+
55+
it("sets done: true for done events", () => {
56+
const { stdout, lines } = createMockStdout()
57+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
58+
59+
emitter.emitControl({
60+
subtype: "done",
61+
requestId: "req-2",
62+
command: "start",
63+
content: "task completed",
64+
code: "task_completed",
65+
success: true,
66+
})
67+
68+
const output = lines()
69+
expect(output[0]!).toMatchObject({ type: "control", subtype: "done", done: true })
70+
})
71+
72+
it("does not set done for error events", () => {
73+
const { stdout, lines } = createMockStdout()
74+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
75+
76+
emitter.emitControl({
77+
subtype: "error",
78+
requestId: "req-3",
79+
command: "start",
80+
content: "something went wrong",
81+
code: "task_error",
82+
success: false,
83+
})
84+
85+
const output = lines()
86+
expect(output[0]!.done).toBeUndefined()
87+
expect(output[0]!.success).toBe(false)
88+
})
89+
})
90+
91+
describe("requestIdProvider", () => {
92+
it("injects requestId from provider when event has none", () => {
93+
const { stdout, lines } = createMockStdout()
94+
const emitter = new JsonEventEmitter({
95+
mode: "stream-json",
96+
stdout,
97+
requestIdProvider: () => "injected-id",
98+
})
99+
100+
emitter.emitControl({ subtype: "ack", content: "test" })
101+
102+
const output = lines()
103+
expect(output[0]!.requestId).toBe("injected-id")
104+
})
105+
106+
it("keeps explicit requestId when provider also returns one", () => {
107+
const { stdout, lines } = createMockStdout()
108+
const emitter = new JsonEventEmitter({
109+
mode: "stream-json",
110+
stdout,
111+
requestIdProvider: () => "provider-id",
112+
})
113+
114+
emitter.emitControl({ subtype: "ack", requestId: "explicit-id", content: "test" })
115+
116+
const output = lines()
117+
expect(output[0]!.requestId).toBe("explicit-id")
118+
})
119+
120+
it("omits requestId when provider returns undefined and event has none", () => {
121+
const { stdout, lines } = createMockStdout()
122+
const emitter = new JsonEventEmitter({
123+
mode: "stream-json",
124+
stdout,
125+
requestIdProvider: () => undefined,
126+
})
127+
128+
emitter.emitControl({ subtype: "ack", content: "test" })
129+
130+
const output = lines()
131+
expect(output[0]!).not.toHaveProperty("requestId")
132+
})
133+
})
134+
135+
describe("emitInit", () => {
136+
it("emits system init with default schema values", () => {
137+
const { stdout, lines } = createMockStdout()
138+
const emitter = new JsonEventEmitter({ mode: "stream-json", stdout })
139+
140+
// emitInit requires a client — we call emitControl to test init-like fields instead.
141+
// emitInit is called internally by attach(), so we test the init fields via options.
142+
// Instead, directly verify the constructor defaults by emitting a control event
143+
// and checking that the emitter was created with correct defaults.
144+
145+
// We can't call emitInit without a client, but we can verify the options
146+
// were stored correctly by checking what emitControl produces.
147+
emitter.emitControl({ subtype: "ack", content: "test" })
148+
149+
// The control event itself doesn't include schema fields, but at least
150+
// we verify the emitter was constructed successfully with defaults.
151+
const output = lines()
152+
expect(output).toHaveLength(1)
153+
})
154+
155+
it("accepts custom schemaVersion, protocol, and capabilities", () => {
156+
const { stdout } = createMockStdout()
157+
158+
// Should not throw when constructed with custom values
159+
const emitter = new JsonEventEmitter({
160+
mode: "stream-json",
161+
stdout,
162+
schemaVersion: 2,
163+
protocol: "custom-protocol",
164+
capabilities: ["stdin:start", "stdin:message"],
165+
})
166+
167+
expect(emitter).toBeDefined()
168+
})
169+
})
170+
})

apps/cli/src/agent/json-event-emitter.ts

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
import type { ClineMessage } from "@roo-code/types"
1818

19-
import type { JsonEvent, JsonEventCost, JsonFinalOutput } from "@/types/json-events.js"
19+
import type { JsonEvent, JsonEventCost, JsonEventQueueItem, JsonFinalOutput } from "@/types/json-events.js"
2020

2121
import type { ExtensionClient } from "./extension-client.js"
2222
import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js"
@@ -30,6 +30,14 @@ export interface JsonEventEmitterOptions {
3030
mode: "json" | "stream-json"
3131
/** Output stream (defaults to process.stdout) */
3232
stdout?: NodeJS.WriteStream
33+
/** Optional request id provider for correlating stream events */
34+
requestIdProvider?: () => string | undefined
35+
/** Transport schema version emitted in system:init */
36+
schemaVersion?: number
37+
/** Transport protocol identifier emitted in system:init */
38+
protocol?: string
39+
/** Supported stdin protocol capabilities emitted in system:init */
40+
capabilities?: string[]
3341
}
3442

3543
/**
@@ -89,17 +97,33 @@ export class JsonEventEmitter {
8997
private events: JsonEvent[] = []
9098
private unsubscribers: (() => void)[] = []
9199
private lastCost: JsonEventCost | undefined
100+
private requestIdProvider: () => string | undefined
101+
private schemaVersion: number
102+
private protocol: string
103+
private capabilities: string[]
92104
private seenMessageIds = new Set<number>()
93105
// Track previous content for delta computation
94106
private previousContent = new Map<number, string>()
95107
// Track the completion result content
96108
private completionResultContent: string | undefined
109+
// Track the latest assistant text as a fallback for result.content.
110+
private lastAssistantText: string | undefined
97111
// The first non-partial "say:text" per task is the echoed user prompt.
98112
private expectPromptEchoAsUser = true
99113

100114
constructor(options: JsonEventEmitterOptions) {
101115
this.mode = options.mode
102116
this.stdout = options.stdout ?? process.stdout
117+
this.requestIdProvider = options.requestIdProvider ?? (() => undefined)
118+
this.schemaVersion = options.schemaVersion ?? 1
119+
this.protocol = options.protocol ?? "roo-cli-stream"
120+
this.capabilities = options.capabilities ?? [
121+
"stdin:start",
122+
"stdin:message",
123+
"stdin:cancel",
124+
"stdin:ping",
125+
"stdin:shutdown",
126+
]
103127
}
104128

105129
/**
@@ -120,6 +144,48 @@ export class JsonEventEmitter {
120144
type: "system",
121145
subtype: "init",
122146
content: "Task started",
147+
schemaVersion: this.schemaVersion,
148+
protocol: this.protocol,
149+
capabilities: this.capabilities,
150+
})
151+
}
152+
153+
emitControl(event: {
154+
subtype: "ack" | "done" | "error"
155+
requestId?: string
156+
command?: string
157+
taskId?: string
158+
content?: string
159+
success?: boolean
160+
code?: string
161+
}): void {
162+
this.emitEvent({
163+
type: "control",
164+
subtype: event.subtype,
165+
requestId: event.requestId,
166+
command: event.command,
167+
taskId: event.taskId,
168+
content: event.content,
169+
success: event.success,
170+
code: event.code,
171+
done: event.subtype === "done" ? true : undefined,
172+
})
173+
}
174+
175+
emitQueue(event: {
176+
subtype: "snapshot" | "enqueued" | "dequeued" | "drained" | "updated"
177+
taskId?: string
178+
content?: string
179+
queueDepth: number
180+
queue: JsonEventQueueItem[]
181+
}): void {
182+
this.emitEvent({
183+
type: "queue",
184+
subtype: event.subtype,
185+
taskId: event.taskId,
186+
content: event.content,
187+
queueDepth: event.queueDepth,
188+
queue: event.queue,
123189
})
124190
}
125191

@@ -248,6 +314,9 @@ export class JsonEventEmitter {
248314
}
249315
} else {
250316
this.emitEvent(this.buildTextEvent("assistant", msg.ts, contentToSend, isDone))
317+
if (msg.text) {
318+
this.lastAssistantText = msg.text
319+
}
251320
}
252321
break
253322

@@ -387,7 +456,7 @@ export class JsonEventEmitter {
387456
*/
388457
private handleTaskCompleted(event: TaskCompletedEvent): void {
389458
// Use tracked completion result content, falling back to event message
390-
const resultContent = this.completionResultContent || event.message?.text
459+
const resultContent = this.completionResultContent || event.message?.text || this.lastAssistantText
391460

392461
this.emitEvent({
393462
type: "result",
@@ -421,10 +490,13 @@ export class JsonEventEmitter {
421490
* For json mode: accumulate for final output
422491
*/
423492
private emitEvent(event: JsonEvent): void {
424-
this.events.push(event)
493+
const requestId = event.requestId ?? this.requestIdProvider()
494+
const payload = requestId ? { ...event, requestId } : event
495+
496+
this.events.push(payload)
425497

426498
if (this.mode === "stream-json") {
427-
this.outputLine(event)
499+
this.outputLine(payload)
428500
}
429501
}
430502

@@ -466,6 +538,7 @@ export class JsonEventEmitter {
466538
this.seenMessageIds.clear()
467539
this.previousContent.clear()
468540
this.completionResultContent = undefined
541+
this.lastAssistantText = undefined
469542
this.expectPromptEchoAsUser = true
470543
}
471544
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { parseFormat } from "../list.js"
2+
3+
describe("parseFormat", () => {
4+
it("defaults to json when undefined", () => {
5+
expect(parseFormat(undefined)).toBe("json")
6+
})
7+
8+
it("returns json for 'json'", () => {
9+
expect(parseFormat("json")).toBe("json")
10+
})
11+
12+
it("returns text for 'text'", () => {
13+
expect(parseFormat("text")).toBe("text")
14+
})
15+
16+
it("is case-insensitive", () => {
17+
expect(parseFormat("JSON")).toBe("json")
18+
expect(parseFormat("Text")).toBe("text")
19+
expect(parseFormat("TEXT")).toBe("text")
20+
})
21+
22+
it("throws on invalid format", () => {
23+
expect(() => parseFormat("xml")).toThrow('Invalid format: xml. Must be "json" or "text".')
24+
})
25+
26+
it("throws on empty string", () => {
27+
expect(() => parseFormat("")).toThrow("Invalid format")
28+
})
29+
})

0 commit comments

Comments
 (0)