Skip to content

Commit 4402a6d

Browse files
authored
fix: extends standard readable stream (#103)
1 parent 4201aab commit 4402a6d

File tree

4 files changed

+12
-8
lines changed

4 files changed

+12
-8
lines changed

.changeset/gentle-rings-talk.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@llama-flow/core": patch
3+
---
4+
5+
fix: workflow stream extends standard readable stream

packages/core/src/core/stream.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@ import {
88
import { createSubscribable, type Subscribable } from "./utils";
99

1010
export class WorkflowStream<R = any>
11-
implements AsyncIterable<R>, ReadableStream<R>
11+
extends ReadableStream<R>
12+
implements AsyncIterable<R>
1213
{
1314
#stream: ReadableStream<R>;
14-
1515
#subscribable: Subscribable<[data: R], void>;
1616

1717
on(
@@ -40,6 +40,7 @@ export class WorkflowStream<R = any>
4040
"Either subscribable or root stream must be provided",
4141
);
4242
}
43+
super();
4344
if (!subscribable) {
4445
this.#subscribable = createSubscribable<[data: R], void>();
4546
this.#stream = rootStream!.pipeThrough(

packages/core/tests/integration/stream-library.spec.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ describe("stream-chain", () => {
5656
sendEvent(startEvent.with());
5757
const outputs: WorkflowEventData<any>[] = [];
5858
const pipeline = chain([
59-
// fixme: upstream should be treat it as a stream
60-
stream.until(stopEvent)[Symbol.asyncIterator],
59+
stream.until(stopEvent),
6160
new TransformStream({
6261
transform: (event: WorkflowEventData<any>, controller) => {
6362
if (messageEvent.include(event)) {

packages/llamaindex/src/index.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import {
55
workflowEvent,
66
WorkflowStream,
77
} from "@llama-flow/core";
8-
import { collect } from "@llama-flow/core/stream/consumer";
98
import { createStatefulMiddleware } from "@llama-flow/core/middleware/state";
109

1110
type Handler<
@@ -179,22 +178,22 @@ export class Workflow<ContextData, Start, Stop> {
179178
Object.assign(result, {
180179
then: async (resolve: any, reject: any) => {
181180
try {
182-
const events = await collect(result);
181+
const events = await result.toArray();
183182
resolve(events.at(-1)!);
184183
} catch (error) {
185184
reject(error);
186185
}
187186
},
188187
catch: async (reject: any) => {
189188
try {
190-
await collect(result);
189+
await result.toArray();
191190
} catch (error) {
192191
reject(error);
193192
}
194193
},
195194
finally: async (resolve: any) => {
196195
try {
197-
await collect(result);
196+
await result.toArray();
198197
} finally {
199198
resolve();
200199
}

0 commit comments

Comments
 (0)