Skip to content

Commit f474495

Browse files
jerryliang64 and claude authored
fix(agent-runtime): persist partial transcript when a run fails mid-turn (#449)
## Problem

When the executor's `execRun()` async generator throws part-way through a turn for a **non-abort** reason — most commonly the upstream model stream being terminated (`API Error: terminated`) — the run's user prompt and any already-generated assistant output were **never written to the thread store**.

Root cause: `messages.jsonl` is only written by `store.appendMessages()`, which is only reached on the **success** path of each execution method. In the `catch` blocks, the *abort* branch already had a fallback persist (`persistMessagesOnAbort`, guarded by `task.committed`), but the **non-abort error** branch only called `updateRun(rb.fail)` and pushed an `error` event — it never persisted anything.

Result: the failed turn vanishes from `messages.jsonl` entirely. The client sees the error, but the persisted history skips straight from the previous turn to the next one, leaving storage inconsistent with what actually happened and unusable for resume / audit.

## Fix

All three execution paths — `syncRun`, `asyncRun`, and `executeStreamBackground` — now persist the partial transcript in their non-abort error branch, **before** marking the run failed. This mirrors the existing abort fallback and reuses the same `task.committed` guard, so we never write a thread for a session the executor never persisted to disk (which would break subsequent resume).

`persistMessagesOnAbort` is renamed to `persistPartialMessages` since it now serves both the abort and the mid-turn-failure paths; its doc comment is generalized accordingly. Behavior of the method body is unchanged (append input messages + `filterForStorage(streamMessages)`, swallow store errors).

Because token-level deltas are `stream_event` messages (filtered out by `filterForStorage`), a mid-stream `tool_use` that never completed is *not* present in `streamMessages`, so this cannot persist an unterminated/half tool_use block.
## Tests

Added one regression test per execution path (`syncRun` / `asyncRun` / `streamRun`): yield one assistant message (flipping `committed=true`), then throw, and assert the run is `Failed` **and** the thread retains both the user and the assistant message.

All existing tests pass (158 passing).

Note: 2 pre-existing failures in `cancelRun` (`AgentTimeoutError is not a constructor`) are present on `master` as well and are unrelated to this change (local ts-node type-resolution of `@eggjs/tegg-types`).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

* **Bug Fixes**
  * Conversation history is more robust during non-success runs: committed assistant responses are preserved and persisted on failures, aborts, or mid-turn errors, and duplicate messages are avoided.
* **Tests**
  * Added runtime tests verifying partial transcripts are persisted and not duplicated when execution fails after committing output in sync, async, and streaming runs.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
1 parent 5d90357 commit f474495

2 files changed

Lines changed: 159 additions & 32 deletions

File tree

core/agent-runtime/src/AgentRuntime.ts

Lines changed: 61 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -186,14 +186,22 @@ export class AgentRuntime {
186186
this.runningTasks.set(run.id, task);
187187

188188
const streamMessages: AgentMessage[] = [];
189+
// Persist a turn's transcript to the thread at most once, even if a later
190+
// store call (e.g. updateRun) throws after a successful append and routes
191+
// us through a catch-block persist. Guarded by task.committed so we never
192+
// write a thread the executor has not persisted to its own session.
193+
let messagesPersisted = false;
194+
const persistPartialOnce = async (): Promise<void> => {
195+
if (!task.committed || messagesPersisted) return;
196+
messagesPersisted = true;
197+
await this.persistPartialMessages(threadId, input, streamMessages);
198+
};
189199
try {
190200
await this.store.updateRun(run.id, rb.start());
191201

192202
for await (const msg of this.executor.execRun(input, abortController.signal)) {
193203
if (abortController.signal.aborted) {
194-
if (task.committed) {
195-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
196-
}
204+
await persistPartialOnce();
197205
await this.finaliseAbortedRun(run.id);
198206
const latest = await this.store.getRun(run.id);
199207
return RunBuilder.fromRecord(latest).snapshot();
@@ -208,9 +216,7 @@ export class AgentRuntime {
208216
// terminal state instead of overwriting it with Completed.
209217
const currentRun = await this.store.getRun(run.id);
210218
if (AgentRuntime.POST_LOOP_TERMINAL_STATUSES.has(currentRun.status)) {
211-
if (task.committed) {
212-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
213-
}
219+
await persistPartialOnce();
214220
await this.finaliseAbortedRun(run.id);
215221
const latest = await this.store.getRun(run.id);
216222
return RunBuilder.fromRecord(latest).snapshot();
@@ -223,19 +229,23 @@ export class AgentRuntime {
223229
...MessageConverter.toAgentMessages(input.input.messages),
224230
...MessageConverter.filterForStorage(streamMessages),
225231
]);
232+
messagesPersisted = true;
226233

227234
await this.store.updateRun(run.id, rb.complete(usage));
228235

229236
return rb.snapshot();
230237
} catch (err: unknown) {
231238
if (abortController.signal.aborted) {
232-
if (task.committed) {
233-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
234-
}
239+
await persistPartialOnce();
235240
await this.finaliseAbortedRun(run.id);
236241
const latest = await this.store.getRun(run.id);
237242
return RunBuilder.fromRecord(latest).snapshot();
238243
}
244+
// Non-abort failure (e.g. upstream stream terminated mid-turn). Persist
245+
// the partial transcript so the thread history keeps the user turn and
246+
// any committed assistant output instead of silently dropping the run.
247+
// No-op if the success path already appended (avoids duplicate history).
248+
await persistPartialOnce();
239249
try {
240250
await this.store.updateRun(run.id, rb.fail(err as Error));
241251
} catch (storeErr) {
@@ -276,14 +286,19 @@ export class AgentRuntime {
276286

277287
(async () => {
278288
const streamMessages: AgentMessage[] = [];
289+
// Persist a turn's transcript to the thread at most once (see syncRun).
290+
let messagesPersisted = false;
291+
const persistPartialOnce = async (): Promise<void> => {
292+
if (!task.committed || messagesPersisted) return;
293+
messagesPersisted = true;
294+
await this.persistPartialMessages(threadId, input, streamMessages);
295+
};
279296
try {
280297
await this.store.updateRun(run.id, rb.start());
281298

282299
for await (const msg of this.executor.execRun(input, abortController.signal)) {
283300
if (abortController.signal.aborted) {
284-
if (task.committed) {
285-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
286-
}
301+
await persistPartialOnce();
287302
await this.finaliseAbortedRun(run.id);
288303
return;
289304
}
@@ -296,6 +311,7 @@ export class AgentRuntime {
296311
// an external expiration) instead of overwriting it with Completed.
297312
const currentRun = await this.store.getRun(run.id);
298313
if (AgentRuntime.POST_LOOP_TERMINAL_STATUSES.has(currentRun.status)) {
314+
await persistPartialOnce();
299315
return;
300316
}
301317

@@ -306,10 +322,16 @@ export class AgentRuntime {
306322
...MessageConverter.toAgentMessages(input.input.messages),
307323
...MessageConverter.filterForStorage(streamMessages),
308324
]);
325+
messagesPersisted = true;
309326

310327
await this.store.updateRun(run.id, rb.complete(usage));
311328
} catch (err: unknown) {
312329
if (!abortController.signal.aborted) {
330+
// Non-abort failure (e.g. upstream stream terminated mid-turn).
331+
// Persist the partial transcript before marking the run failed so
332+
// the thread history is not silently dropped. No-op if the success
333+
// path already appended (avoids duplicate history).
334+
await persistPartialOnce();
313335
try {
314336
const currentRun = await this.store.getRun(run.id);
315337
if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) {
@@ -319,9 +341,7 @@ export class AgentRuntime {
319341
this.logger.error('[AgentRuntime] failed to update run status after error:', storeErr);
320342
}
321343
} else {
322-
if (task.committed) {
323-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
324-
}
344+
await persistPartialOnce();
325345
await this.finaliseAbortedRun(run.id);
326346
this.logger.error('[AgentRuntime] execRun error during abort:', err);
327347
}
@@ -437,14 +457,19 @@ export class AgentRuntime {
437457
): Promise<void> {
438458
const abortController = task.abortController;
439459
const streamMessages: AgentMessage[] = [];
460+
// Persist a turn's transcript to the thread at most once (see syncRun).
461+
let messagesPersisted = false;
462+
const persistPartialOnce = async (): Promise<void> => {
463+
if (!task.committed || messagesPersisted) return;
464+
messagesPersisted = true;
465+
await this.persistPartialMessages(threadId, input, streamMessages);
466+
};
440467
try {
441468
await this.store.updateRun(runId, rb.start());
442469

443470
for await (const msg of this.executor.execRun(input, abortController.signal)) {
444471
if (abortController.signal.aborted) {
445-
if (task.committed) {
446-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
447-
}
472+
await persistPartialOnce();
448473
await this.finaliseAbortedRun(runId);
449474
this.pushEvent(buffer, 'error', { message: 'cancelled', runId });
450475
return;
@@ -464,9 +489,7 @@ export class AgentRuntime {
464489
// an external expiration) instead of overwriting it with Completed.
465490
const currentRun = await this.store.getRun(runId);
466491
if (AgentRuntime.POST_LOOP_TERMINAL_STATUSES.has(currentRun.status)) {
467-
if (task.committed) {
468-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
469-
}
492+
await persistPartialOnce();
470493
this.pushEvent(buffer, 'error', { message: currentRun.status, runId });
471494
return;
472495
}
@@ -477,11 +500,17 @@ export class AgentRuntime {
477500
...MessageConverter.toAgentMessages(input.input.messages),
478501
...MessageConverter.filterForStorage(streamMessages),
479502
]);
503+
messagesPersisted = true;
480504
await this.store.updateRun(runId, rb.complete(usage));
481505

482506
this.pushEvent(buffer, 'done', { result: 'success', runId });
483507
} catch (err: unknown) {
484508
if (!abortController.signal.aborted) {
509+
// Non-abort failure (e.g. upstream stream terminated mid-turn).
510+
// Persist the partial transcript before marking the run failed so the
511+
// thread history is not silently dropped. No-op if the success path
512+
// already appended (avoids duplicate history).
513+
await persistPartialOnce();
485514
try {
486515
const currentRun = await this.store.getRun(runId);
487516
if (currentRun.status !== RunStatus.Cancelling && currentRun.status !== RunStatus.Cancelled) {
@@ -492,9 +521,7 @@ export class AgentRuntime {
492521
}
493522
this.pushEvent(buffer, 'error', { message: (err as Error).message, runId });
494523
} else {
495-
if (task.committed) {
496-
await this.persistMessagesOnAbort(threadId, input, streamMessages);
497-
}
524+
await persistPartialOnce();
498525
await this.finaliseAbortedRun(runId);
499526
this.logger.error('[AgentRuntime] execRun error during abort:', err);
500527
this.pushEvent(buffer, 'error', { message: 'cancelled', runId });
@@ -578,21 +605,23 @@ export class AgentRuntime {
578605
}
579606

580607
/**
581-
* Persist input + collected stream messages to the thread when a run is
582-
* aborted. Keeping the thread in sync with any partial state that the
583-
* executor has already written (e.g. Claude CLI session file) is what
584-
* allows subsequent resume requests to continue from a consistent history
585-
* instead of diverging and failing at executor startup.
608+
* Persist input + collected stream messages to the thread when a run ends
609+
* on a non-success path — either an abort/cancel, or a mid-turn failure
610+
* (e.g. the upstream stream is terminated). Keeping the thread in sync with
611+
* any partial state that the executor has already written (e.g. Claude CLI
612+
* session file) is what allows subsequent resume requests to continue from a
613+
* consistent history instead of diverging and failing at executor startup,
614+
* and prevents a failed turn from vanishing entirely from thread history.
586615
*
587616
* Callers must check `task.committed` before invoking this; if the
588617
* executor never reached a committed state the thread should be left
589618
* untouched so the next run starts fresh instead of trying to resume a
590619
* session that was never created on disk.
591620
*
592-
* Errors are swallowed here so a store failure cannot mask the abort or
593-
* prevent cancelRun from finalising the run status.
621+
* Errors are swallowed here so a store failure cannot mask the original
622+
* abort/failure or prevent the run status from being finalised.
594623
*/
595-
private async persistMessagesOnAbort(
624+
private async persistPartialMessages(
596625
threadId: string,
597626
input: CreateRunInput,
598627
streamMessages: AgentMessage[],

core/agent-runtime/test/AgentRuntime.test.ts

Lines changed: 98 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -317,6 +317,63 @@ describe('test/AgentRuntime.test.ts', () => {
317317
},
318318
);
319319
});
320+
321+
it('should persist the partial transcript and mark Failed when execRun throws after committing', async () => {
322+
const thread = await runtime.createThread();
323+
executor.execRun = async function* (): AsyncGenerator<AgentMessage> {
324+
yield { type: 'assistant', message: { role: 'assistant', content: [{ type: 'text', text: 'partial' }] } };
325+
throw new Error('stream terminated');
326+
};
327+
328+
await assert.rejects(
329+
() => runtime.syncRun({ threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } }),
330+
(err: unknown) => {
331+
assert(err instanceof Error);
332+
assert.equal(err.message, 'stream terminated');
333+
return true;
334+
},
335+
);
336+
337+
// The failed turn must not vanish: user + committed assistant are persisted.
338+
const persisted = await store.getThread(thread.id);
339+
assert.equal(persisted.messages.length, 2);
340+
assert.equal(persisted.messages[0].type, 'user');
341+
assert.equal(persisted.messages[1].type, 'assistant');
342+
});
343+
344+
it('should not duplicate thread messages when updateRun fails after a successful append', async () => {
345+
const thread = await runtime.createThread();
346+
// Executor finishes normally (commits), so the success path appends, then
347+
// the completing updateRun throws — routing into the catch block.
348+
executor.execRun = async function* (): AsyncGenerator<AgentMessage> {
349+
yield { type: 'assistant', message: { role: 'assistant', content: [{ type: 'text', text: 'done' }] } };
350+
yield {
351+
type: 'result',
352+
subtype: 'success',
353+
usage: { input_tokens: 1, output_tokens: 1 },
354+
} as SDKResultMessage;
355+
};
356+
// Fail the 2nd updateRun (1=rb.start, 2=rb.complete) — i.e. right after
357+
// the success-path appendMessages has already written the transcript.
358+
let calls = 0;
359+
const origUpdateRun = store.updateRun.bind(store);
360+
store.updateRun = async (runId: string, updates: Partial<RunRecord>) => {
361+
calls++;
362+
if (calls === 2) throw new Error('updateRun failed');
363+
return origUpdateRun(runId, updates);
364+
};
365+
366+
await assert.rejects(
367+
() => runtime.syncRun({ threadId: thread.id, input: { messages: [{ role: 'user', content: 'Hi' }] } }),
368+
);
369+
370+
// Messages must be appended exactly once — the catch-block persist must be
371+
// a no-op because the success path already persisted (no duplicate history).
372+
const persisted = await store.getThread(thread.id);
373+
assert.equal(persisted.messages.length, 2);
374+
assert.equal(persisted.messages[0].type, 'user');
375+
assert.equal(persisted.messages[1].type, 'assistant');
376+
});
320377
});
321378

322379
describe('asyncRun', () => {
@@ -369,6 +426,24 @@ describe('test/AgentRuntime.test.ts', () => {
369426
const run = await store.getRun(result.id);
370427
assert.deepStrictEqual(run.metadata, meta);
371428
});
429+
430+
it('should persist the partial transcript and mark Failed when execRun throws after committing', async () => {
431+
executor.execRun = async function* (): AsyncGenerator<AgentMessage> {
432+
yield { type: 'assistant', message: { role: 'assistant', content: [{ type: 'text', text: 'partial' }] } };
433+
throw new Error('stream terminated');
434+
};
435+
436+
const result = await runtime.asyncRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } });
437+
await runtime.waitForPendingTasks();
438+
439+
const run = await store.getRun(result.id);
440+
assert.equal(run.status, RunStatus.Failed);
441+
442+
const thread = await store.getThread(result.threadId!);
443+
assert.equal(thread.messages.length, 2);
444+
assert.equal(thread.messages[0].type, 'user');
445+
assert.equal(thread.messages[1].type, 'assistant');
446+
});
372447
});
373448

374449
describe('streamRun', () => {
@@ -526,6 +601,29 @@ describe('test/AgentRuntime.test.ts', () => {
526601
assert.equal(run.status, RunStatus.Failed);
527602
});
528603

604+
it('should persist the partial transcript when execRun throws after committing', async () => {
605+
executor.execRun = async function* (): AsyncGenerator<AgentMessage> {
606+
yield { type: 'assistant', message: { role: 'assistant', content: [{ type: 'text', text: 'partial' }] } };
607+
throw new Error('stream terminated');
608+
};
609+
610+
const writer = new MockSSEWriter();
611+
await runtime.streamRun({ input: { messages: [{ role: 'user', content: 'Hi' }] } }, writer);
612+
613+
const runCreatedEvent = writer.events[0].data as StreamEvent;
614+
const runId = (runCreatedEvent.data as any).runId;
615+
const threadId = (runCreatedEvent.data as any).threadId;
616+
617+
const run = await runtime.getRun(runId);
618+
assert.equal(run.status, RunStatus.Failed);
619+
620+
// The failed turn must not vanish from thread history.
621+
const thread = await runtime.getThread(threadId);
622+
assert.equal(thread.messages.length, 2);
623+
assert.equal(thread.messages[0].type, 'user');
624+
assert.equal(thread.messages[1].type, 'assistant');
625+
});
626+
529627
it('should persist usage to store on completion', async () => {
530628
executor.execRun = async function* (): AsyncGenerator<AgentMessage> {
531629
yield { type: 'assistant', message: { content: 'Hi' } };

0 commit comments

Comments
 (0)