Skip to content

Commit 82e07fe

Browse files
authored
fix(sessions): handle turn_complete event ordering for local sessions (#2197)
1 parent 0ab5746 commit 82e07fe

2 files changed

Lines changed: 103 additions & 8 deletions

File tree

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ const mockSessionStoreSetters = vi.hoisted(() => ({
7373
enqueueMessage: vi.fn(),
7474
removeQueuedMessage: vi.fn(),
7575
clearMessageQueue: vi.fn(),
76-
dequeueMessagesAsText: vi.fn(() => null),
76+
dequeueMessagesAsText: vi.fn((): string | null => null),
7777
dequeueMessages: vi.fn(
7878
() =>
7979
[] as Array<{
@@ -3027,6 +3027,95 @@ describe("SessionService", () => {
30273027
});
30283028
});
30293029

3030+
describe("local turn_complete + JSON-RPC response ordering", () => {
3031+
it("drains queued messages when turn_complete arrives before the JSON-RPC response (local Codex regression)", async () => {
3032+
const service = getSessionService();
3033+
3034+
let session: AgentSession | undefined;
3035+
mockSessionStoreSetters.getSessionByTaskId.mockImplementation(
3036+
() => session,
3037+
);
3038+
mockSessionStoreSetters.getSessions.mockImplementation(() =>
3039+
session ? { "run-123": session } : {},
3040+
);
3041+
mockSessionStoreSetters.updateSession.mockImplementation(
3042+
(_taskRunId, updates) => {
3043+
if (session) session = { ...session, ...updates };
3044+
},
3045+
);
3046+
mockSessionStoreSetters.setSession.mockImplementation((next) => {
3047+
session = next as AgentSession;
3048+
});
3049+
mockSessionStoreSetters.dequeueMessagesAsText.mockReturnValue(
3050+
"follow up",
3051+
);
3052+
3053+
mockBuildAuthenticatedClient.mockReturnValue({
3054+
...mockAuthenticatedClient,
3055+
createTaskRun: vi.fn().mockResolvedValue({ id: "run-123" }),
3056+
appendTaskRunLog: vi.fn(),
3057+
});
3058+
mockTrpcAgent.start.mutate.mockResolvedValue({
3059+
channel: "agent-event:run-123",
3060+
configOptions: [],
3061+
});
3062+
mockTrpcAgent.prompt.mutate.mockResolvedValue({ stopReason: "end_turn" });
3063+
3064+
await service.connectToTask({
3065+
task: createMockTask(),
3066+
repoPath: "/repo",
3067+
});
3068+
3069+
const onData = mockTrpcAgent.onSessionEvent.subscribe.mock.calls.at(
3070+
-1,
3071+
)?.[1]?.onData as ((payload: unknown) => void) | undefined;
3072+
expect(onData).toBeDefined();
3073+
3074+
const queuedMessage = {
3075+
id: "q-1",
3076+
content: "follow up",
3077+
queuedAt: 1700000000,
3078+
};
3079+
session = createMockSession({
3080+
taskRunId: "run-123",
3081+
taskId: "task-123",
3082+
status: "connected",
3083+
isCloud: false,
3084+
currentPromptId: 42,
3085+
isPromptPending: true,
3086+
messageQueue: [queuedMessage],
3087+
});
3088+
3089+
onData?.({
3090+
type: "acp_message",
3091+
ts: 1700000001,
3092+
message: {
3093+
jsonrpc: "2.0",
3094+
method: "_posthog/turn_complete",
3095+
params: { sessionId: "acp-session", stopReason: "end_turn" },
3096+
},
3097+
});
3098+
3099+
expect(session?.currentPromptId).toBe(42);
3100+
3101+
onData?.({
3102+
type: "acp_message",
3103+
ts: 1700000002,
3104+
message: {
3105+
jsonrpc: "2.0",
3106+
id: 42,
3107+
result: { stopReason: "end_turn" },
3108+
},
3109+
});
3110+
3111+
await vi.waitFor(() => {
3112+
expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalledWith(
3113+
expect.objectContaining({ sessionId: "run-123" }),
3114+
);
3115+
});
3116+
});
3117+
});
3118+
30303119
describe("cancelPrompt", () => {
30313120
it("returns false if no session exists", async () => {
30323121
const service = getSessionService();

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ function buildCloudDefaultConfigOptions(
180180
];
181181
}
182182

183-
function isCloudTurnCompleteEvent(event: AcpMessage): boolean {
183+
function isTurnCompleteEvent(event: AcpMessage): boolean {
184184
const msg = event.message;
185185
return (
186186
"method" in msg &&
@@ -1111,12 +1111,18 @@ export class SessionService {
11111111
currentPromptId: null,
11121112
});
11131113
}
1114-
if (isCloudTurnCompleteEvent(acpMsg)) {
1115-
sessionStoreSetters.updateSession(taskRunId, {
1116-
isPromptPending: false,
1117-
promptStartedAt: null,
1118-
currentPromptId: null,
1119-
});
1114+
if (isTurnCompleteEvent(acpMsg)) {
1115+
// Local sessions use the JSON-RPC response as the canonical turn-done
1116+
// signal; clearing currentPromptId here would race the id-match guard
1117+
// above. Cloud sessions never see that response.
1118+
const session = this.getSessionByRunId(taskRunId);
1119+
if (session?.isCloud) {
1120+
sessionStoreSetters.updateSession(taskRunId, {
1121+
isPromptPending: false,
1122+
promptStartedAt: null,
1123+
currentPromptId: null,
1124+
});
1125+
}
11201126
}
11211127
// Lifecycle handshake from the agent — flip status to "connected"
11221128
// so the UI can release the queue-while-not-ready guard. This is

0 commit comments

Comments
 (0)