Skip to content

Commit 43a3f79

Browse files
[ai] Fix "start" and "finish" chunk events in DurableAgent (#378)
* [ai] Remove "start" and "finish" chunk events * Wire through sendStart and sendFinish properly * changeset --------- Co-authored-by: Pranay Prakash <[email protected]>
1 parent e5c5236 commit 43a3f79

File tree

4 files changed

+61
-12
lines changed

4 files changed

+61
-12
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@workflow/ai": patch
3+
---
4+
5+
DurableAgent#stream now sends `start` and `finish` chunks properly at the start and end

packages/ai/src/agent/do-stream-step.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@ export async function doStreamStep(
2121
conversationPrompt: LanguageModelV2Prompt,
2222
modelInit: string | (() => Promise<LanguageModelV2>),
2323
writable: WritableStream<UIMessageChunk>,
24-
tools?: LanguageModelV2CallOptions['tools']
24+
tools?: LanguageModelV2CallOptions['tools'],
25+
options?: {
26+
sendStart?: boolean;
27+
}
2528
) {
2629
'use step';
2730

@@ -65,9 +68,11 @@ export async function doStreamStep(
6568
.pipeThrough(
6669
new TransformStream<LanguageModelV2StreamPart, UIMessageChunk>({
6770
start: (controller) => {
68-
controller.enqueue({
69-
type: 'start',
70-
});
71+
if (options?.sendStart) {
72+
controller.enqueue({
73+
type: 'start',
74+
});
75+
}
7176
controller.enqueue({
7277
type: 'start-step',
7378
});
@@ -76,9 +81,6 @@ export async function doStreamStep(
7681
controller.enqueue({
7782
type: 'finish-step',
7883
});
79-
controller.enqueue({
80-
type: 'finish',
81-
});
8284
},
8385
transform: async (part, controller) => {
8486
const partType = part.type;

packages/ai/src/agent/durable-agent.ts

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,18 @@ export interface DurableAgentStreamOptions {
6363
*/
6464
preventClose?: boolean;
6565

66+
/**
67+
* If true, sends a 'start' chunk at the beginning of the stream.
68+
* Defaults to true.
69+
*/
70+
sendStart?: boolean;
71+
72+
/**
73+
* If true, sends a 'finish' chunk at the end of the stream.
74+
* Defaults to true.
75+
*/
76+
sendFinish?: boolean;
77+
6678
/**
6779
* Condition for stopping the generation when there are tool results in the last step.
6880
* When the condition is an array, any of the conditions can be met to stop the generation.
@@ -133,6 +145,7 @@ export class DurableAgent {
133145
writable: options.writable,
134146
prompt: modelPrompt,
135147
stopConditions: options.stopWhen,
148+
sendStart: options.sendStart ?? true,
136149
});
137150

138151
let result = await iterator.next();
@@ -147,16 +160,37 @@ export class DurableAgent {
147160
result = await iterator.next(toolResults);
148161
}
149162

150-
if (!options.preventClose) {
151-
await closeStream(options.writable);
163+
const sendFinish = options.sendFinish ?? true;
164+
const preventClose = options.preventClose ?? false;
165+
166+
// Only call closeStream if there's something to do
167+
if (sendFinish || !preventClose) {
168+
await closeStream(options.writable, preventClose, sendFinish);
152169
}
153170
}
154171
}
155172

156-
async function closeStream(writable: WritableStream<UIMessageChunk>) {
173+
async function closeStream(
174+
writable: WritableStream<UIMessageChunk>,
175+
preventClose?: boolean,
176+
sendFinish?: boolean
177+
) {
157178
'use step';
158179

159-
await writable.close();
180+
// Conditionally write the finish chunk
181+
if (sendFinish) {
182+
const writer = writable.getWriter();
183+
try {
184+
await writer.write({ type: 'finish' });
185+
} finally {
186+
writer.releaseLock();
187+
}
188+
}
189+
190+
// Conditionally close the stream
191+
if (!preventClose) {
192+
await writable.close();
193+
}
160194
}
161195

162196
async function executeTool(

packages/ai/src/agent/stream-text-iterator.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@ export async function* streamTextIterator({
1515
writable,
1616
model,
1717
stopConditions,
18+
sendStart = true,
1819
}: {
1920
prompt: LanguageModelV2Prompt;
2021
tools: ToolSet;
2122
writable: WritableStream<UIMessageChunk>;
2223
model: string | (() => Promise<LanguageModelV2>);
2324
stopConditions?: ModelStopCondition[] | ModelStopCondition;
25+
sendStart?: boolean;
2426
}): AsyncGenerator<
2527
LanguageModelV2ToolCall[],
2628
void,
@@ -30,13 +32,19 @@ export async function* streamTextIterator({
3032

3133
const steps: StepResult<any>[] = [];
3234
let done = false;
35+
let isFirstIteration = true;
36+
3337
while (!done) {
3438
const { toolCalls, finish, step } = await doStreamStep(
3539
conversationPrompt,
3640
model,
3741
writable,
38-
toolsToModelTools(tools)
42+
toolsToModelTools(tools),
43+
{
44+
sendStart: sendStart && isFirstIteration,
45+
}
3946
);
47+
isFirstIteration = false;
4048
steps.push(step);
4149

4250
if (finish?.finishReason === 'tool-calls') {

0 commit comments

Comments
 (0)