Skip to content

Commit fb5ee29

Browse files
committed
fix: keep cloud user message visible while log echo is pending
When sending a follow-up to a cloud run, drain queued messages into optimistic user bubbles so the conversation stays visible during the gap between submit and the cloud's session/prompt echo. Mirror the local optimistic flow: clear the bubbles when the echo lands, on send failure, on retry exhaustion, and on terminal-status cleanup. Generated-By: PostHog Code Task-Id: 8aeaf6f9-8b18-426a-9453-c668ca17d227
1 parent 09c791f commit fb5ee29

2 files changed

Lines changed: 122 additions & 9 deletions

File tree

apps/code/src/renderer/features/sessions/service/service.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1379,6 +1379,60 @@ describe("SessionService", () => {
13791379
);
13801380
});
13811381

1382+
it("appends an optimistic user_message before sending a direct cloud prompt", async () => {
1383+
const service = getSessionService();
1384+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
1385+
createMockSession({
1386+
isCloud: true,
1387+
cloudStatus: "in_progress",
1388+
}),
1389+
);
1390+
mockTrpcCloudTask.sendCommand.mutate.mockResolvedValue({
1391+
success: true,
1392+
result: { stopReason: "end_turn" },
1393+
});
1394+
1395+
await service.sendPrompt("task-123", "Hello cloud");
1396+
1397+
expect(mockSessionStoreSetters.appendOptimisticItem).toHaveBeenCalledWith(
1398+
"run-123",
1399+
expect.objectContaining({
1400+
type: "user_message",
1401+
content: "Hello cloud",
1402+
}),
1403+
);
1404+
const optimisticCallOrder =
1405+
mockSessionStoreSetters.appendOptimisticItem.mock
1406+
.invocationCallOrder[0];
1407+
const sendCallOrder =
1408+
mockTrpcCloudTask.sendCommand.mutate.mock.invocationCallOrder[0];
1409+
expect(optimisticCallOrder).toBeLessThan(sendCallOrder);
1410+
});
1411+
1412+
it("clears optimistic items if a direct cloud send fails", async () => {
1413+
const service = getSessionService();
1414+
mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(
1415+
createMockSession({
1416+
isCloud: true,
1417+
cloudStatus: "in_progress",
1418+
}),
1419+
);
1420+
mockTrpcCloudTask.sendCommand.mutate.mockRejectedValue(
1421+
new Error("network down"),
1422+
);
1423+
1424+
await expect(
1425+
service.sendPrompt("task-123", "Hello cloud"),
1426+
).rejects.toThrow("network down");
1427+
1428+
expect(
1429+
mockSessionStoreSetters.appendOptimisticItem,
1430+
).toHaveBeenCalledTimes(1);
1431+
expect(mockSessionStoreSetters.clearOptimisticItems).toHaveBeenCalledWith(
1432+
"run-123",
1433+
);
1434+
});
1435+
13821436
it("attempts automatic recovery on fatal error", async () => {
13831437
const service = getSessionService();
13841438
const mockSession = createMockSession({

apps/code/src/renderer/features/sessions/service/service.ts

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,13 @@ const LOCAL_SESSION_RECOVERY_MESSAGE =
9292
const LOCAL_SESSION_RECOVERY_FAILED_MESSAGE =
9393
"Connecting to to the agent has been lost. Retry, or start a new session.";
9494

95+
function isUserPromptEcho(acpMsg: AcpMessage): boolean {
96+
return (
97+
isJsonRpcRequest(acpMsg.message) &&
98+
acpMsg.message.method === "session/prompt"
99+
);
100+
}
101+
95102
/**
96103
* Build default configOptions for cloud sessions so the mode switcher
97104
* is available in the UI even without a local agent connection.
@@ -993,19 +1000,17 @@ export class SessionService {
9931000
const session = sessionStoreSetters.getSessions()[taskRunId];
9941001
if (!session) return;
9951002

996-
const isUserPromptEcho =
997-
isJsonRpcRequest(acpMsg.message) &&
998-
acpMsg.message.method === "session/prompt";
1003+
const isPromptEcho = isUserPromptEcho(acpMsg);
9991004

10001005
// Once the agent starts responding, clear initialPrompt so that
10011006
// retry reconnects to this session instead of creating a new one.
1002-
if (!isUserPromptEcho && session.initialPrompt?.length) {
1007+
if (!isPromptEcho && session.initialPrompt?.length) {
10031008
sessionStoreSetters.updateSession(taskRunId, {
10041009
initialPrompt: undefined,
10051010
});
10061011
}
10071012

1008-
if (isUserPromptEcho) {
1013+
if (isPromptEcho) {
10091014
sessionStoreSetters.replaceOptimisticWithEvent(taskRunId, acpMsg);
10101015
} else {
10111016
sessionStoreSetters.appendEvents(taskRunId, [acpMsg]);
@@ -1518,6 +1523,17 @@ export class SessionService {
15181523
isPromptPending: true,
15191524
});
15201525

1526+
// Show the bubble immediately while we wait for the cloud log stream to
1527+
// echo the user_message back. Without this, the user sees a gap between
1528+
// submit and the agent's response on cloud runs.
1529+
if (!options?.skipQueueGuard) {
1530+
sessionStoreSetters.appendOptimisticItem(session.taskRunId, {
1531+
type: "user_message",
1532+
content: transport.promptText,
1533+
timestamp: Date.now(),
1534+
});
1535+
}
1536+
15211537
track(ANALYTICS_EVENTS.PROMPT_SENT, {
15221538
task_id: session.taskId,
15231539
is_initial: session.events.length === 0,
@@ -1565,6 +1581,12 @@ export class SessionService {
15651581
sessionStoreSetters.updateSession(session.taskRunId, {
15661582
isPromptPending: false,
15671583
});
1584+
// Drop optimistic items so a failed send doesn't leave a ghost bubble.
1585+
// The combined-prompt path (skipQueueGuard) clears its own optimistic
1586+
// items in sendQueuedCloudMessages on retry exhaustion.
1587+
if (!options?.skipQueueGuard) {
1588+
sessionStoreSetters.clearOptimisticItems(session.taskRunId);
1589+
}
15681590
throw error;
15691591
}
15701592
}
@@ -1574,10 +1596,29 @@ export class SessionService {
15741596
attempt = 0,
15751597
pendingPrompt?: string | ContentBlock[],
15761598
): Promise<{ stopReason: string }> {
1577-
// First attempt: atomically dequeue. Retries reuse the already-dequeued prompt.
1578-
const combinedPrompt =
1579-
pendingPrompt ??
1580-
combineQueuedCloudPrompts(sessionStoreSetters.dequeueMessages(taskId));
1599+
// First attempt: atomically dequeue and convert each entry into an
1600+
// optimistic bubble. Retries reuse the already-dequeued prompt and must
1601+
// not stack additional bubbles.
1602+
let combinedPrompt: string | ContentBlock[] | null;
1603+
if (pendingPrompt) {
1604+
combinedPrompt = pendingPrompt;
1605+
} else {
1606+
const dequeued = sessionStoreSetters.dequeueMessages(taskId);
1607+
combinedPrompt = combineQueuedCloudPrompts(dequeued);
1608+
if (combinedPrompt) {
1609+
const taskRunId =
1610+
sessionStoreSetters.getSessionByTaskId(taskId)?.taskRunId;
1611+
if (taskRunId) {
1612+
for (const msg of dequeued) {
1613+
sessionStoreSetters.appendOptimisticItem(taskRunId, {
1614+
type: "user_message",
1615+
content: msg.content,
1616+
timestamp: msg.queuedAt,
1617+
});
1618+
}
1619+
}
1620+
}
1621+
}
15811622
if (!combinedPrompt) return { stopReason: "skipped" };
15821623

15831624
const session = sessionStoreSetters.getSessionByTaskId(taskId);
@@ -1632,6 +1673,10 @@ export class SessionService {
16321673
taskId,
16331674
attempts: attempt + 1,
16341675
});
1676+
const failedSession = sessionStoreSetters.getSessionByTaskId(taskId);
1677+
if (failedSession) {
1678+
sessionStoreSetters.clearOptimisticItems(failedSession.taskRunId);
1679+
}
16351680
toast.error("Failed to send follow-up message. Please try again.");
16361681
return { stopReason: "error" };
16371682
}
@@ -2888,6 +2933,8 @@ export class SessionService {
28882933
const expectedCount = update.totalEntryCount;
28892934
const delta = expectedCount - currentCount;
28902935

2936+
let appendedEvents: AcpMessage[] = [];
2937+
28912938
if (delta <= 0) {
28922939
// Already caught up — skip duplicate entries
28932940
} else if (delta <= update.newEntries.length) {
@@ -2901,6 +2948,7 @@ export class SessionService {
29012948
);
29022949
sessionStoreSetters.appendEvents(taskRunId, newEvents, expectedCount);
29032950
this.updatePromptStateFromEvents(taskRunId, newEvents);
2951+
appendedEvents = newEvents;
29042952
} else {
29052953
// Gap in data — append everything we have but don't jump processedLineCount
29062954
log.warn("Cloud task log count inconsistency", {
@@ -2921,6 +2969,16 @@ export class SessionService {
29212969
currentCount + update.newEntries.length,
29222970
);
29232971
this.updatePromptStateFromEvents(taskRunId, newEvents);
2972+
appendedEvents = newEvents;
2973+
}
2974+
2975+
// The cloud log stream has caught up to a user_message we already
2976+
// showed optimistically — drop the optimistic bubbles so they don't
2977+
// duplicate the now-real event. Mirrors replaceOptimisticWithEvent
2978+
// on the local path; we clear in bulk because cloud entries arrive
2979+
// batched alongside other event kinds.
2980+
if (appendedEvents.some(isUserPromptEcho)) {
2981+
sessionStoreSetters.clearOptimisticItems(taskRunId);
29242982
}
29252983
}
29262984

@@ -2975,6 +3033,7 @@ export class SessionService {
29753033
(session.messageQueue.length > 0 || session.isPromptPending)
29763034
) {
29773035
sessionStoreSetters.clearMessageQueue(session.taskId);
3036+
sessionStoreSetters.clearOptimisticItems(taskRunId);
29783037
sessionStoreSetters.updateSession(taskRunId, {
29793038
isPromptPending: false,
29803039
});

0 commit comments

Comments
 (0)