From 909d6c71ecc536d25fa0698563a0372a1841726a Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 12:12:03 +0200 Subject: [PATCH 01/12] fix: replay queued cloud messages via resume on terminal status MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a follow-up was queued during the final turn of a cloud run, the terminal-status branch in handleCloudTaskUpdate silently dropped it — the auto-flush above had also raced ahead with sendCloudPrompt against a now-finished run, so the user_message command never reached the cloud. Skip the auto-flush when this update brings the run terminal, and have the terminal-status branch dequeue any pending messages and replay them via resumeCloudRun (which spins up a fresh task run carrying the prompt as pending_user_message). Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.test.ts | 63 +++++++++++++++++++ .../features/sessions/service/service.ts | 37 ++++++++--- 2 files changed, 93 insertions(+), 7 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 83fbe1c404..6ee873a2e5 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -70,6 +70,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ appendOptimisticItem: vi.fn(), clearOptimisticItems: vi.fn(), replaceOptimisticWithEvent: vi.fn(), + updateCloudStatus: vi.fn(), })); const mockGetConfigOptionByCategory = vi.hoisted(() => @@ -915,6 +916,68 @@ describe("SessionService", () => { }); }); + describe("handleCloudTaskUpdate terminal-status routing", () => { + it("dequeues queued messages instead of clearing them when the run reaches a terminal status", async () => { + const service = getSessionService(); + let capturedOnData: + | ((payload: Record) => void) + | undefined; + mockTrpcCloudTask.onUpdate.subscribe.mockImplementation( + ( + _input: unknown, + opts: { onData: (p: Record) => void }, + ) => { + capturedOnData = opts.onData; + return { unsubscribe: vi.fn() }; + }, + ); + + const queuedMessage = { + id: "queue-1", + content: "gimme a joke", + queuedAt: Date.now(), + }; + const session = createMockSession({ + isCloud: true, + cloudStatus: "in_progress", + cloudBranch: "main", + messageQueue: [queuedMessage], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": session, + }); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([ + queuedMessage, + ] as never); + + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + + expect(capturedOnData).toBeDefined(); + capturedOnData?.({ + kind: "status", + taskId: "task-123", + status: "completed", + stage: undefined, + output: undefined, + errorMessage: undefined, + branch: "main", + }); + + // Queue is drained via dequeueMessages (so resumeCloudRun can replay it), + // not silently dropped via clearMessageQueue. + expect(mockSessionStoreSetters.dequeueMessages).toHaveBeenCalledWith( + "task-123", + ); + expect(mockSessionStoreSetters.clearMessageQueue).not.toHaveBeenCalled(); + }); + }); + describe("reset", () => { it("clears connecting tasks", () => { const service = getSessionService(); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 5262ca607d..791ddf6910 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2924,9 +2924,17 @@ export class SessionService { } } - // Flush queued messages when a cloud turn completes (detected via live log updates) + const isTerminalUpdate = + (update.kind === "status" || update.kind === "snapshot") && + isTerminalStatus(update.status); + + // Flush queued messages when a cloud turn completes mid-run. Skip when + // the same update brings the run to a terminal status: queued commands + // cannot be delivered to a finished run, so the terminal-status block + // below replays them via resumeCloudRun instead. const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; if ( + !isTerminalUpdate && sessionAfterLogs && !sessionAfterLogs.isPromptPending && sessionAfterLogs.messageQueue.length > 0 @@ -2968,13 +2976,28 @@ export class SessionService { } if (isTerminalStatus(update.status)) { - // Clean up any pending resume messages that couldn't be sent const session = sessionStoreSetters.getSessions()[taskRunId]; - if ( - session && - (session.messageQueue.length > 0 || session.isPromptPending) - ) { - sessionStoreSetters.clearMessageQueue(session.taskId); + // A user message queued during the final turn cannot be delivered to + // a finished run. Replay it through resumeCloudRun, which spins up a + // fresh task run carrying the prompt as `pending_user_message`. + if (session && session.messageQueue.length > 0) { + const queued = sessionStoreSetters.dequeueMessages(session.taskId); + const combinedPrompt = combineQueuedCloudPrompts(queued); + sessionStoreSetters.updateSession(taskRunId, { + isPromptPending: false, + }); + if (combinedPrompt) { + this.resumeCloudRun(session, combinedPrompt).catch((err) => { + log.error("Failed to resume cloud run with queued messages", { + taskId: session.taskId, + error: err, + }); + toast.error( + "Failed to send follow-up message. Please try again.", + ); + }); + } + } else if (session?.isPromptPending) { sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); From 8ff72733234b61e98eb7f5fd63b2b2cdb95d80dc Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 12:22:34 +0200 Subject: [PATCH 02/12] fix: keep optimistic user bubble through the cloud log echo After the queue auto-flush dequeues a message, sendCloudPrompt fires the user_message command but the cloud log stream takes a moment to echo back the session/prompt event. Without an optimistic placeholder, the bubble vanishes during that window and the user thinks the message was dropped. Mirror the local optimistic flow: append a user_message item before the mutate, drop it when the echo arrives, and clear on send failure or retry exhaustion. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.ts | 63 +++++++++++++++++-- 1 file changed, 59 insertions(+), 4 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 791ddf6910..206d0476ad 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -92,6 +92,13 @@ const LOCAL_SESSION_RECOVERY_MESSAGE = const LOCAL_SESSION_RECOVERY_FAILED_MESSAGE = "Connecting to to the agent has been lost. Retry, or start a new session."; +function isUserPromptEcho(acpMsg: AcpMessage): boolean { + return ( + isJsonRpcRequest(acpMsg.message) && + acpMsg.message.method === "session/prompt" + ); +} + /** * Build default configOptions for cloud sessions so the mode switcher * is available in the UI even without a local agent connection. @@ -1518,6 +1525,17 @@ export class SessionService { isPromptPending: true, }); + // Show the bubble immediately while we wait for the cloud log stream to + // echo the user_message back. Without this the user sees a gap between + // submit (or queue drain) and the agent's response. + if (!options?.skipQueueGuard) { + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); + } + track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, is_initial: session.events.length === 0, @@ -1565,6 +1583,12 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: false, }); + // Drop optimistic items so a failed send doesn't leave a ghost bubble. + // The combined-prompt path (skipQueueGuard) clears its own optimistic + // items in sendQueuedCloudMessages on retry exhaustion. + if (!options?.skipQueueGuard) { + sessionStoreSetters.clearOptimisticItems(session.taskRunId); + } throw error; } } @@ -1574,10 +1598,29 @@ export class SessionService { attempt = 0, pendingPrompt?: string | ContentBlock[], ): Promise<{ stopReason: string }> { - // First attempt: atomically dequeue. Retries reuse the already-dequeued prompt. - const combinedPrompt = - pendingPrompt ?? - combineQueuedCloudPrompts(sessionStoreSetters.dequeueMessages(taskId)); + // First attempt: atomically dequeue and convert each entry into an + // optimistic bubble. Retries reuse the already-dequeued prompt and must + // not stack additional bubbles. + let combinedPrompt: string | ContentBlock[] | null; + if (pendingPrompt) { + combinedPrompt = pendingPrompt; + } else { + const dequeued = sessionStoreSetters.dequeueMessages(taskId); + combinedPrompt = combineQueuedCloudPrompts(dequeued); + if (combinedPrompt) { + const taskRunId = + sessionStoreSetters.getSessionByTaskId(taskId)?.taskRunId; + if (taskRunId) { + for (const msg of dequeued) { + sessionStoreSetters.appendOptimisticItem(taskRunId, { + type: "user_message", + content: msg.content, + timestamp: msg.queuedAt, + }); + } + } + } + } if (!combinedPrompt) return { stopReason: "skipped" }; const session = sessionStoreSetters.getSessionByTaskId(taskId); @@ -1632,6 +1675,10 @@ export class SessionService { taskId, attempts: attempt + 1, }); + const failedSession = sessionStoreSetters.getSessionByTaskId(taskId); + if (failedSession) { + sessionStoreSetters.clearOptimisticItems(failedSession.taskRunId); + } toast.error("Failed to send follow-up message. Please try again."); return { stopReason: "error" }; } @@ -2901,6 +2948,9 @@ export class SessionService { ); sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); this.updatePromptStateFromEvents(taskRunId, newEvents); + if (newEvents.some(isUserPromptEcho)) { + sessionStoreSetters.clearOptimisticItems(taskRunId); + } } else { // Gap in data — append everything we have but don't jump processedLineCount log.warn("Cloud task log count inconsistency", { @@ -2921,6 +2971,9 @@ export class SessionService { currentCount + update.newEntries.length, ); this.updatePromptStateFromEvents(taskRunId, newEvents); + if (newEvents.some(isUserPromptEcho)) { + sessionStoreSetters.clearOptimisticItems(taskRunId); + } } } @@ -2983,6 +3036,7 @@ export class SessionService { if (session && session.messageQueue.length > 0) { const queued = sessionStoreSetters.dequeueMessages(session.taskId); const combinedPrompt = combineQueuedCloudPrompts(queued); + sessionStoreSetters.clearOptimisticItems(taskRunId); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); @@ -2998,6 +3052,7 @@ export class SessionService { }); } } else if (session?.isPromptPending) { + sessionStoreSetters.clearOptimisticItems(taskRunId); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); From 04748cd9f5b53199ecce87fd0f52500d2c53a888 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 12:41:44 +0200 Subject: [PATCH 03/12] fix: replay queued cloud message via resume when run terminates mid-flush MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two race conditions could still drop the message after the previous fix: 1. The auto-flush mutate fires while the cloud is in_progress, but the run terminates before the cloud handles the user_message — the cloud rejects the command and sendQueuedCloudMessages eats the message in retry exhaustion. Catch the failure in sendCloudPrompt and, if the cloud has gone terminal in the meantime, replay through resumeCloudRun. 2. The mutate succeeds but the cloud terminates before echoing back the session/prompt request — the optimistic bubble hangs, the message is silently lost on the cloud side. When the terminal-status block runs with no queue but a pending optimistic user_message, reconstruct the prompt from the optimistic items and replay through resumeCloudRun. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.ts | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 206d0476ad..a63c17d944 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -99,6 +99,16 @@ function isUserPromptEcho(acpMsg: AcpMessage): boolean { ); } +function extractOptimisticUserMessage(session: AgentSession): string | null { + const parts: string[] = []; + for (const item of session.optimisticItems) { + if (item.type === "user_message" && item.content.length > 0) { + parts.push(item.content); + } + } + return parts.length > 0 ? parts.join("\n\n") : null; +} + /** * Build default configOptions for cloud sessions so the mode switcher * is available in the UI even without a local agent connection. @@ -1583,6 +1593,19 @@ export class SessionService { sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: false, }); + // If the run terminated while our user_message was in flight, the + // cloud rejects the command. Re-read the session: if it has gone + // terminal, replay the prompt through resumeCloudRun so the user's + // message still lands instead of being eaten by retry exhaustion. + const fresh = sessionStoreSetters.getSessionByTaskId(session.taskId); + if (fresh && isTerminalStatus(fresh.cloudStatus)) { + log.warn("Cloud user_message rejected after run terminated; resuming", { + taskId: session.taskId, + error: String(error), + }); + sessionStoreSetters.clearOptimisticItems(session.taskRunId); + return this.resumeCloudRun(fresh, prompt); + } // Drop optimistic items so a failed send doesn't leave a ghost bubble. // The combined-prompt path (skipQueueGuard) clears its own optimistic // items in sendQueuedCloudMessages on retry exhaustion. @@ -3030,27 +3053,34 @@ export class SessionService { if (isTerminalStatus(update.status)) { const session = sessionStoreSetters.getSessions()[taskRunId]; - // A user message queued during the final turn cannot be delivered to - // a finished run. Replay it through resumeCloudRun, which spins up a - // fresh task run carrying the prompt as `pending_user_message`. - if (session && session.messageQueue.length > 0) { - const queued = sessionStoreSetters.dequeueMessages(session.taskId); - const combinedPrompt = combineQueuedCloudPrompts(queued); + // A user message that was queued during the final turn — or whose + // user_message command was sent but never echoed back before the run + // terminated — cannot be delivered to a finished run. Replay through + // resumeCloudRun, which spins up a fresh task run carrying the prompt + // as `pending_user_message`. + const queuedPrompt = + session && session.messageQueue.length > 0 + ? combineQueuedCloudPrompts( + sessionStoreSetters.dequeueMessages(session.taskId), + ) + : null; + const optimisticPrompt = + !queuedPrompt && session + ? extractOptimisticUserMessage(session) + : null; + const replayPrompt = queuedPrompt ?? optimisticPrompt; + if (session && replayPrompt) { sessionStoreSetters.clearOptimisticItems(taskRunId); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); - if (combinedPrompt) { - this.resumeCloudRun(session, combinedPrompt).catch((err) => { - log.error("Failed to resume cloud run with queued messages", { - taskId: session.taskId, - error: err, - }); - toast.error( - "Failed to send follow-up message. Please try again.", - ); + this.resumeCloudRun(session, replayPrompt).catch((err) => { + log.error("Failed to resume cloud run with queued messages", { + taskId: session.taskId, + error: err, }); - } + toast.error("Failed to send follow-up message. Please try again."); + }); } else if (session?.isPromptPending) { sessionStoreSetters.clearOptimisticItems(taskRunId); sessionStoreSetters.updateSession(taskRunId, { From a1b87b85ab7cdeb55667fefb0aa0d1ec7dda6623 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 13:02:57 +0200 Subject: [PATCH 04/12] refactor: send cloud user_message immediately, drop the local in-progress queue MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cloud's /command/ endpoint accepts user_message commands at any time once the run is in_progress and queues them server-side until the agent turn ends — same shape as ACP's session/prompt queueing in the local agent. The renderer was mirroring that queueing locally, which kept introducing race conditions: messages got lost when the run terminated between dequeue and mutate, when the mutate succeeded but the cloud silently terminated before processing, or anywhere in between. Send straight to cloud as soon as the user submits. Show the message in chat right away as an optimistic bubble; if a prior turn is still in flight, mark `isQueued: true` so the bubble renders with the same "queued" affordance the user is used to. The real session/prompt event arrives via the log stream and replaces the optimistic item. The local messageQueue now exists only for the sandbox-not-ready window (cloudStatus !== "in_progress"); handleCloudTaskUpdate flushes it through the regular send path once the run goes in_progress, and the terminal-status branch falls back to resumeCloudRun if anything is left local when the run finishes. Removes ~150 lines of auto-flush, retry, and replay machinery. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../sessions/components/ConversationView.tsx | 15 ++ .../components/buildConversationItems.ts | 1 + .../features/sessions/service/service.test.ts | 32 ++- .../features/sessions/service/service.ts | 217 ++++-------------- .../features/sessions/stores/sessionStore.ts | 4 + 5 files changed, 79 insertions(+), 190 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 301523675b..6902f72ae3 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -154,6 +154,21 @@ export function ConversationView({ (item: ConversationItem) => { switch (item.type) { case "user_message": + if (item.isQueued) { + // Optimistic bubble for a cloud user_message that was sent while + // a prior agent turn was still in flight. The message is already + // on the server; this just shows the queued affordance until the + // cloud's session/prompt echo replaces the optimistic item. + return ( + + ); + } return ( { ); }); - it("preserves cloud attachment prompts when queueing a follow-up", async () => { + it("sends cloud follow-up straight through and marks the optimistic bubble as queued when a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1122,24 +1122,22 @@ describe("SessionService", () => { isPromptPending: true, }), ); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); - const prompt: ContentBlock[] = [ - { type: "text", text: "read this" }, - { - type: "resource_link", - uri: "file:///tmp/test.txt", - name: "test.txt", - mimeType: "text/plain", - }, - ]; - - const result = await service.sendPrompt("task-123", prompt); + await service.sendPrompt("task-123", "Hello cloud"); - expect(result.stopReason).toBe("queued"); - expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( - "task-123", - "read this\n\nAttached files: test.txt", - prompt, + expect(mockSessionStoreSetters.enqueueMessage).not.toHaveBeenCalled(); + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); + expect(mockSessionStoreSetters.appendOptimisticItem).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + type: "user_message", + content: "Hello cloud", + isQueued: true, + }), ); }); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index a63c17d944..a7fd050df7 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1474,7 +1474,6 @@ export class SessionService { private async sendCloudPrompt( session: AgentSession, prompt: string | ContentBlock[], - options?: { skipQueueGuard?: boolean }, ): Promise<{ stopReason: string }> { const transport = getCloudPromptTransport(prompt); if (!transport.messageText && transport.filePaths.length === 0) { @@ -1486,7 +1485,14 @@ export class SessionService { } if (session.cloudStatus !== "in_progress") { - sessionStoreSetters.enqueueMessage(session.taskId, transport.promptText); + // Sandbox isn't accepting commands yet — keep the message in a local + // queue and let the in_progress handler in handleCloudTaskUpdate flush + // it once the run is ready. + sessionStoreSetters.enqueueMessage( + session.taskId, + transport.promptText, + prompt, + ); sessionStoreSetters.updateSession(session.taskRunId, { isPromptPending: true, }); @@ -1497,18 +1503,12 @@ export class SessionService { return { stopReason: "queued" }; } - if (!options?.skipQueueGuard && session.isPromptPending) { - sessionStoreSetters.enqueueMessage( - session.taskId, - transport.promptText, - prompt, - ); - log.info("Cloud message queued", { - taskId: session.taskId, - queueLength: session.messageQueue.length + 1, - }); - return { stopReason: "queued" }; - } + // Once the run is in_progress the cloud accepts user_message commands at + // any time and queues them server-side until the current turn ends. Send + // straight through; the optimistic bubble below shows the message in chat + // immediately, with a "queued" affordance while a prior turn is still in + // flight. + const priorTurnInFlight = session.isPromptPending; const [auth, cloudCommandAuth] = await Promise.all([ this.getAuthCredentials(), @@ -1535,16 +1535,12 @@ export class SessionService { isPromptPending: true, }); - // Show the bubble immediately while we wait for the cloud log stream to - // echo the user_message back. Without this the user sees a gap between - // submit (or queue drain) and the agent's response. - if (!options?.skipQueueGuard) { - sessionStoreSetters.appendOptimisticItem(session.taskRunId, { - type: "user_message", - content: transport.promptText, - timestamp: Date.now(), - }); - } + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + isQueued: priorTurnInFlight, + }); track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -1574,20 +1570,6 @@ export class SessionService { const stopReason = (result.result as { stopReason?: string })?.stopReason ?? "end_turn"; - const freshSession = sessionStoreSetters.getSessionByTaskId( - session.taskId, - ); - if (freshSession && freshSession.messageQueue.length > 0) { - setTimeout(() => { - this.sendQueuedCloudMessages(session.taskId).catch((err) => { - log.error("Failed to send queued cloud messages", { - taskId: session.taskId, - error: err, - }); - }); - }, 0); - } - return { stopReason }; } catch (error) { sessionStoreSetters.updateSession(session.taskRunId, { @@ -1596,7 +1578,7 @@ export class SessionService { // If the run terminated while our user_message was in flight, the // cloud rejects the command. Re-read the session: if it has gone // terminal, replay the prompt through resumeCloudRun so the user's - // message still lands instead of being eaten by retry exhaustion. + // message still lands instead of being eaten. const fresh = sessionStoreSetters.getSessionByTaskId(session.taskId); if (fresh && isTerminalStatus(fresh.cloudStatus)) { log.warn("Cloud user_message rejected after run terminated; resuming", { @@ -1606,107 +1588,11 @@ export class SessionService { sessionStoreSetters.clearOptimisticItems(session.taskRunId); return this.resumeCloudRun(fresh, prompt); } - // Drop optimistic items so a failed send doesn't leave a ghost bubble. - // The combined-prompt path (skipQueueGuard) clears its own optimistic - // items in sendQueuedCloudMessages on retry exhaustion. - if (!options?.skipQueueGuard) { - sessionStoreSetters.clearOptimisticItems(session.taskRunId); - } + sessionStoreSetters.clearOptimisticItems(session.taskRunId); throw error; } } - private async sendQueuedCloudMessages( - taskId: string, - attempt = 0, - pendingPrompt?: string | ContentBlock[], - ): Promise<{ stopReason: string }> { - // First attempt: atomically dequeue and convert each entry into an - // optimistic bubble. Retries reuse the already-dequeued prompt and must - // not stack additional bubbles. - let combinedPrompt: string | ContentBlock[] | null; - if (pendingPrompt) { - combinedPrompt = pendingPrompt; - } else { - const dequeued = sessionStoreSetters.dequeueMessages(taskId); - combinedPrompt = combineQueuedCloudPrompts(dequeued); - if (combinedPrompt) { - const taskRunId = - sessionStoreSetters.getSessionByTaskId(taskId)?.taskRunId; - if (taskRunId) { - for (const msg of dequeued) { - sessionStoreSetters.appendOptimisticItem(taskRunId, { - type: "user_message", - content: msg.content, - timestamp: msg.queuedAt, - }); - } - } - } - } - if (!combinedPrompt) return { stopReason: "skipped" }; - - const session = sessionStoreSetters.getSessionByTaskId(taskId); - if (!session) { - log.warn("No session found for queued cloud messages, message lost", { - taskId, - }); - return { stopReason: "no_session" }; - } - - log.info("Sending queued cloud messages", { - taskId, - promptLength: combinedPrompt.length, - attempt, - }); - - try { - return await this.sendCloudPrompt(session, combinedPrompt, { - skipQueueGuard: true, - }); - } catch (error) { - const maxRetries = 5; - if (attempt < maxRetries) { - const delayMs = Math.min(1000 * 2 ** attempt, 10_000); - log.warn("Cloud message send failed, scheduling retry", { - taskId, - attempt, - delayMs, - error: String(error), - }); - return new Promise((resolve) => { - setTimeout(() => { - resolve( - this.sendQueuedCloudMessages( - taskId, - attempt + 1, - combinedPrompt, - ).catch((err) => { - log.error("Queued cloud message retry failed", { - taskId, - attempt: attempt + 1, - error: err, - }); - return { stopReason: "error" }; - }), - ); - }, delayMs); - }); - } - - log.error("Queued cloud message send failed after max retries", { - taskId, - attempts: attempt + 1, - }); - const failedSession = sessionStoreSetters.getSessionByTaskId(taskId); - if (failedSession) { - sessionStoreSetters.clearOptimisticItems(failedSession.taskRunId); - } - toast.error("Failed to send follow-up message. Please try again."); - return { stopReason: "error" }; - } - } - private async resumeCloudRun( session: AgentSession, prompt: string | ContentBlock[], @@ -3000,29 +2886,6 @@ export class SessionService { } } - const isTerminalUpdate = - (update.kind === "status" || update.kind === "snapshot") && - isTerminalStatus(update.status); - - // Flush queued messages when a cloud turn completes mid-run. Skip when - // the same update brings the run to a terminal status: queued commands - // cannot be delivered to a finished run, so the terminal-status block - // below replays them via resumeCloudRun instead. - const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; - if ( - !isTerminalUpdate && - sessionAfterLogs && - !sessionAfterLogs.isPromptPending && - sessionAfterLogs.messageQueue.length > 0 - ) { - this.sendQueuedCloudMessages(sessionAfterLogs.taskId).catch((err) => { - log.error("Failed to send queued cloud messages after turn complete", { - taskId: sessionAfterLogs.taskId, - error: err, - }); - }); - } - // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { sessionStoreSetters.updateCloudStatus(taskRunId, { @@ -3033,31 +2896,39 @@ export class SessionService { branch: update.branch, }); - // Auto-send queued messages when a resumed run becomes active + // The local messageQueue only ever holds messages submitted before the + // sandbox was ready (cloudStatus !== "in_progress"). Once the run goes + // in_progress, drain them through the regular send path — the cloud + // accepts user_message commands at any time once the sandbox is up. if (update.status === "in_progress") { const session = sessionStoreSetters.getSessions()[taskRunId]; if (session && session.messageQueue.length > 0) { - // Clear the pending flag first — resumeCloudRun sets it as a guard - // while waiting for the run to start. Now that the run is active, - // sendCloudPrompt needs the flag clear to actually send. + const dequeued = sessionStoreSetters.dequeueMessages(session.taskId); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); sessionStoreSetters.updateSession(taskRunId, { isPromptPending: false, }); - this.sendQueuedCloudMessages(session.taskId).catch(() => { - // Retries exhausted — message was re-enqueued by - // sendQueuedCloudMessages, future stream-based completion detection - // will keep trying - }); + if (combinedPrompt) { + this.sendCloudPrompt(session, combinedPrompt).catch((err) => { + log.error("Failed to flush sandbox-ready queue", { + taskId: session.taskId, + error: err, + }); + toast.error( + "Failed to send follow-up message. Please try again.", + ); + }); + } } } if (isTerminalStatus(update.status)) { const session = sessionStoreSetters.getSessions()[taskRunId]; - // A user message that was queued during the final turn — or whose - // user_message command was sent but never echoed back before the run - // terminated — cannot be delivered to a finished run. Replay through - // resumeCloudRun, which spins up a fresh task run carrying the prompt - // as `pending_user_message`. + // Anything still local (sandbox-not-ready queue or an optimistic + // bubble whose user_message command never echoed back) cannot be + // delivered to a finished run. Replay through resumeCloudRun, which + // spins up a fresh task run carrying the prompt as + // `pending_user_message`. const queuedPrompt = session && session.messageQueue.length > 0 ? combineQueuedCloudPrompts( diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 08b7c7f116..49a6a0be9c 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -32,6 +32,10 @@ export type OptimisticItem = id: string; content: string; timestamp: number; + // True when this bubble was added while a prior agent turn was still + // running. The renderer shows it with the queued affordance until the + // cloud's session/prompt echo replaces it. + isQueued?: boolean; } | { type: "skill_button_action"; From 01b867e2f04368c350cc4fbc858356da7218613d Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 13:24:14 +0200 Subject: [PATCH 05/12] fix: keep the queued bubble visible while sending the cloud message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous attempt rendered an optimistic bubble with an isQueued flag to mimic the queued affordance. In practice the bubble was either invisible or vanished too quickly for the user to perceive. Drop the flag and lean on the existing messageQueue / QueuedMessageView instead: when sendCloudPrompt sees a prior turn is in flight, push the message to messageQueue (which renders the familiar queued bubble at the bottom of the chat) AND fire the user_message command to the cloud immediately. When the cloud's session/prompt echo for our message lands in the log stream, handleCloudTaskUpdate pops the oldest queued message — the agent has just started processing it, and the real event takes over the visual. For the idle case (no prior turn), keep the existing optimistic bubble to fill the dequeue→echo gap. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../sessions/components/ConversationView.tsx | 15 ---- .../components/buildConversationItems.ts | 1 - .../features/sessions/service/service.test.ts | 23 +++--- .../features/sessions/service/service.ts | 77 +++++++++++++------ .../features/sessions/stores/sessionStore.ts | 4 - 5 files changed, 67 insertions(+), 53 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 6902f72ae3..301523675b 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -154,21 +154,6 @@ export function ConversationView({ (item: ConversationItem) => { switch (item.type) { case "user_message": - if (item.isQueued) { - // Optimistic bubble for a cloud user_message that was sent while - // a prior agent turn was still in flight. The message is already - // on the server; this just shows the queued affordance until the - // cloud's session/prompt echo replaces the optimistic item. - return ( - - ); - } return ( { ); }); - it("sends cloud follow-up straight through and marks the optimistic bubble as queued when a prior turn is in flight", async () => { + it("sends a cloud follow-up to the cloud immediately and adds a queued bubble when a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1129,16 +1129,19 @@ describe("SessionService", () => { await service.sendPrompt("task-123", "Hello cloud"); - expect(mockSessionStoreSetters.enqueueMessage).not.toHaveBeenCalled(); - expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); - expect(mockSessionStoreSetters.appendOptimisticItem).toHaveBeenCalledWith( - "run-123", - expect.objectContaining({ - type: "user_message", - content: "Hello cloud", - isQueued: true, - }), + // Queued bubble shown via the local messageQueue. + expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( + "task-123", + "Hello cloud", + "Hello cloud", ); + // No optimistic bubble — the queued bubble owns the visual. + expect( + mockSessionStoreSetters.appendOptimisticItem, + ).not.toHaveBeenCalled(); + // The user_message command goes straight to cloud rather than waiting + // for the prior turn to end. + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); }); it("sends prompt via tRPC when session is ready", async () => { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index a7fd050df7..a573d01219 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1503,12 +1503,21 @@ export class SessionService { return { stopReason: "queued" }; } - // Once the run is in_progress the cloud accepts user_message commands at - // any time and queues them server-side until the current turn ends. Send - // straight through; the optimistic bubble below shows the message in chat - // immediately, with a "queued" affordance while a prior turn is still in - // flight. - const priorTurnInFlight = session.isPromptPending; + // The run is in_progress: the cloud accepts user_message commands at any + // time and queues them server-side until the current turn ends. Show the + // bubble immediately — as a queued bubble (via messageQueue) when a prior + // turn is in flight, otherwise as an optimistic regular bubble. Either + // way the mutate goes straight to the cloud below; when the cloud emits + // the session/prompt echo for our message, handleCloudTaskUpdate clears + // the placeholder and the real event takes over. + const showAsQueued = session.isPromptPending; + if (showAsQueued) { + sessionStoreSetters.enqueueMessage( + session.taskId, + transport.promptText, + prompt, + ); + } const [auth, cloudCommandAuth] = await Promise.all([ this.getAuthCredentials(), @@ -1531,16 +1540,16 @@ export class SessionService { params.artifact_ids = artifactIds; } - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: true, - }); - - sessionStoreSetters.appendOptimisticItem(session.taskRunId, { - type: "user_message", - content: transport.promptText, - timestamp: Date.now(), - isQueued: priorTurnInFlight, - }); + if (!showAsQueued) { + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: true, + }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); + } track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -1559,10 +1568,6 @@ export class SessionService { params, }); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - if (!result.success) { throw new Error(result.error ?? "Failed to send cloud command"); } @@ -1572,9 +1577,6 @@ export class SessionService { return { stopReason }; } catch (error) { - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); // If the run terminated while our user_message was in flight, the // cloud rejects the command. Re-read the session: if it has gone // terminal, replay the prompt through resumeCloudRun so the user's @@ -1586,9 +1588,14 @@ export class SessionService { error: String(error), }); sessionStoreSetters.clearOptimisticItems(session.taskRunId); + // resumeCloudRun replaces the session entirely so any queued bubble + // on the old session is dropped. return this.resumeCloudRun(fresh, prompt); } sessionStoreSetters.clearOptimisticItems(session.taskRunId); + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: false, + }); throw error; } } @@ -2858,7 +2865,19 @@ export class SessionService { sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); this.updatePromptStateFromEvents(taskRunId, newEvents); if (newEvents.some(isUserPromptEcho)) { + // The agent has just started processing a user message. Drop any + // optimistic placeholder, and pop the oldest queued message if we + // were holding one for the visual queued affordance — its real + // session/prompt event has just landed in the events list. sessionStoreSetters.clearOptimisticItems(taskRunId); + const sessionAfterAppend = + sessionStoreSetters.getSessions()[taskRunId]; + if (sessionAfterAppend?.messageQueue.length) { + sessionStoreSetters.removeQueuedMessage( + sessionAfterAppend.taskId, + sessionAfterAppend.messageQueue[0].id, + ); + } } } else { // Gap in data — append everything we have but don't jump processedLineCount @@ -2881,7 +2900,19 @@ export class SessionService { ); this.updatePromptStateFromEvents(taskRunId, newEvents); if (newEvents.some(isUserPromptEcho)) { + // The agent has just started processing a user message. Drop any + // optimistic placeholder, and pop the oldest queued message if we + // were holding one for the visual queued affordance — its real + // session/prompt event has just landed in the events list. sessionStoreSetters.clearOptimisticItems(taskRunId); + const sessionAfterAppend = + sessionStoreSetters.getSessions()[taskRunId]; + if (sessionAfterAppend?.messageQueue.length) { + sessionStoreSetters.removeQueuedMessage( + sessionAfterAppend.taskId, + sessionAfterAppend.messageQueue[0].id, + ); + } } } } diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 49a6a0be9c..08b7c7f116 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -32,10 +32,6 @@ export type OptimisticItem = id: string; content: string; timestamp: number; - // True when this bubble was added while a prior agent turn was still - // running. The renderer shows it with the queued affordance until the - // cloud's session/prompt echo replaces it. - isQueued?: boolean; } | { type: "skill_button_action"; From ac9bfb6af79cdc6b564df09e5aef91c51dd39883 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 14:10:26 +0200 Subject: [PATCH 06/12] fix: hold cloud follow-ups locally and dispatch on end_turn Match the local agent's behavior: when the user submits during a running cloud turn, hold the message in the local messageQueue (queued bubble shown via QueuedMessageView) and only POST the user_message command after the prior turn's end_turn lands in the log stream. handleCloudTaskUpdate's auto-flush picks up the queued message once isPromptPending flips to false, drains the queue, and routes through sendCloudPrompt (which now adds an optimistic bubble for the dispatch gap until the cloud's session/prompt echo arrives). The terminal-status branch still resumes through a fresh run if the cloud goes terminal with anything pending; the catch in sendCloudPrompt still falls back to resumeCloudRun if the user_message mutate fails because the run terminated mid-flight. Both paths protect the user's message from being silently dropped. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.test.ts | 16 ++-- .../features/sessions/service/service.ts | 92 ++++++++++--------- 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index f64fae17c7..cd2b75f416 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1113,7 +1113,7 @@ describe("SessionService", () => { ); }); - it("sends a cloud follow-up to the cloud immediately and adds a queued bubble when a prior turn is in flight", async () => { + it("queues a cloud follow-up locally without dispatching while a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1122,26 +1122,22 @@ describe("SessionService", () => { isPromptPending: true, }), ); - mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ - success: true, - result: { stopReason: "end_turn" }, - }); - await service.sendPrompt("task-123", "Hello cloud"); + const result = await service.sendPrompt("task-123", "Hello cloud"); + expect(result.stopReason).toBe("queued"); // Queued bubble shown via the local messageQueue. expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( "task-123", "Hello cloud", "Hello cloud", ); - // No optimistic bubble — the queued bubble owns the visual. + // The dispatch waits for the prior turn's end_turn — handleCloudTaskUpdate + // auto-flushes then. So no mutate yet, no optimistic bubble. + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); expect( mockSessionStoreSetters.appendOptimisticItem, ).not.toHaveBeenCalled(); - // The user_message command goes straight to cloud rather than waiting - // for the prior turn to end. - expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); }); it("sends prompt via tRPC when session is ready", async () => { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index a573d01219..6e5a1f41f1 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1503,20 +1503,21 @@ export class SessionService { return { stopReason: "queued" }; } - // The run is in_progress: the cloud accepts user_message commands at any - // time and queues them server-side until the current turn ends. Show the - // bubble immediately — as a queued bubble (via messageQueue) when a prior - // turn is in flight, otherwise as an optimistic regular bubble. Either - // way the mutate goes straight to the cloud below; when the cloud emits - // the session/prompt echo for our message, handleCloudTaskUpdate clears - // the placeholder and the real event takes over. - const showAsQueued = session.isPromptPending; - if (showAsQueued) { + // Mirror the local agent: hold the follow-up locally as a queued bubble + // until the current turn ends. handleCloudTaskUpdate auto-flushes the + // queue once the agent emits end_turn, at which point we re-enter this + // function with isPromptPending=false and dispatch. + if (session.isPromptPending) { sessionStoreSetters.enqueueMessage( session.taskId, transport.promptText, prompt, ); + log.info("Cloud message queued (prior turn in flight)", { + taskId: session.taskId, + queueLength: session.messageQueue.length + 1, + }); + return { stopReason: "queued" }; } const [auth, cloudCommandAuth] = await Promise.all([ @@ -1540,16 +1541,14 @@ export class SessionService { params.artifact_ids = artifactIds; } - if (!showAsQueued) { - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: true, - }); - sessionStoreSetters.appendOptimisticItem(session.taskRunId, { - type: "user_message", - content: transport.promptText, - timestamp: Date.now(), - }); - } + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: true, + }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -2865,19 +2864,7 @@ export class SessionService { sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); this.updatePromptStateFromEvents(taskRunId, newEvents); if (newEvents.some(isUserPromptEcho)) { - // The agent has just started processing a user message. Drop any - // optimistic placeholder, and pop the oldest queued message if we - // were holding one for the visual queued affordance — its real - // session/prompt event has just landed in the events list. sessionStoreSetters.clearOptimisticItems(taskRunId); - const sessionAfterAppend = - sessionStoreSetters.getSessions()[taskRunId]; - if (sessionAfterAppend?.messageQueue.length) { - sessionStoreSetters.removeQueuedMessage( - sessionAfterAppend.taskId, - sessionAfterAppend.messageQueue[0].id, - ); - } } } else { // Gap in data — append everything we have but don't jump processedLineCount @@ -2900,23 +2887,42 @@ export class SessionService { ); this.updatePromptStateFromEvents(taskRunId, newEvents); if (newEvents.some(isUserPromptEcho)) { - // The agent has just started processing a user message. Drop any - // optimistic placeholder, and pop the oldest queued message if we - // were holding one for the visual queued affordance — its real - // session/prompt event has just landed in the events list. sessionStoreSetters.clearOptimisticItems(taskRunId); - const sessionAfterAppend = - sessionStoreSetters.getSessions()[taskRunId]; - if (sessionAfterAppend?.messageQueue.length) { - sessionStoreSetters.removeQueuedMessage( - sessionAfterAppend.taskId, - sessionAfterAppend.messageQueue[0].id, - ); - } } } } + const isTerminalUpdate = + (update.kind === "status" || update.kind === "snapshot") && + isTerminalStatus(update.status); + + // Mid-run auto-flush: when the agent's turn just ended, dispatch any + // messages the user queued during that turn. Skip if this update is + // bringing the run terminal — the terminal-status block below replays + // through resumeCloudRun instead. + const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; + if ( + !isTerminalUpdate && + sessionAfterLogs && + !sessionAfterLogs.isPromptPending && + sessionAfterLogs.messageQueue.length > 0 && + sessionAfterLogs.cloudStatus === "in_progress" + ) { + const dequeued = sessionStoreSetters.dequeueMessages( + sessionAfterLogs.taskId, + ); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); + if (combinedPrompt) { + this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to flush queued cloud messages after turn end", { + taskId: sessionAfterLogs.taskId, + error: err, + }); + toast.error("Failed to send follow-up message. Please try again."); + }); + } + } + // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { sessionStoreSetters.updateCloudStatus(taskRunId, { From f5cbacfd878d40fd2f0f99d93efcb339f9f79b08 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 14:59:29 +0200 Subject: [PATCH 07/12] fix: keep cloud queued bubble visible through the whole prior turn MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The hold-locally-then-dispatch design dropped messages whenever the cloud's lifecycle didn't line up with the renderer's expectations. The immediate-dispatch design got the message through but the queued affordance only flashed for ~1s. Combine the two: enqueue the message locally (queued bubble visible) AND fire the user_message command to the cloud right away. The cloud queues server-side. handleCloudTaskUpdate now pops one entry from messageQueue per completed turn (matched against currentPromptId so nested calls don't cause spurious pops), so the bubble disappears exactly when the cloud picks our message up — same UX as the local agent. Cleanup on failure paths: if the run terminates while the mutate is in flight, fall back to resumeCloudRun (queue is dropped along with the session). For non-terminal failures, remove the queued entry the user saw so it doesn't outlive the failure toast. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.test.ts | 19 +- .../features/sessions/service/service.ts | 176 ++++++++++-------- .../features/sessions/stores/sessionStore.ts | 3 +- 3 files changed, 113 insertions(+), 85 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index cd2b75f416..a032e4a72e 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -58,7 +58,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({ removeSession: vi.fn(), updateSession: vi.fn(), appendEvents: vi.fn(), - enqueueMessage: vi.fn(), + enqueueMessage: vi.fn(() => "queue-id-default"), removeQueuedMessage: vi.fn(), clearMessageQueue: vi.fn(), dequeueMessagesAsText: vi.fn(() => null), @@ -1113,7 +1113,7 @@ describe("SessionService", () => { ); }); - it("queues a cloud follow-up locally without dispatching while a prior turn is in flight", async () => { + it("queues a cloud follow-up locally and dispatches to the cloud immediately while a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1122,19 +1122,24 @@ describe("SessionService", () => { isPromptPending: true, }), ); + mockSessionStoreSetters.enqueueMessage.mockReturnValue("queue-id-1"); + mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ + success: true, + result: { stopReason: "end_turn" }, + }); - const result = await service.sendPrompt("task-123", "Hello cloud"); + await service.sendPrompt("task-123", "Hello cloud"); - expect(result.stopReason).toBe("queued"); // Queued bubble shown via the local messageQueue. expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( "task-123", "Hello cloud", "Hello cloud", ); - // The dispatch waits for the prior turn's end_turn — handleCloudTaskUpdate - // auto-flushes then. So no mutate yet, no optimistic bubble. - expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); + // Mutate fires immediately; the cloud queues server-side until the + // prior turn ends. No optimistic bubble — the queued bubble owns the + // visual until handleCloudTaskUpdate pops the queue at end_turn. + expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); expect( mockSessionStoreSetters.appendOptimisticItem, ).not.toHaveBeenCalled(); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 6e5a1f41f1..6200baf3e7 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1503,21 +1503,20 @@ export class SessionService { return { stopReason: "queued" }; } - // Mirror the local agent: hold the follow-up locally as a queued bubble - // until the current turn ends. handleCloudTaskUpdate auto-flushes the - // queue once the agent emits end_turn, at which point we re-enter this - // function with isPromptPending=false and dispatch. - if (session.isPromptPending) { - sessionStoreSetters.enqueueMessage( + // The cloud accepts user_message commands while a turn is in flight and + // queues them server-side. Match the local agent's UX by also keeping + // the message in our messageQueue (queued bubble visible at the bottom) + // until the prior turn's end_turn arrives — handleCloudTaskUpdate pops + // one entry per turn boundary so the bubble disappears exactly when the + // cloud picks our message up. + const priorTurnInFlight = session.isPromptPending; + let queuedMessageId: string | undefined; + if (priorTurnInFlight) { + queuedMessageId = sessionStoreSetters.enqueueMessage( session.taskId, transport.promptText, prompt, ); - log.info("Cloud message queued (prior turn in flight)", { - taskId: session.taskId, - queueLength: session.messageQueue.length + 1, - }); - return { stopReason: "queued" }; } const [auth, cloudCommandAuth] = await Promise.all([ @@ -1525,6 +1524,12 @@ export class SessionService { this.getCloudCommandAuth(), ]); if (!auth || !cloudCommandAuth) { + if (queuedMessageId) { + sessionStoreSetters.removeQueuedMessage( + session.taskId, + queuedMessageId, + ); + } throw new Error("Authentication required for cloud commands"); } const artifactIds = await uploadRunAttachments( @@ -1541,14 +1546,18 @@ export class SessionService { params.artifact_ids = artifactIds; } - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: true, - }); - sessionStoreSetters.appendOptimisticItem(session.taskRunId, { - type: "user_message", - content: transport.promptText, - timestamp: Date.now(), - }); + if (!priorTurnInFlight) { + // Idle agent — the queued bubble would look strange so add an + // optimistic regular bubble that the session/prompt echo replaces. + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: true, + }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); + } track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -1579,7 +1588,9 @@ export class SessionService { // If the run terminated while our user_message was in flight, the // cloud rejects the command. Re-read the session: if it has gone // terminal, replay the prompt through resumeCloudRun so the user's - // message still lands instead of being eaten. + // message still lands instead of being eaten. resumeCloudRun replaces + // the session entirely so any queued bubble on the old session is + // dropped naturally. const fresh = sessionStoreSetters.getSessionByTaskId(session.taskId); if (fresh && isTerminalStatus(fresh.cloudStatus)) { log.warn("Cloud user_message rejected after run terminated; resuming", { @@ -1587,14 +1598,22 @@ export class SessionService { error: String(error), }); sessionStoreSetters.clearOptimisticItems(session.taskRunId); - // resumeCloudRun replaces the session entirely so any queued bubble - // on the old session is dropped. return this.resumeCloudRun(fresh, prompt); } + // Non-terminal failure: clean up the placeholder we added so a stuck + // bubble doesn't outlive the failure toast. + if (queuedMessageId) { + sessionStoreSetters.removeQueuedMessage( + session.taskId, + queuedMessageId, + ); + } sessionStoreSetters.clearOptimisticItems(session.taskRunId); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); + if (!priorTurnInFlight) { + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: false, + }); + } throw error; } } @@ -2852,74 +2871,77 @@ export class SessionService { if (delta <= 0) { // Already caught up — skip duplicate entries - } else if (delta <= update.newEntries.length) { - // Normal case: append only the tail (last `delta` entries) - const entriesToAppend = update.newEntries.slice(-delta); + } else { + const isGap = delta > update.newEntries.length; + const entriesToAppend = isGap + ? update.newEntries + : update.newEntries.slice(-delta); + const nextProcessedCount = isGap + ? currentCount + update.newEntries.length + : expectedCount; + if (isGap) { + log.warn("Cloud task log count inconsistency", { + taskRunId, + currentCount, + expectedCount, + entriesReceived: update.newEntries.length, + }); + } + let newEvents = convertStoredEntriesToEvents(entriesToAppend); newEvents = this.filterSkippedPromptEvents( taskRunId, session, newEvents, ); - sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount); - this.updatePromptStateFromEvents(taskRunId, newEvents); - if (newEvents.some(isUserPromptEcho)) { - sessionStoreSetters.clearOptimisticItems(taskRunId); + + // Walk the batch tracking when turns start (session/prompt request) + // and end (matching response with stopReason). Each completed turn + // implicitly advances one queued message — the cloud has just picked + // it up and its real session/prompt event is in this same batch. + let walkingPromptId = session?.currentPromptId; + let turnsCompleted = 0; + for (const event of newEvents) { + const m = event.message; + if (isJsonRpcRequest(m) && m.method === "session/prompt") { + walkingPromptId = m.id; + continue; + } + if ( + "id" in m && + "result" in m && + typeof m.result === "object" && + m.result !== null && + "stopReason" in m.result && + m.id === walkingPromptId + ) { + turnsCompleted++; + walkingPromptId = undefined; + } } - } else { - // Gap in data — append everything we have but don't jump processedLineCount - log.warn("Cloud task log count inconsistency", { - taskRunId, - currentCount, - expectedCount, - entriesReceived: update.newEntries.length, - }); - let newEvents = convertStoredEntriesToEvents(update.newEntries); - newEvents = this.filterSkippedPromptEvents( - taskRunId, - session, - newEvents, - ); + sessionStoreSetters.appendEvents( taskRunId, newEvents, - currentCount + update.newEntries.length, + nextProcessedCount, ); this.updatePromptStateFromEvents(taskRunId, newEvents); + if (newEvents.some(isUserPromptEcho)) { sessionStoreSetters.clearOptimisticItems(taskRunId); } - } - } - - const isTerminalUpdate = - (update.kind === "status" || update.kind === "snapshot") && - isTerminalStatus(update.status); - // Mid-run auto-flush: when the agent's turn just ended, dispatch any - // messages the user queued during that turn. Skip if this update is - // bringing the run terminal — the terminal-status block below replays - // through resumeCloudRun instead. - const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; - if ( - !isTerminalUpdate && - sessionAfterLogs && - !sessionAfterLogs.isPromptPending && - sessionAfterLogs.messageQueue.length > 0 && - sessionAfterLogs.cloudStatus === "in_progress" - ) { - const dequeued = sessionStoreSetters.dequeueMessages( - sessionAfterLogs.taskId, - ); - const combinedPrompt = combineQueuedCloudPrompts(dequeued); - if (combinedPrompt) { - this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { - log.error("Failed to flush queued cloud messages after turn end", { - taskId: sessionAfterLogs.taskId, - error: err, - }); - toast.error("Failed to send follow-up message. Please try again."); - }); + if (turnsCompleted > 0) { + const sessionAfter = sessionStoreSetters.getSessions()[taskRunId]; + if (sessionAfter) { + const idsToPop = sessionAfter.messageQueue + .slice(0, turnsCompleted) + .map((m) => m.id); + for (const id of idsToPop) { + sessionStoreSetters.removeQueuedMessage(sessionAfter.taskId, id); + } + } + } } } diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 08b7c7f116..32c4b05cec 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -317,7 +317,7 @@ export const sessionStoreSetters = { taskId: string, content: string, rawPrompt?: string | ContentBlock[], - ) => { + ): string => { const id = `queue-${Date.now()}-${Math.random().toString(36).slice(2, 9)}`; useSessionStore.setState((state) => { const taskRunId = state.taskIdIndex[taskId]; @@ -333,6 +333,7 @@ export const sessionStoreSetters = { }); } }); + return id; }, removeQueuedMessage: (taskId: string, messageId: string) => { From 2c05c882e30d3361fa2b040724acfc13eb0b8455 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 15:14:39 +0200 Subject: [PATCH 08/12] fix: hold cloud follow-ups until prior turn ends, then dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sending user_message during an in-flight cloud turn preempts the prior turn on the cloud and breaks its response. Hold the follow-up locally as a queued bubble (matching the local agent UX) and only fire the user_message mutate after the prior turn's end_turn lands. handleCloudTaskUpdate's auto-flush re-enters sendCloudPrompt with priorTurnInFlight=false once isPromptPending flips, at which point the mutate dispatches into a now-idle agent. Skip the auto-flush if the same update brings the run to a terminal status — the terminal-status block below replays via resumeCloudRun instead. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.test.ts | 20 ++- .../features/sessions/service/service.ts | 134 +++++++----------- 2 files changed, 61 insertions(+), 93 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index a032e4a72e..26b36454b2 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1113,7 +1113,7 @@ describe("SessionService", () => { ); }); - it("queues a cloud follow-up locally and dispatches to the cloud immediately while a prior turn is in flight", async () => { + it("queues a cloud follow-up locally without dispatching while a prior turn is in flight", async () => { const service = getSessionService(); mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( createMockSession({ @@ -1122,24 +1122,20 @@ describe("SessionService", () => { isPromptPending: true, }), ); - mockSessionStoreSetters.enqueueMessage.mockReturnValue("queue-id-1"); - mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({ - success: true, - result: { stopReason: "end_turn" }, - }); - await service.sendPrompt("task-123", "Hello cloud"); + const result = await service.sendPrompt("task-123", "Hello cloud"); - // Queued bubble shown via the local messageQueue. + expect(result.stopReason).toBe("queued"); + // Held in the local messageQueue (queued bubble visible). expect(mockSessionStoreSetters.enqueueMessage).toHaveBeenCalledWith( "task-123", "Hello cloud", "Hello cloud", ); - // Mutate fires immediately; the cloud queues server-side until the - // prior turn ends. No optimistic bubble — the queued bubble owns the - // visual until handleCloudTaskUpdate pops the queue at end_turn. - expect(mockTrpcCloudTask.sendCommand.mutate).toHaveBeenCalledTimes(1); + // user_message during a running turn would preempt the prior turn on + // the cloud — handleCloudTaskUpdate dispatches via the auto-flush + // once end_turn lands. + expect(mockTrpcCloudTask.sendCommand.mutate).not.toHaveBeenCalled(); expect( mockSessionStoreSetters.appendOptimisticItem, ).not.toHaveBeenCalled(); diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 6200baf3e7..be6f6c2f22 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1503,20 +1503,23 @@ export class SessionService { return { stopReason: "queued" }; } - // The cloud accepts user_message commands while a turn is in flight and - // queues them server-side. Match the local agent's UX by also keeping - // the message in our messageQueue (queued bubble visible at the bottom) - // until the prior turn's end_turn arrives — handleCloudTaskUpdate pops - // one entry per turn boundary so the bubble disappears exactly when the - // cloud picks our message up. - const priorTurnInFlight = session.isPromptPending; - let queuedMessageId: string | undefined; - if (priorTurnInFlight) { - queuedMessageId = sessionStoreSetters.enqueueMessage( + // user_message commands sent during an in-flight turn preempt the prior + // turn on the cloud, breaking the response that's still being generated. + // Match the local agent: hold the follow-up locally as a queued bubble + // until the current turn's end_turn lands. handleCloudTaskUpdate's + // auto-flush re-enters this function with priorTurnInFlight=false once + // the agent is idle, and only then do we fire the user_message mutate. + if (session.isPromptPending) { + sessionStoreSetters.enqueueMessage( session.taskId, transport.promptText, prompt, ); + log.info("Cloud message queued (prior turn in flight)", { + taskId: session.taskId, + queueLength: session.messageQueue.length + 1, + }); + return { stopReason: "queued" }; } const [auth, cloudCommandAuth] = await Promise.all([ @@ -1524,12 +1527,6 @@ export class SessionService { this.getCloudCommandAuth(), ]); if (!auth || !cloudCommandAuth) { - if (queuedMessageId) { - sessionStoreSetters.removeQueuedMessage( - session.taskId, - queuedMessageId, - ); - } throw new Error("Authentication required for cloud commands"); } const artifactIds = await uploadRunAttachments( @@ -1546,18 +1543,14 @@ export class SessionService { params.artifact_ids = artifactIds; } - if (!priorTurnInFlight) { - // Idle agent — the queued bubble would look strange so add an - // optimistic regular bubble that the session/prompt echo replaces. - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: true, - }); - sessionStoreSetters.appendOptimisticItem(session.taskRunId, { - type: "user_message", - content: transport.promptText, - timestamp: Date.now(), - }); - } + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: true, + }); + sessionStoreSetters.appendOptimisticItem(session.taskRunId, { + type: "user_message", + content: transport.promptText, + timestamp: Date.now(), + }); track(ANALYTICS_EVENTS.PROMPT_SENT, { task_id: session.taskId, @@ -1588,9 +1581,7 @@ export class SessionService { // If the run terminated while our user_message was in flight, the // cloud rejects the command. Re-read the session: if it has gone // terminal, replay the prompt through resumeCloudRun so the user's - // message still lands instead of being eaten. resumeCloudRun replaces - // the session entirely so any queued bubble on the old session is - // dropped naturally. + // message still lands instead of being eaten. const fresh = sessionStoreSetters.getSessionByTaskId(session.taskId); if (fresh && isTerminalStatus(fresh.cloudStatus)) { log.warn("Cloud user_message rejected after run terminated; resuming", { @@ -1600,20 +1591,10 @@ export class SessionService { sessionStoreSetters.clearOptimisticItems(session.taskRunId); return this.resumeCloudRun(fresh, prompt); } - // Non-terminal failure: clean up the placeholder we added so a stuck - // bubble doesn't outlive the failure toast. - if (queuedMessageId) { - sessionStoreSetters.removeQueuedMessage( - session.taskId, - queuedMessageId, - ); - } sessionStoreSetters.clearOptimisticItems(session.taskRunId); - if (!priorTurnInFlight) { - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - } + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: false, + }); throw error; } } @@ -2894,32 +2875,6 @@ export class SessionService { session, newEvents, ); - - // Walk the batch tracking when turns start (session/prompt request) - // and end (matching response with stopReason). Each completed turn - // implicitly advances one queued message — the cloud has just picked - // it up and its real session/prompt event is in this same batch. - let walkingPromptId = session?.currentPromptId; - let turnsCompleted = 0; - for (const event of newEvents) { - const m = event.message; - if (isJsonRpcRequest(m) && m.method === "session/prompt") { - walkingPromptId = m.id; - continue; - } - if ( - "id" in m && - "result" in m && - typeof m.result === "object" && - m.result !== null && - "stopReason" in m.result && - m.id === walkingPromptId - ) { - turnsCompleted++; - walkingPromptId = undefined; - } - } - sessionStoreSetters.appendEvents( taskRunId, newEvents, @@ -2930,18 +2885,35 @@ export class SessionService { if (newEvents.some(isUserPromptEcho)) { sessionStoreSetters.clearOptimisticItems(taskRunId); } + } + } - if (turnsCompleted > 0) { - const sessionAfter = sessionStoreSetters.getSessions()[taskRunId]; - if (sessionAfter) { - const idsToPop = sessionAfter.messageQueue - .slice(0, turnsCompleted) - .map((m) => m.id); - for (const id of idsToPop) { - sessionStoreSetters.removeQueuedMessage(sessionAfter.taskId, id); - } - } - } + const isTerminalUpdate = + (update.kind === "status" || update.kind === "snapshot") && + isTerminalStatus(update.status); + + // Auto-flush queued messages once the agent's turn ends mid-run. Skip + // if this update is bringing the run to a terminal status — the + // terminal-status block below replays through resumeCloudRun instead. + const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; + if ( + !isTerminalUpdate && + sessionAfterLogs && + !sessionAfterLogs.isPromptPending && + sessionAfterLogs.messageQueue.length > 0 + ) { + const dequeued = sessionStoreSetters.dequeueMessages( + sessionAfterLogs.taskId, + ); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); + if (combinedPrompt) { + this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to flush queued cloud messages after turn end", { + taskId: sessionAfterLogs.taskId, + error: err, + }); + toast.error("Failed to send follow-up message. Please try again."); + }); } } From 91fe4c6b32a0783b8a248ed4505575f43800a6d6 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 15:34:37 +0200 Subject: [PATCH 09/12] fix: dispatch queued cloud follow-ups via resumeCloudRun, not user_message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The auto-flush was firing user_message mutates after the prior turn's end_turn lands. The cloud accepts the command and queues it server-side but the run is already winding down — by the time the agent would pick the message up the run has terminated, and the message is silently dropped. Replace the auto-flush mutate path with resumeCloudRun, which is what the cloud's resume API is actually designed for: a fresh task run inheriting the prior conversation state, with our prompt carried in as `pending_user_message`. The cloud then runs a normal turn against the queued message and emits its session/prompt + assistant content like any other run. Direct user input (no prior turn) keeps using sendCloudPrompt's mutate path; the existing catch-side fallback to resumeCloudRun handles the case where that mutate fails because the run terminated mid-flight. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../renderer/features/sessions/service/service.ts | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index be6f6c2f22..7e83040e75 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2892,9 +2892,13 @@ export class SessionService { (update.kind === "status" || update.kind === "snapshot") && isTerminalStatus(update.status); - // Auto-flush queued messages once the agent's turn ends mid-run. Skip - // if this update is bringing the run to a terminal status — the - // terminal-status block below replays through resumeCloudRun instead. + // Once the agent's turn ends, dispatch any queued follow-ups by creating + // a fresh task run via resumeCloudRun. Sending user_message commands to + // a winding-down run is unreliable — the cloud may accept and silently + // drop them when the run terminates. Resume always works: the cloud + // loads the prior conversation state and carries our prompt as + // `pending_user_message`. Skip when this update brings the run terminal + // — the terminal-status block below handles that path. const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; if ( !isTerminalUpdate && @@ -2907,8 +2911,8 @@ export class SessionService { ); const combinedPrompt = combineQueuedCloudPrompts(dequeued); if (combinedPrompt) { - this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { - log.error("Failed to flush queued cloud messages after turn end", { + this.resumeCloudRun(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to resume with queued cloud messages", { taskId: sessionAfterLogs.taskId, error: err, }); From 4785f8099e2e952f1ee1d377b3b9e37436bcd117 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 15:41:00 +0200 Subject: [PATCH 10/12] fix: wait for terminal status before resuming with queued cloud message MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The auto-flush was dispatching resumeCloudRun the moment end_turn arrived in the log stream, but the cloud's resume API requires the prior run to actually be in a terminal state. End_turn alone doesn't guarantee that — the run may still be wrapping up server-side, in which case runTaskInCloud rejects the resume, the catch toasts, and the message is lost. Drop the auto-flush. Let the queue sit until the cloud's status update brings the run terminal, at which point the terminal-status block dequeues + resumeCloudRun. By then the prior run is genuinely terminal and the resume succeeds. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.ts | 37 +++---------------- 1 file changed, 5 insertions(+), 32 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 7e83040e75..b3b3e3d94d 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2888,38 +2888,11 @@ export class SessionService { } } - const isTerminalUpdate = - (update.kind === "status" || update.kind === "snapshot") && - isTerminalStatus(update.status); - - // Once the agent's turn ends, dispatch any queued follow-ups by creating - // a fresh task run via resumeCloudRun. Sending user_message commands to - // a winding-down run is unreliable — the cloud may accept and silently - // drop them when the run terminates. Resume always works: the cloud - // loads the prior conversation state and carries our prompt as - // `pending_user_message`. Skip when this update brings the run terminal - // — the terminal-status block below handles that path. - const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; - if ( - !isTerminalUpdate && - sessionAfterLogs && - !sessionAfterLogs.isPromptPending && - sessionAfterLogs.messageQueue.length > 0 - ) { - const dequeued = sessionStoreSetters.dequeueMessages( - sessionAfterLogs.taskId, - ); - const combinedPrompt = combineQueuedCloudPrompts(dequeued); - if (combinedPrompt) { - this.resumeCloudRun(sessionAfterLogs, combinedPrompt).catch((err) => { - log.error("Failed to resume with queued cloud messages", { - taskId: sessionAfterLogs.taskId, - error: err, - }); - toast.error("Failed to send follow-up message. Please try again."); - }); - } - } + // Queued cloud follow-ups are dispatched only once the cloud confirms + // the prior run is fully terminal — resumeCloudRun's runTaskInCloud + // requires a terminal previous run, and end_turn alone in the log + // stream doesn't guarantee that yet. The terminal-status block below + // does the dispatch when the status update lands. // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { From 7bf64c0f61db0a9cd601d84f85e258c3721fe177 Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 15:50:15 +0200 Subject: [PATCH 11/12] fix: auto-flush queued cloud follow-ups, always falling back to resume MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Combine the auto-flush + resume strategies. After end_turn arrives, auto-flush dequeues the queue and calls sendCloudPrompt, which fires the user_message mutate. If the cloud accepts, the agent processes the message normally. If the mutate fails for any reason — terminal status, transitional wind-down, network blip — sendCloudPrompt now unconditionally falls back to resumeCloudRun, which spins up a fresh task run carrying the prompt as `pending_user_message` and inheriting the prior conversation state. This covers both single-shot clouds (mutate rejected, resume kicks in) and multi-turn clouds (mutate accepted, agent processes immediately). The terminal-status block stays as a final safety net for in-flight optimistic items when the run goes terminal mid-mutate. Adds an info log on auto-flush so we can confirm it's firing in DevTools when reproducing. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.ts | 75 +++++++++++++------ 1 file changed, 54 insertions(+), 21 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index b3b3e3d94d..16f59f279f 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -1578,24 +1578,28 @@ export class SessionService { return { stopReason }; } catch (error) { - // If the run terminated while our user_message was in flight, the - // cloud rejects the command. Re-read the session: if it has gone - // terminal, replay the prompt through resumeCloudRun so the user's - // message still lands instead of being eaten. - const fresh = sessionStoreSetters.getSessionByTaskId(session.taskId); - if (fresh && isTerminalStatus(fresh.cloudStatus)) { - log.warn("Cloud user_message rejected after run terminated; resuming", { - taskId: session.taskId, - error: String(error), + // user_message commands can fail when the cloud's run is in a + // wind-down state — sometimes detectable via cloudStatus, sometimes + // not (the rejection arrives before the status update does). Either + // way the right recovery is resumeCloudRun, which spins up a fresh + // run inheriting the prior conversation state and carrying our + // prompt as `pending_user_message`. + const fresh = + sessionStoreSetters.getSessionByTaskId(session.taskId) ?? session; + log.warn("Cloud user_message failed; falling back to resume", { + taskId: session.taskId, + cloudStatus: fresh.cloudStatus, + error: String(error), + }); + sessionStoreSetters.clearOptimisticItems(session.taskRunId); + try { + return await this.resumeCloudRun(fresh, prompt); + } catch (resumeError) { + sessionStoreSetters.updateSession(session.taskRunId, { + isPromptPending: false, }); - sessionStoreSetters.clearOptimisticItems(session.taskRunId); - return this.resumeCloudRun(fresh, prompt); + throw resumeError; } - sessionStoreSetters.clearOptimisticItems(session.taskRunId); - sessionStoreSetters.updateSession(session.taskRunId, { - isPromptPending: false, - }); - throw error; } } @@ -2888,11 +2892,40 @@ export class SessionService { } } - // Queued cloud follow-ups are dispatched only once the cloud confirms - // the prior run is fully terminal — resumeCloudRun's runTaskInCloud - // requires a terminal previous run, and end_turn alone in the log - // stream doesn't guarantee that yet. The terminal-status block below - // does the dispatch when the status update lands. + const isTerminalUpdate = + (update.kind === "status" || update.kind === "snapshot") && + isTerminalStatus(update.status); + + // Auto-flush queued cloud follow-ups once the agent's turn ends. We + // dispatch via sendCloudPrompt's normal mutate path; if the cloud + // rejects because the run is winding down, sendCloudPrompt's catch + // falls back to resumeCloudRun. Skip when this same update brings the + // run terminal — the terminal-status block below handles that path. + const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; + if ( + !isTerminalUpdate && + sessionAfterLogs && + !sessionAfterLogs.isPromptPending && + sessionAfterLogs.messageQueue.length > 0 + ) { + const dequeued = sessionStoreSetters.dequeueMessages( + sessionAfterLogs.taskId, + ); + const combinedPrompt = combineQueuedCloudPrompts(dequeued); + if (combinedPrompt) { + log.info("Auto-flushing queued cloud messages after turn end", { + taskId: sessionAfterLogs.taskId, + queuedCount: dequeued.length, + }); + this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to flush queued cloud messages", { + taskId: sessionAfterLogs.taskId, + error: err, + }); + toast.error("Failed to send follow-up message. Please try again."); + }); + } + } // Update cloud status fields if present if (update.kind === "status" || update.kind === "snapshot") { From 828217b7fcddd69f5dff90b15a74648f61bc404c Mon Sep 17 00:00:00 2001 From: Vojta Bartos Date: Mon, 27 Apr 2026 15:57:39 +0200 Subject: [PATCH 12/12] fix: auto-flush via resumeCloudRun directly, skip user_message command MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The user_message sendCommand path is the wrong tool for queued cloud follow-ups. While a turn is in flight it preempts the prior response; after end_turn the cloud either rejects it or silently drops it as the run winds down. Both modes leave the user with a vanished bubble and no agent reply. Skip the mutate entirely. After end_turn, dequeue and call resumeCloudRun, which uses the same /tasks/{id}/run/ endpoint the initial task run uses, with `resume_from_run_id` and `pending_user_message`. The cloud creates a new run that inherits the prior conversation state and starts a normal turn against the queued prompt — exactly how follow-ups are designed to work in this cloud's single-turn-per-run model. If the resume call fails the auto-flush's outer .catch logs and shows a toast so the failure is visible to the user instead of silent. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227 --- .../features/sessions/service/service.ts | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 16f59f279f..175940115b 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -2896,11 +2896,14 @@ export class SessionService { (update.kind === "status" || update.kind === "snapshot") && isTerminalStatus(update.status); - // Auto-flush queued cloud follow-ups once the agent's turn ends. We - // dispatch via sendCloudPrompt's normal mutate path; if the cloud - // rejects because the run is winding down, sendCloudPrompt's catch - // falls back to resumeCloudRun. Skip when this same update brings the - // run terminal — the terminal-status block below handles that path. + // Auto-flush queued cloud follow-ups once the agent's turn ends. The + // `user_message` command path turned out to be unreliable for this + // cloud — accepted during in_progress (preempting the prior turn), + // rejected or silently dropped after end_turn. resumeCloudRun creates + // a fresh task run that inherits the prior conversation state and + // processes the queued prompt as `pending_user_message` — the same + // path the initial run uses. Skip when this update brings the run + // terminal; the terminal-status block below handles that path. const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId]; if ( !isTerminalUpdate && @@ -2913,12 +2916,15 @@ export class SessionService { ); const combinedPrompt = combineQueuedCloudPrompts(dequeued); if (combinedPrompt) { - log.info("Auto-flushing queued cloud messages after turn end", { - taskId: sessionAfterLogs.taskId, - queuedCount: dequeued.length, - }); - this.sendCloudPrompt(sessionAfterLogs, combinedPrompt).catch((err) => { - log.error("Failed to flush queued cloud messages", { + log.info( + "Auto-flushing queued cloud messages via resume after turn end", + { + taskId: sessionAfterLogs.taskId, + queuedCount: dequeued.length, + }, + ); + this.resumeCloudRun(sessionAfterLogs, combinedPrompt).catch((err) => { + log.error("Failed to resume with queued cloud messages", { taskId: sessionAfterLogs.taskId, error: err, });