Skip to content

Commit 0b65c88

Browse files
committed
fix: replay queued cloud messages via resume on terminal status
When a follow-up was queued during the final turn of a cloud run, the terminal-status branch in handleCloudTaskUpdate silently dropped it — the auto-flush above had also raced ahead with sendCloudPrompt against a now-finished run, so the user_message command never reached the cloud. Skip the auto-flush when this update brings the run terminal, and have the terminal-status branch dequeue any pending messages and replay them via resumeCloudRun (which spins up a fresh task run carrying the prompt as pending_user_message). Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227
1 parent fb5ee29 commit 0b65c88

2 files changed

Lines changed: 94 additions & 7 deletions

File tree

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
7070
appendOptimisticItem: vi.fn(),
7171
clearOptimisticItems: vi.fn(),
7272
replaceOptimisticWithEvent: vi.fn(),
73+
updateCloudStatus: vi.fn(),
7374
}));
7475

7576
const mockGetConfigOptionByCategory = vi.hoisted(() =>
@@ -915,6 +916,68 @@ describe("SessionService", () => {
915916
});
916917
});
917918

919+
describe("handleCloudTaskUpdate terminal-status routing", () => {
920+
it("dequeues queued messages instead of clearing them when the run reaches a terminal status", async () => {
921+
const service = getSessionService();
922+
let capturedOnData:
923+
| ((payload: Record<string, unknown>) => void)
924+
| undefined;
925+
mockTrpcCloudTask.onUpdate.subscribe.mockImplementation(
926+
(
927+
_input: unknown,
928+
opts: { onData: (p: Record<string, unknown>) => void },
929+
) => {
930+
capturedOnData = opts.onData;
931+
return { unsubscribe: vi.fn() };
932+
},
933+
);
934+
935+
const queuedMessage = {
936+
id: "queue-1",
937+
content: "gimme a joke",
938+
queuedAt: Date.now(),
939+
};
940+
const session = createMockSession({
941+
isCloud: true,
942+
cloudStatus: "in_progress",
943+
cloudBranch: "main",
944+
messageQueue: [queuedMessage],
945+
});
946+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session);
947+
mockSessionStoreSetters.getSessions.mockReturnValue({
948+
"run-123": session,
949+
});
950+
mockSessionStoreSetters.dequeueMessages.mockReturnValue([
951+
queuedMessage,
952+
] as never);
953+
954+
service.watchCloudTask(
955+
"task-123",
956+
"run-123",
957+
"https://api.anthropic.com",
958+
123,
959+
);
960+
961+
expect(capturedOnData).toBeDefined();
962+
capturedOnData?.({
963+
kind: "status",
964+
taskId: "task-123",
965+
status: "completed",
966+
stage: undefined,
967+
output: undefined,
968+
errorMessage: undefined,
969+
branch: "main",
970+
});
971+
972+
// Queue is drained via dequeueMessages (so resumeCloudRun can replay it),
973+
// not silently dropped via clearMessageQueue.
974+
expect(mockSessionStoreSetters.dequeueMessages).toHaveBeenCalledWith(
975+
"task-123",
976+
);
977+
expect(mockSessionStoreSetters.clearMessageQueue).not.toHaveBeenCalled();
978+
});
979+
});
980+
918981
describe("reset", () => {
919982
it("clears connecting tasks", () => {
920983
const service = getSessionService();

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2982,9 +2982,17 @@ export class SessionService {
29822982
}
29832983
}
29842984

2985-
// Flush queued messages when a cloud turn completes (detected via live log updates)
2985+
const isTerminalUpdate =
2986+
(update.kind === "status" || update.kind === "snapshot") &&
2987+
isTerminalStatus(update.status);
2988+
2989+
// Flush queued messages when a cloud turn completes mid-run. Skip when
2990+
// the same update brings the run to a terminal status: the queued
2991+
// commands cannot be delivered to a finished run, so the terminal-status
2992+
// block below replays them via resumeCloudRun instead.
29862993
const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId];
29872994
if (
2995+
!isTerminalUpdate &&
29882996
sessionAfterLogs &&
29892997
!sessionAfterLogs.isPromptPending &&
29902998
sessionAfterLogs.messageQueue.length > 0
@@ -3026,13 +3034,29 @@ export class SessionService {
30263034
}
30273035

30283036
if (isTerminalStatus(update.status)) {
3029-
// Clean up any pending resume messages that couldn't be sent
30303037
const session = sessionStoreSetters.getSessions()[taskRunId];
3031-
if (
3032-
session &&
3033-
(session.messageQueue.length > 0 || session.isPromptPending)
3034-
) {
3035-
sessionStoreSetters.clearMessageQueue(session.taskId);
3038+
// A user message queued during the final turn cannot be delivered to
3039+
// a finished run. Replay it through resumeCloudRun, which spins up a
3040+
// fresh task run carrying the prompt as `pending_user_message`.
3041+
if (session && session.messageQueue.length > 0) {
3042+
const queued = sessionStoreSetters.dequeueMessages(session.taskId);
3043+
const combinedPrompt = combineQueuedCloudPrompts(queued);
3044+
sessionStoreSetters.clearOptimisticItems(taskRunId);
3045+
sessionStoreSetters.updateSession(taskRunId, {
3046+
isPromptPending: false,
3047+
});
3048+
if (combinedPrompt) {
3049+
this.resumeCloudRun(session, combinedPrompt).catch((err) => {
3050+
log.error("Failed to resume cloud run with queued messages", {
3051+
taskId: session.taskId,
3052+
error: err,
3053+
});
3054+
toast.error(
3055+
"Failed to send follow-up message. Please try again.",
3056+
);
3057+
});
3058+
}
3059+
} else if (session?.isPromptPending) {
30363060
sessionStoreSetters.clearOptimisticItems(taskRunId);
30373061
sessionStoreSetters.updateSession(taskRunId, {
30383062
isPromptPending: false,

0 commit comments

Comments
 (0)