Skip to content

Commit 457f2d5

Browse files
committed
Refactor accumulator for turn continuation model
Replace seedMessages/completeSeeded with initMessage/completeMessage on MessageAccumulator to clarify the turn continuation lifecycle. initMessage is idempotent — creates tracking state from an existing message when not active, or syncs with an externally updated message when already active. completeMessage marks a message as completed. Extract shared tool output transition logic into tool-transitions.ts so the accumulator's _processToolOutput delegates to a single transitionToolPart function instead of duplicating the switch.
1 parent 547d291 commit 457f2d5

File tree

11 files changed

+204
-157
lines changed

11 files changed

+204
-157
lines changed

examples/custom-codec/codec.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,17 +514,18 @@ class AgentAccumulator implements MessageAccumulator<AgentEvent, AgentMessage> {
514514
}
515515

516516
/**
517-
* Seed the accumulator with an existing message for cross-turn events.
517+
* Initialize the accumulator with an existing message for continuation.
518518
* Not supported by this simple codec.
519-
* @param _messages - The messages to seed (unused).
519+
* @param _messageId - The message identifier (unused).
520+
* @param _message - The existing message (unused).
520521
*/
521522
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- interface contract requires parameters
522-
seedMessages(_messages: { messageId: string; message: AgentMessage }[]): void {
523+
initMessage(_messageId: string, _message: AgentMessage): void {
523524
// No-op — cross-turn events are not supported by this codec.
524525
}
525526

526527
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- interface contract requires parameters
527-
completeSeeded(_messageId: string): void {
528+
completeMessage(_messageId: string): void {
528529
// No-op — cross-turn events are not supported by this codec.
529530
}
530531
}

src/core/codec/types.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,18 +204,18 @@ export interface MessageAccumulator<TEvent, TMessage> {
204204
/** Apply an external update to a message (e.g. from an update callback). */
205205
updateMessage(message: TMessage): void;
206206
/**
207-
* Seed the accumulator with existing messages so subsequent events
208-
* targeting these messageIds update the existing state rather than creating
209-
* blank messages. Used for cross-turn events (e.g. tool result delivery)
210-
* and history replay where multiple messages may need seeding.
207+
* Ensure the accumulator is ready to process events for the given message.
208+
* If not already active, creates internal tracking state from the message.
209+
* If already active, syncs internal state with the provided message
210+
* (picking up external changes like cross-turn amendments).
211+
* Idempotent — safe to call before every processOutputs.
211212
*/
212-
seedMessages(messages: { messageId: string; message: TMessage }[]): void;
213+
initMessage(messageId: string, message: TMessage): void;
213214
/**
214-
* Mark a previously seeded message as completed. Called after processing
215-
* cross-turn event outputs — the seed temporarily re-activates the message,
216-
* and this call finalizes it so it appears in {@link completedMessages}.
215+
* Mark a message as completed. Removes it from active tracking so it
216+
* appears in {@link completedMessages}. No-op if not active.
217217
*/
218-
completeSeeded(messageId: string): void;
218+
completeMessage(messageId: string): void;
219219
/** All messages accumulated so far (in-progress and completed). */
220220
readonly messages: TMessage[];
221221
/** Only messages whose streams have finished. */

src/core/transport/client-transport.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,7 +375,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
375375
}
376376

377377
const accumulator = this._codec.createAccumulator();
378-
accumulator.seedMessages([{ messageId: targetMsgId, message: existingNode.message }]);
378+
accumulator.initMessage(targetMsgId, existingNode.message);
379379
accumulator.processOutputs([output]);
380380

381381
const updatedMsg = accumulator.messages.at(-1);
@@ -445,14 +445,14 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
445445
if (!observer) return;
446446

447447
// Sync the accumulator with the tree before processing. If the message
448-
// was updated externally (via cross-turn events), seedMessages updates the
448+
// was updated externally (via cross-turn events), initMessage syncs the
449449
// accumulator's state so the update isn't lost when processing
450450
// late turn events like finish-step/finish.
451451
const msgId = observer.headers[HEADER_MSG_ID];
452452
if (msgId) {
453453
const treeNode = this._tree.getNode(msgId);
454454
if (treeNode) {
455-
observer.accumulator.seedMessages([{ messageId: msgId, message: treeNode.message }]);
455+
observer.accumulator.initMessage(msgId, treeNode.message);
456456
}
457457
}
458458

@@ -608,7 +608,7 @@ class DefaultClientTransport<TEvent, TMessage> implements ClientTransport<TEvent
608608
messageId: node.msgId,
609609
}));
610610
const accumulator = this._codec.createAccumulator();
611-
accumulator.seedMessages([{ messageId: node.msgId, message: existingNode.message }]);
611+
accumulator.initMessage(node.msgId, existingNode.message);
612612
accumulator.processOutputs(outputs);
613613
const updatedMsg = accumulator.messages.at(-1);
614614
if (updatedMsg) {

src/core/transport/decode-history.ts

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -102,28 +102,17 @@ const decodeAll = <TEvent, TMessage>(state: HistoryState<TEvent, TMessage>): Dec
102102
const amendTarget = headers[HEADER_AMEND];
103103

104104
// Cross-turn events target an existing message from a different turn.
105-
// Route them to the turn accumulator that owns the target msgId
106-
// so the tool part state transitions correctly in history.
105+
// Route to the owning turn's accumulator via initMessage lifecycle.
107106
if (amendTarget) {
108107
for (const turn of turns.values()) {
109108
if (turn.msgHeaders.has(amendTarget)) {
110-
// Seed the accumulator with the message's current state so the
111-
// cross-turn events update the correct tool parts. Use the
112-
// accumulator's full messages list (includes in-progress) and
113-
// match by position against the msgHeaders insertion order.
114109
const headerKeys = [...turn.msgHeaders.keys()];
115110
const msgIndex = headerKeys.indexOf(amendTarget);
116111
const currentMsg = msgIndex === -1 ? undefined : turn.accumulator.messages[msgIndex];
117112
if (currentMsg) {
118-
turn.accumulator.seedMessages([{ messageId: amendTarget, message: currentMsg }]);
113+
turn.accumulator.initMessage(amendTarget, currentMsg);
119114
}
120115
turn.accumulator.processOutputs(outputs);
121-
// Defer completion until after all events are processed. A finish
122-
// event that follows this update in serial order must still be
123-
// able to find the message in _activeMessages (e.g. to apply
124-
// messageMetadata). If no finish event arrives, the deferred
125-
// completeSeeded ensures the message still appears in
126-
// completedMessages.
127116
deferredCompletions.push({ accumulator: turn.accumulator, messageId: amendTarget });
128117
break;
129118
}
@@ -175,11 +164,11 @@ const decodeAll = <TEvent, TMessage>(state: HistoryState<TEvent, TMessage>): Dec
175164
}
176165
}
177166

178-
// Complete any seeded-for-update messages that were not already completed
179-
// by a finish/abort event. Idempotent — if finish already removed the message
180-
// from _activeMessages, completeSeeded is a no-op.
167+
// Complete any messages that were re-activated for cross-turn updates.
168+
// Idempotent — if finish already removed the message from active tracking,
169+
// completeMessage is a no-op.
181170
for (const { accumulator, messageId } of deferredCompletions) {
182-
accumulator.completeSeeded(messageId);
171+
accumulator.completeMessage(messageId);
183172
}
184173

185174
// Collect completed messages in chronological order (oldest first) by turn.

src/core/transport/view.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ export class DefaultView<TEvent, TMessage> implements View<TEvent, TMessage> {
597597
// Private: scoped event forwarding
598598
// -------------------------------------------------------------------------
599599

600-
private _updateVisibleSnapshot(nodes?: TreeNode<TMessage>[]): void {
600+
private _updateVisibleSnapshot(nodes?: MessageNode<TMessage>[]): void {
601601
const resolved = nodes ?? this.flattenNodes();
602602
this._lastVisibleIds = resolved.map((n) => n.msgId);
603603
this._lastVisibleMessages = resolved.map((n) => n.message);

src/vercel/codec/accumulator.ts

Lines changed: 11 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import type * as AI from 'ai';
1717

1818
import type { DecoderOutput, MessageAccumulator } from '../../core/codec/types.js';
1919
import { stripUndefined } from '../../utils.js';
20+
import { toolBase, transitionToolPart } from './tool-transitions.js';
2021

2122
// ---------------------------------------------------------------------------
2223
// Internal types
@@ -38,15 +39,6 @@ interface ToolPartTracker {
3839
inputText: string;
3940
}
4041

41-
/** Fields shared by all DynamicToolUIPart state variants. */
42-
interface ToolBaseFields {
43-
type: 'dynamic-tool';
44-
toolName: string;
45-
toolCallId: string;
46-
title?: string;
47-
providerExecuted?: boolean;
48-
}
49-
5042
/** Bundled per-message state for an in-progress message. */
5143
interface ActiveMessageState {
5244
message: AI.UIMessage;
@@ -56,34 +48,6 @@ interface ActiveMessageState {
5648
streamStatus: Map<string, StreamStatus>;
5749
}
5850

59-
// ---------------------------------------------------------------------------
60-
// Tool base helper
61-
// ---------------------------------------------------------------------------
62-
63-
/**
64-
* Extract the state-independent base fields for a DynamicToolUIPart.
65-
* Works with both chunks (tool-input-start, etc.) and existing parts.
66-
* @param source - Any object containing the required tool identity fields.
67-
* @param source.toolCallId - The tool call identifier.
68-
* @param source.toolName - The tool name.
69-
* @param source.title - Optional display title.
70-
* @param source.providerExecuted - Whether the provider executed the tool.
71-
* @returns Base fields shared across all DynamicToolUIPart state variants.
72-
*/
73-
const toolBase = (source: {
74-
toolCallId: string;
75-
toolName: string;
76-
title?: string;
77-
providerExecuted?: boolean;
78-
}): ToolBaseFields =>
79-
stripUndefined({
80-
type: 'dynamic-tool' as const,
81-
toolCallId: source.toolCallId,
82-
toolName: source.toolName,
83-
title: source.title,
84-
providerExecuted: source.providerExecuted,
85-
});
86-
8751
// ---------------------------------------------------------------------------
8852
// DeltaStreamTracker — manages text or reasoning stream accumulation
8953
// ---------------------------------------------------------------------------
@@ -172,23 +136,13 @@ class DefaultUIMessageAccumulator implements MessageAccumulator<AI.UIMessageChun
172136
}
173137
}
174138

175-
seedMessages(messages: { messageId: string; message: AI.UIMessage }[]): void {
176-
for (const { messageId, message } of messages) {
177-
this._seedOne(messageId, message);
178-
}
179-
}
180-
181-
completeSeeded(messageId: string): void {
182-
this._activeMessages.delete(messageId);
183-
}
184-
185-
private _seedOne(messageId: string, message: AI.UIMessage): void {
139+
initMessage(messageId: string, message: AI.UIMessage): void {
186140
const existing = this._activeMessages.get(messageId);
187141

188142
if (existing) {
189-
// Update: sync the active state with an externally updated message.
143+
// Already active — sync with the externally updated message.
190144
// Replace the message and rebuild tool trackers so the accumulator
191-
// reflects updates (e.g. tool results published via cross-turn events)
145+
// reflects updates (e.g. cross-turn amendments applied to the tree)
192146
// that happened outside the streaming flow.
193147
const cloned = structuredClone(message);
194148
const listIdx = this._messageList.indexOf(existing.message);
@@ -207,16 +161,15 @@ class DefaultUIMessageAccumulator implements MessageAccumulator<AI.UIMessageChun
207161
return;
208162
}
209163

164+
// Not active — create tracking state from the existing message.
210165
const cloned = structuredClone(message);
211166
const toolTrackers: Record<string, ToolPartTracker> = {};
212167
const streamStatus = new Map<string, StreamStatus>();
213168

214-
// Reconstruct tool trackers from existing dynamic-tool parts
215169
for (let i = 0; i < cloned.parts.length; i++) {
216170
const part = cloned.parts[i];
217171
if (part?.type === 'dynamic-tool') {
218172
toolTrackers[part.toolCallId] = { partIndex: i, inputText: '' };
219-
// All existing tool parts have completed their input phase
220173
streamStatus.set(part.toolCallId, 'finished');
221174
}
222175
}
@@ -232,8 +185,7 @@ class DefaultUIMessageAccumulator implements MessageAccumulator<AI.UIMessageChun
232185
this._activeMessages.set(messageId, state);
233186

234187
// If this message is already in the list (completed previously),
235-
// replace it in-place so updates apply to the same reference.
236-
// Otherwise push as a new entry.
188+
// replace in-place. Otherwise push as a new entry.
237189
const existingIdx = this._messageList.findIndex((m) => m.id === message.id);
238190
if (existingIdx === -1) {
239191
this._messageList.push(state.message);
@@ -242,6 +194,10 @@ class DefaultUIMessageAccumulator implements MessageAccumulator<AI.UIMessageChun
242194
}
243195
}
244196

197+
completeMessage(messageId: string): void {
198+
this._activeMessages.delete(messageId);
199+
}
200+
245201
// -------------------------------------------------------------------------
246202
// Shared helpers
247203
// -------------------------------------------------------------------------
@@ -573,48 +529,7 @@ class DefaultUIMessageAccumulator implements MessageAccumulator<AI.UIMessageChun
573529
const found = this._getToolPart(chunk.toolCallId, state);
574530
if (!found) return;
575531

576-
switch (chunk.type) {
577-
case 'tool-output-available': {
578-
state.message.parts[found.tracker.partIndex] = stripUndefined({
579-
...toolBase(found.part),
580-
state: 'output-available' as const,
581-
input: found.part.input,
582-
output: chunk.output,
583-
preliminary: chunk.preliminary,
584-
});
585-
break;
586-
}
587-
588-
case 'tool-output-error': {
589-
state.message.parts[found.tracker.partIndex] = {
590-
...toolBase(found.part),
591-
state: 'output-error',
592-
input: found.part.input,
593-
errorText: chunk.errorText,
594-
};
595-
break;
596-
}
597-
598-
case 'tool-output-denied': {
599-
state.message.parts[found.tracker.partIndex] = {
600-
...toolBase(found.part),
601-
state: 'output-denied',
602-
input: found.part.input,
603-
approval: { id: '', approved: false },
604-
};
605-
break;
606-
}
607-
608-
case 'tool-approval-request': {
609-
state.message.parts[found.tracker.partIndex] = {
610-
...toolBase(found.part),
611-
state: 'approval-requested',
612-
input: found.part.input,
613-
approval: { id: chunk.approvalId },
614-
};
615-
break;
616-
}
617-
}
532+
state.message.parts[found.tracker.partIndex] = transitionToolPart(found.part, chunk);
618533
}
619534

620535
// -------------------------------------------------------------------------

0 commit comments

Comments
 (0)