Skip to content

Commit 17d534e

Browse files
authored
In cli stdin stream mode we should not create new tasks (#11515)
1 parent 24958f3 commit 17d534e

File tree

4 files changed

+227
-11
lines changed

4 files changed

+227
-11
lines changed

apps/cli/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
"build:extension": "pnpm --filter roo-cline bundle",
1818
"dev": "ROO_AUTH_BASE_URL=https://app.roocode.com ROO_SDK_BASE_URL=https://cloud-api.roocode.com ROO_CODE_PROVIDER_URL=https://api.roocode.com/proxy tsx src/index.ts",
1919
"dev:local": "ROO_AUTH_BASE_URL=http://localhost:3000 ROO_SDK_BASE_URL=http://localhost:3001 ROO_CODE_PROVIDER_URL=http://localhost:8080/proxy tsx src/index.ts",
20+
"dev:test-stdin": "tsx scripts/test-stdin-stream.ts",
2021
"clean": "rimraf dist .turbo"
2122
},
2223
"dependencies": {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import path from "path"
2+
import { fileURLToPath } from "url"
3+
import readline from "readline"
4+
5+
import { execa } from "execa"
6+
7+
const __dirname = path.dirname(fileURLToPath(import.meta.url))
8+
const cliRoot = path.resolve(__dirname, "..")
9+
10+
async function main() {
11+
const child = execa(
12+
"pnpm",
13+
["dev", "--print", "--stdin-prompt-stream", "--provider", "roo", "--output-format", "stream-json"],
14+
{
15+
cwd: cliRoot,
16+
stdin: "pipe",
17+
stdout: "pipe",
18+
stderr: "pipe",
19+
reject: false,
20+
forceKillAfterDelay: 2_000,
21+
},
22+
)
23+
24+
child.stdout?.on("data", (chunk) => process.stdout.write(chunk))
25+
child.stderr?.on("data", (chunk) => process.stderr.write(chunk))
26+
27+
console.log("[wrapper] Type a message and press Enter to send it.")
28+
console.log("[wrapper] Type /exit to close stdin and let the CLI finish.")
29+
30+
const rl = readline.createInterface({
31+
input: process.stdin,
32+
output: process.stdout,
33+
terminal: true,
34+
})
35+
36+
rl.on("line", (line) => {
37+
if (line.trim() === "/exit") {
38+
console.log("[wrapper] Closing stdin...")
39+
child.stdin?.end()
40+
rl.close()
41+
return
42+
}
43+
44+
if (!child.stdin?.destroyed) {
45+
child.stdin?.write(`${line}\n`)
46+
}
47+
})
48+
49+
const onSignal = (signal: NodeJS.Signals) => {
50+
console.log(`[wrapper] Received ${signal}, forwarding to CLI...`)
51+
rl.close()
52+
child.kill(signal)
53+
}
54+
55+
process.on("SIGINT", () => onSignal("SIGINT"))
56+
process.on("SIGTERM", () => onSignal("SIGTERM"))
57+
58+
const result = await child
59+
rl.close()
60+
console.log(`[wrapper] CLI exited with code ${result.exitCode}`)
61+
process.exit(result.exitCode ?? 1)
62+
}
63+
64+
main().catch((error) => {
65+
console.error("[wrapper] Fatal error:", error)
66+
process.exit(1)
67+
})

apps/cli/src/agent/json-event-emitter.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ import type { ClineMessage } from "@roo-code/types"
1919
import type { JsonEvent, JsonEventCost, JsonFinalOutput } from "@/types/json-events.js"
2020

2121
import type { ExtensionClient } from "./extension-client.js"
22-
import type { TaskCompletedEvent } from "./events.js"
22+
import type { AgentStateChangeEvent, TaskCompletedEvent } from "./events.js"
23+
import { AgentLoopState } from "./agent-state.js"
2324

2425
/**
2526
* Options for JsonEventEmitter.
@@ -108,10 +109,11 @@ export class JsonEventEmitter {
108109
// Subscribe to message events
109110
const unsubMessage = client.on("message", (msg) => this.handleMessage(msg, false))
110111
const unsubMessageUpdated = client.on("messageUpdated", (msg) => this.handleMessage(msg, true))
112+
const unsubStateChange = client.on("stateChange", (event) => this.handleStateChange(event))
111113
const unsubTaskCompleted = client.on("taskCompleted", (event) => this.handleTaskCompleted(event))
112114
const unsubError = client.on("error", (error) => this.handleError(error))
113115

114-
this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubTaskCompleted, unsubError)
116+
this.unsubscribers.push(unsubMessage, unsubMessageUpdated, unsubStateChange, unsubTaskCompleted, unsubError)
115117

116118
// Emit init event
117119
this.emitEvent({
@@ -121,6 +123,16 @@ export class JsonEventEmitter {
121123
})
122124
}
123125

126+
private handleStateChange(event: AgentStateChangeEvent): void {
127+
// Only treat the next say:text as a prompt echo when a new task starts.
128+
if (
129+
event.previousState.state === AgentLoopState.NO_TASK &&
130+
event.currentState.state !== AgentLoopState.NO_TASK
131+
) {
132+
this.expectPromptEchoAsUser = true
133+
}
134+
}
135+
124136
/**
125137
* Detach from the client and clean up subscriptions.
126138
*/
@@ -257,6 +269,9 @@ export class JsonEventEmitter {
257269
case "user_feedback":
258270
case "user_feedback_diff":
259271
this.emitEvent(this.buildTextEvent("user", msg.ts, contentToSend, isDone))
272+
if (isDone) {
273+
this.expectPromptEchoAsUser = false
274+
}
260275
break
261276

262277
case "api_req_started": {
@@ -387,9 +402,6 @@ export class JsonEventEmitter {
387402
if (this.mode === "json") {
388403
this.outputFinalResult(event.success, resultContent)
389404
}
390-
391-
// Next task in the same process starts with a new echoed prompt.
392-
this.expectPromptEchoAsUser = true
393405
}
394406

395407
/**

apps/cli/src/commands/cli/run.ts

Lines changed: 142 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { createInterface } from "readline"
44
import { fileURLToPath } from "url"
55

66
import { createElement } from "react"
7+
import pWaitFor from "p-wait-for"
78

89
import { setLogger } from "@roo-code/vscode-shim"
910

@@ -306,15 +307,149 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption
306307

307308
if (useStdinPromptStream) {
308309
let hasReceivedStdinPrompt = false
310+
// stdin stream mode may start at most one task in this process.
311+
let startedTaskFromStdin = false
312+
let activeTaskPromise: Promise<void> | null = null
313+
let fatalStreamError: Error | null = null
314+
// Extension-owned queue depth mirrored from state pushes.
315+
// CLI does not maintain its own prompt queue.
316+
let extensionQueueDepth = 0
317+
318+
const waitForInitialState = async () => {
319+
// Give the extension a brief chance to publish initial state so
320+
// we can continue an existing task instead of creating a new one.
321+
await pWaitFor(
322+
() => {
323+
if (fatalStreamError) {
324+
throw fatalStreamError
325+
}
326+
327+
return host.client.isInitialized()
328+
},
329+
{ interval: 25, timeout: 2_000 },
330+
).catch(() => {
331+
// Best-effort wait only; continuing preserves previous behavior.
332+
})
333+
334+
if (fatalStreamError) {
335+
throw fatalStreamError
336+
}
337+
}
309338

310-
for await (const stdinPrompt of readPromptsFromStdinLines()) {
311-
hasReceivedStdinPrompt = true
312-
await host.runTask(stdinPrompt)
313-
jsonEmitter?.clear()
339+
const waitForActiveTask = async () => {
340+
await pWaitFor(
341+
() => {
342+
if (fatalStreamError) {
343+
throw fatalStreamError
344+
}
345+
346+
if (!host.client.hasActiveTask()) {
347+
if (!activeTaskPromise && startedTaskFromStdin) {
348+
throw new Error("task is no longer active; cannot continue conversation from stdin")
349+
}
350+
351+
return false
352+
}
353+
354+
return true
355+
},
356+
{ interval: 25, timeout: 5_000 },
357+
)
314358
}
315359

316-
if (!hasReceivedStdinPrompt) {
317-
throw new Error("no prompt provided via stdin")
360+
const startInitialTask = async (taskPrompt: string) => {
361+
startedTaskFromStdin = true
362+
363+
activeTaskPromise = host
364+
.runTask(taskPrompt)
365+
.catch((error) => {
366+
fatalStreamError = error instanceof Error ? error : new Error(String(error))
367+
})
368+
.finally(() => {
369+
activeTaskPromise = null
370+
})
371+
372+
await waitForActiveTask()
373+
}
374+
375+
const enqueueContinuation = async (text: string) => {
376+
if (!host.client.hasActiveTask()) {
377+
await waitForActiveTask()
378+
}
379+
380+
// Delegate ordering/drain behavior to the extension message queue.
381+
host.sendToExtension({ type: "queueMessage", text })
382+
}
383+
384+
const offClientError = host.client.on("error", (error) => {
385+
fatalStreamError = error
386+
})
387+
388+
const onExtensionMessage = (message: { type?: string; state?: { messageQueue?: unknown } }) => {
389+
if (message.type !== "state") {
390+
return
391+
}
392+
393+
const messageQueue = message.state?.messageQueue
394+
extensionQueueDepth = Array.isArray(messageQueue) ? messageQueue.length : 0
395+
}
396+
397+
host.on("extensionWebviewMessage", onExtensionMessage)
398+
399+
try {
400+
await waitForInitialState()
401+
402+
for await (const stdinPrompt of readPromptsFromStdinLines()) {
403+
hasReceivedStdinPrompt = true
404+
405+
// Start once, then always continue via extension queue.
406+
if (!host.client.hasActiveTask() && !startedTaskFromStdin) {
407+
await startInitialTask(stdinPrompt)
408+
} else {
409+
await enqueueContinuation(stdinPrompt)
410+
}
411+
412+
if (fatalStreamError) {
413+
throw fatalStreamError
414+
}
415+
}
416+
417+
if (!hasReceivedStdinPrompt) {
418+
throw new Error("no prompt provided via stdin")
419+
}
420+
421+
await pWaitFor(
422+
() => {
423+
if (fatalStreamError) {
424+
throw fatalStreamError
425+
}
426+
427+
const isSettled =
428+
!host.client.hasActiveTask() && !activeTaskPromise && extensionQueueDepth === 0
429+
430+
if (isSettled) {
431+
return true
432+
}
433+
434+
if (host.isWaitingForInput() && extensionQueueDepth === 0) {
435+
const currentAsk = host.client.getCurrentAsk()
436+
437+
if (currentAsk === "completion_result") {
438+
return true
439+
}
440+
441+
if (currentAsk) {
442+
throw new Error(`stdin ended while task was waiting for input (${currentAsk})`)
443+
}
444+
}
445+
446+
return false
447+
},
448+
{ interval: 50 },
449+
)
450+
} finally {
451+
offClientError()
452+
host.off("extensionWebviewMessage", onExtensionMessage)
318453
}
319454
} else {
320455
await host.runTask(prompt!)
@@ -331,6 +466,7 @@ export async function run(promptArg: string | undefined, flagOptions: FlagOption
331466
process.stdout.write(JSON.stringify(errorEvent) + "\n")
332467
} else {
333468
console.error("[CLI] Error:", errorMessage)
469+
334470
if (error instanceof Error) {
335471
console.error(error.stack)
336472
}

0 commit comments

Comments
 (0)