Skip to content

Commit c042bd0

Browse files
remove wflow vnext methods (#11499)
## Description <!-- Provide a brief description of the changes in this PR --> Remove `streamVNext`, `resumeStreamVNext`, and `observeStreamVNext` methods, call `stream`, `resumeStream` and `observeStream` directly ```diff + const run = await workflow.createRun({ runId: '123' }); - const stream = await run.streamVNext({ inputData: { ... } }); + const stream = await run.stream({ inputData: { ... } }); ``` ## Related Issue(s) <!-- Link to the issue(s) this PR addresses, using hashtag notation: #123 --> ## Type of Change - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [ ] Code refactoring - [ ] Performance improvement - [ ] Test update ## Checklist - [ ] I have made corresponding changes to the documentation (if applicable) - [ ] I have added tests that prove my fix is effective or that my feature works <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes * **Breaking Changes** * Removed deprecated VNext streaming methods (`streamVNext()`, `resumeStreamVNext()`, `observeStreamVNext()`); migrate to standard equivalents (`stream()`, `resumeStream()`, `observeStream()`). * **New Features** * Added `closeOnSuspend` parameter to the streaming API. * **Documentation** * Added migration guide for updating to standard streaming methods. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Abhi Aiyer <abhiaiyer91@gmail.com>
1 parent 81b6a8f commit c042bd0

File tree

16 files changed

+60
-477
lines changed

16 files changed

+60
-477
lines changed

.changeset/all-years-jog.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
---
2+
'@mastra/playground-ui': patch
3+
'@mastra/client-js': patch
4+
'@mastra/inngest': patch
5+
'@mastra/server': patch
6+
'@mastra/core': patch
7+
'mastra': patch
8+
'create-mastra': patch
9+
---
10+
11+
Remove `streamVNext`, `resumeStreamVNext`, and `observeStreamVNext` methods, call `stream`, `resumeStream` and `observeStream` directly
12+
13+
```diff
14+
+ const run = await workflow.createRun({ runId: '123' });
15+
- const stream = await run.streamVNext({ inputData: { ... } });
16+
+ const stream = await run.stream({ inputData: { ... } });
17+
```

client-sdks/client-js/src/resources/agent-builder.test.ts

Lines changed: 0 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -153,15 +153,6 @@ describe('AgentBuilder Streaming Methods (fetch-mocked)', () => {
153153
return Promise.resolve(new Response(body as unknown as ReadableStream, { status: 200 }));
154154
}
155155

156-
// Mock observeStreamVNext endpoint
157-
if (url.includes('/observe-streamVNext?runId=')) {
158-
const body = Workflow.createRecordStream([
159-
{ type: 'cache', payload: { msg: 'cached event 1' } },
160-
{ type: 'live', payload: { msg: 'live event 1' } },
161-
]);
162-
return Promise.resolve(new Response(body as unknown as ReadableStream, { status: 200 }));
163-
}
164-
165156
// Mock observeStreamLegacy endpoint
166157
if (url.includes('/observe-stream-legacy?runId=')) {
167158
const body = Workflow.createRecordStream([
@@ -215,27 +206,6 @@ describe('AgentBuilder Streaming Methods (fetch-mocked)', () => {
215206
expect(call).toBeTruthy();
216207
});
217208

218-
it('observeStreamVNext returns ReadableStream with VNext streaming', async () => {
219-
const stream = await agentBuilder.observeStreamVNext({ runId: 'run-456' });
220-
const reader = (stream as ReadableStream<any>).getReader();
221-
const records: any[] = [];
222-
223-
while (true) {
224-
const { done, value } = await reader.read();
225-
if (done) break;
226-
records.push(value);
227-
}
228-
229-
expect(records).toEqual([
230-
{ type: 'cache', payload: { msg: 'cached event 1' } },
231-
{ type: 'live', payload: { msg: 'live event 1' } },
232-
]);
233-
234-
// Verify correct endpoint was called
235-
const call = fetchMock.mock.calls.find((args: any[]) => String(args[0]).includes('/observe-streamVNext?runId='));
236-
expect(call).toBeTruthy();
237-
});
238-
239209
it('observeStreamLegacy returns ReadableStream with legacy streaming', async () => {
240210
const stream = await agentBuilder.observeStreamLegacy({ runId: 'run-789' });
241211
const reader = (stream as ReadableStream<any>).getReader();

client-sdks/client-js/src/resources/agent-builder.ts

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -314,42 +314,11 @@ export class AgentBuilder extends BaseResource {
314314
return response.body.pipeThrough(this.createRecordParserTransform());
315315
}
316316

317-
/**
318-
* Streams agent builder action progress in real-time using VNext streaming.
319-
* This calls `/api/agent-builder/:actionId/streamVNext`.
320-
*/
321-
async streamVNext(params: AgentBuilderActionRequest, runId?: string) {
322-
const searchParams = new URLSearchParams();
323-
if (runId) {
324-
searchParams.set('runId', runId);
325-
}
326-
327-
const requestContext = parseClientRequestContext(params.requestContext);
328-
const { requestContext: _, ...actionParams } = params;
329-
330-
const url = `/api/agent-builder/${this.actionId}/streamVNext${searchParams.toString() ? `?${searchParams.toString()}` : ''}`;
331-
const response: Response = await this.request(url, {
332-
method: 'POST',
333-
body: { ...actionParams, requestContext },
334-
stream: true,
335-
});
336-
337-
if (!response.ok) {
338-
throw new Error(`Failed to stream agent builder action VNext: ${response.statusText}`);
339-
}
340-
341-
if (!response.body) {
342-
throw new Error('Response body is null');
343-
}
344-
345-
return response.body.pipeThrough(this.createRecordParserTransform());
346-
}
347-
348317
/**
349318
* Observes an existing agent builder action run stream.
350319
* Replays cached execution from the beginning, then continues with live stream.
351320
* This is the recommended method for recovery after page refresh/hot reload.
352-
* This calls `/api/agent-builder/:actionId/observe` (which delegates to observeStreamVNext).
321+
* This calls `/api/agent-builder/:actionId/observe`
353322
*/
354323
async observeStream(params: { runId: string }) {
355324
const searchParams = new URLSearchParams();
@@ -372,32 +341,6 @@ export class AgentBuilder extends BaseResource {
372341
return response.body.pipeThrough(this.createRecordParserTransform());
373342
}
374343

375-
/**
376-
* Observes an existing agent builder action run stream using VNext streaming API.
377-
* Replays cached execution from the beginning, then continues with live stream.
378-
* This calls `/api/agent-builder/:actionId/observe-streamVNext`.
379-
*/
380-
async observeStreamVNext(params: { runId: string }) {
381-
const searchParams = new URLSearchParams();
382-
searchParams.set('runId', params.runId);
383-
384-
const url = `/api/agent-builder/${this.actionId}/observe-streamVNext?${searchParams.toString()}`;
385-
const response: Response = await this.request(url, {
386-
method: 'POST',
387-
stream: true,
388-
});
389-
390-
if (!response.ok) {
391-
throw new Error(`Failed to observe agent builder action stream VNext: ${response.statusText}`);
392-
}
393-
394-
if (!response.body) {
395-
throw new Error('Response body is null');
396-
}
397-
398-
return response.body.pipeThrough(this.createRecordParserTransform());
399-
}
400-
401344
/**
402345
* Observes an existing agent builder action run stream using legacy streaming API.
403346
* Replays cached execution from the beginning, then continues with live stream.

client-sdks/client-js/src/resources/run.ts

Lines changed: 4 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ export class Run extends BaseResource {
246246
tracingOptions?: TracingOptions;
247247
resourceId?: string;
248248
perStep?: boolean;
249+
closeOnSuspend?: boolean;
249250
}) {
250251
const searchParams = new URLSearchParams();
251252

@@ -263,6 +264,7 @@ export class Run extends BaseResource {
263264
tracingOptions: params.tracingOptions,
264265
resourceId: params.resourceId,
265266
perStep: params.perStep,
267+
closeOnSuspend: params.closeOnSuspend,
266268
},
267269
stream: true,
268270
},
@@ -307,82 +309,6 @@ export class Run extends BaseResource {
307309
return response.body.pipeThrough(this.createChunkTransformStream());
308310
}
309311

310-
/**
311-
* Starts a workflow run and returns a stream
312-
* @param params - Object containing the inputData, initialState and requestContext
313-
* @returns Promise containing the workflow execution results
314-
*/
315-
async streamVNext(params: {
316-
inputData?: Record<string, any>;
317-
initialState?: Record<string, any>;
318-
requestContext?: RequestContext;
319-
closeOnSuspend?: boolean;
320-
tracingOptions?: TracingOptions;
321-
resourceId?: string;
322-
perStep?: boolean;
323-
}) {
324-
const searchParams = new URLSearchParams();
325-
326-
searchParams.set('runId', this.runId);
327-
328-
const requestContext = parseClientRequestContext(params.requestContext);
329-
const response: Response = await this.request(
330-
`/api/workflows/${this.workflowId}/streamVNext?${searchParams.toString()}`,
331-
{
332-
method: 'POST',
333-
body: {
334-
inputData: params.inputData,
335-
initialState: params.initialState,
336-
requestContext,
337-
closeOnSuspend: params.closeOnSuspend,
338-
tracingOptions: params.tracingOptions,
339-
resourceId: params.resourceId,
340-
perStep: params.perStep,
341-
},
342-
stream: true,
343-
},
344-
);
345-
346-
if (!response.ok) {
347-
throw new Error(`Failed to stream vNext workflow: ${response.statusText}`);
348-
}
349-
350-
if (!response.body) {
351-
throw new Error('Response body is null');
352-
}
353-
354-
// Pipe the response body through the transform stream
355-
return response.body.pipeThrough(this.createChunkTransformStream());
356-
}
357-
358-
/**
359-
* Observes workflow vNext stream for a workflow run
360-
* @returns Promise containing the workflow execution results
361-
*/
362-
async observeStreamVNext() {
363-
const searchParams = new URLSearchParams();
364-
searchParams.set('runId', this.runId);
365-
366-
const response: Response = await this.request(
367-
`/api/workflows/${this.workflowId}/observe-streamVNext?${searchParams.toString()}`,
368-
{
369-
method: 'POST',
370-
stream: true,
371-
},
372-
);
373-
374-
if (!response.ok) {
375-
throw new Error(`Failed to observe stream vNext workflow: ${response.statusText}`);
376-
}
377-
378-
if (!response.body) {
379-
throw new Error('Response body is null');
380-
}
381-
382-
// Pipe the response body through the transform stream
383-
return response.body.pipeThrough(this.createChunkTransformStream());
384-
}
385-
386312
/**
387313
* Resumes a suspended workflow step asynchronously and returns a promise that resolves when the workflow is complete
388314
* @param params - Object containing the step, resumeData and requestContext
@@ -409,11 +335,11 @@ export class Run extends BaseResource {
409335
}
410336

411337
/**
412-
* Resumes a suspended workflow step that uses streamVNext asynchronously and returns a promise that resolves when the workflow is complete
338+
* Resumes a suspended workflow step that uses stream asynchronously and returns a promise that resolves when the workflow is complete
413339
* @param params - Object containing the step, resumeData and requestContext
414340
* @returns Promise containing the workflow resume results
415341
*/
416-
async resumeStreamVNext(params: {
342+
async resumeStream(params: {
417343
step?: string | string[];
418344
resumeData?: Record<string, any>;
419345
requestContext?: RequestContext | Record<string, any>;

docs/src/content/en/guides/migrations/upgrade-to-v1/client.mdx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,18 @@ Run-related methods cannot be called directly on workflow instance. You need to
172172
+ const stream = await run.stream({ inputData: { ... } });
173173
```
174174

175+
### `streamVNext`, `resumeStreamVNext`, and `observeStreamVNext` methods
176+
177+
The experimental `streamVNext()`, `resumeStreamVNext()`, and `observeStreamVNext()` methods have been removed. These methods are now the standard implementation with updated event structures and return types.
178+
179+
To migrate, use the standard `stream()`, `resumeStream()`, and `observeStream()` methods instead.
180+
181+
```diff
182+
+ const run = await workflow.createRun({ runId: '123' });
183+
- const stream = await run.streamVNext({ inputData: { ... } });
184+
+ const stream = await run.stream({ inputData: { ... } });
185+
```
186+
175187
### Deprecated stream endpoints
176188

177189
Some stream endpoints are deprecated and will be removed. The `/api/agents/:agentId/stream/vnext` endpoint returns 410 Gone, and `/api/agents/:agentId/stream/ui` is deprecated. This change consolidates on standard streaming endpoints.

packages/core/src/loop/network/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -791,7 +791,7 @@ export async function createNetworkLoop({
791791
runId,
792792
});
793793

794-
const stream = run.streamVNext({
794+
const stream = run.stream({
795795
inputData: input,
796796
requestContext: requestContext,
797797
});
@@ -1295,7 +1295,7 @@ export async function networkLoop<OUTPUT extends OutputSchema = undefined>({
12951295
return new MastraAgentNetworkStream({
12961296
run,
12971297
createStream: () => {
1298-
return run.streamVNext({
1298+
return run.stream({
12991299
inputData: {
13001300
task,
13011301
primitiveId: '',

0 commit comments

Comments
 (0)