Skip to content

Commit 909d6c7

Browse files
committed
fix: replay queued cloud messages via resume on terminal status
When a follow-up was queued during the final turn of a cloud run, the terminal-status branch in handleCloudTaskUpdate silently dropped it — the auto-flush above had also raced ahead with sendCloudPrompt against a now-finished run, so the user_message command never reached the cloud. Skip the auto-flush when this update brings the run terminal, and have the terminal-status branch dequeue any pending messages and replay them via resumeCloudRun (which spins up a fresh task run carrying the prompt as pending_user_message). Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227
1 parent 09c791f commit 909d6c7

2 files changed

Lines changed: 93 additions & 7 deletions

File tree

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
7070
appendOptimisticItem: vi.fn(),
7171
clearOptimisticItems: vi.fn(),
7272
replaceOptimisticWithEvent: vi.fn(),
73+
updateCloudStatus: vi.fn(),
7374
}));
7475

7576
const mockGetConfigOptionByCategory = vi.hoisted(() =>
@@ -915,6 +916,68 @@ describe("SessionService", () => {
915916
});
916917
});
917918

919+
describe("handleCloudTaskUpdate terminal-status routing", () => {
920+
it("dequeues queued messages instead of clearing them when the run reaches a terminal status", async () => {
921+
const service = getSessionService();
922+
let capturedOnData:
923+
| ((payload: Record<string, unknown>) => void)
924+
| undefined;
925+
mockTrpcCloudTask.onUpdate.subscribe.mockImplementation(
926+
(
927+
_input: unknown,
928+
opts: { onData: (p: Record<string, unknown>) => void },
929+
) => {
930+
capturedOnData = opts.onData;
931+
return { unsubscribe: vi.fn() };
932+
},
933+
);
934+
935+
const queuedMessage = {
936+
id: "queue-1",
937+
content: "gimme a joke",
938+
queuedAt: Date.now(),
939+
};
940+
const session = createMockSession({
941+
isCloud: true,
942+
cloudStatus: "in_progress",
943+
cloudBranch: "main",
944+
messageQueue: [queuedMessage],
945+
});
946+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session);
947+
mockSessionStoreSetters.getSessions.mockReturnValue({
948+
"run-123": session,
949+
});
950+
mockSessionStoreSetters.dequeueMessages.mockReturnValue([
951+
queuedMessage,
952+
] as never);
953+
954+
service.watchCloudTask(
955+
"task-123",
956+
"run-123",
957+
"https://api.anthropic.com",
958+
123,
959+
);
960+
961+
expect(capturedOnData).toBeDefined();
962+
capturedOnData?.({
963+
kind: "status",
964+
taskId: "task-123",
965+
status: "completed",
966+
stage: undefined,
967+
output: undefined,
968+
errorMessage: undefined,
969+
branch: "main",
970+
});
971+
972+
// Queue is drained via dequeueMessages (so resumeCloudRun can replay it),
973+
// not silently dropped via clearMessageQueue.
974+
expect(mockSessionStoreSetters.dequeueMessages).toHaveBeenCalledWith(
975+
"task-123",
976+
);
977+
expect(mockSessionStoreSetters.clearMessageQueue).not.toHaveBeenCalled();
978+
});
979+
});
980+
918981
describe("reset", () => {
919982
it("clears connecting tasks", () => {
920983
const service = getSessionService();

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2924,9 +2924,17 @@ export class SessionService {
29242924
}
29252925
}
29262926

2927-
// Flush queued messages when a cloud turn completes (detected via live log updates)
2927+
const isTerminalUpdate =
2928+
(update.kind === "status" || update.kind === "snapshot") &&
2929+
isTerminalStatus(update.status);
2930+
2931+
// Flush queued messages when a cloud turn completes mid-run. Skip when
2932+
// the same update brings the run to a terminal status: queued commands
2933+
// cannot be delivered to a finished run, so the terminal-status block
2934+
// below replays them via resumeCloudRun instead.
29282935
const sessionAfterLogs = sessionStoreSetters.getSessions()[taskRunId];
29292936
if (
2937+
!isTerminalUpdate &&
29302938
sessionAfterLogs &&
29312939
!sessionAfterLogs.isPromptPending &&
29322940
sessionAfterLogs.messageQueue.length > 0
@@ -2968,13 +2976,28 @@ export class SessionService {
29682976
}
29692977

29702978
if (isTerminalStatus(update.status)) {
2971-
// Clean up any pending resume messages that couldn't be sent
29722979
const session = sessionStoreSetters.getSessions()[taskRunId];
2973-
if (
2974-
session &&
2975-
(session.messageQueue.length > 0 || session.isPromptPending)
2976-
) {
2977-
sessionStoreSetters.clearMessageQueue(session.taskId);
2980+
// A user message queued during the final turn cannot be delivered to
2981+
// a finished run. Replay it through resumeCloudRun, which spins up a
2982+
// fresh task run carrying the prompt as `pending_user_message`.
2983+
if (session && session.messageQueue.length > 0) {
2984+
const queued = sessionStoreSetters.dequeueMessages(session.taskId);
2985+
const combinedPrompt = combineQueuedCloudPrompts(queued);
2986+
sessionStoreSetters.updateSession(taskRunId, {
2987+
isPromptPending: false,
2988+
});
2989+
if (combinedPrompt) {
2990+
this.resumeCloudRun(session, combinedPrompt).catch((err) => {
2991+
log.error("Failed to resume cloud run with queued messages", {
2992+
taskId: session.taskId,
2993+
error: err,
2994+
});
2995+
toast.error(
2996+
"Failed to send follow-up message. Please try again.",
2997+
);
2998+
});
2999+
}
3000+
} else if (session?.isPromptPending) {
29783001
sessionStoreSetters.updateSession(taskRunId, {
29793002
isPromptPending: false,
29803003
});

0 commit comments

Comments
 (0)