Skip to content

Commit 9c65785

Browse files
feat: add withSnapshot API (#97)
Co-authored-by: Marcus Schiesser <[email protected]>
1 parent 23ecfc7 commit 9c65785

File tree

14 files changed

+1217
-56
lines changed

14 files changed

+1217
-56
lines changed

.changeset/pretty-moose-share.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
"demo": patch
3+
"@llama-flow/core": patch
4+
---
5+
6+
feat: add `withSnapshot` middleware API
7+
8+
Add snapshot API, for human in the loop feature. The API is designed for cross JavaScript platform, including node.js, browser, and serverless platform such as cloudflare worker and edge runtime
9+
10+
- `workflow.createContext(): Context`
11+
- `context.snapshot(): Promise<[requestEvent, snapshot]>`
12+
- `workflow.resume(data, snapshot)`

demo/hono/app.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,54 @@ app.post(
1818
),
1919
);
2020

21+
const serializableMemoryMap = new Map<string, any>();
22+
23+
app.post("/human-in-the-loop", async (ctx) => {
24+
const { workflow, stopEvent, startEvent, humanInteractionEvent } =
25+
await import("../workflows/human-in-the-loop");
26+
const json = await ctx.req.json();
27+
let context: ReturnType<typeof workflow.createContext>;
28+
if (json.requestId) {
29+
const data = json.data;
30+
const serializable = serializableMemoryMap.get(json.requestId);
31+
context = workflow.resume(data, serializable);
32+
} else {
33+
context = workflow.createContext();
34+
context.sendEvent(startEvent.with(json.data));
35+
}
36+
37+
const { onRequest, stream } = context;
38+
return new Promise<Response>((resolve) => {
39+
// listen to human interaction
40+
onRequest(humanInteractionEvent, async (reason) => {
41+
context.snapshot().then(([re, sd]) => {
42+
const requestId = crypto.randomUUID();
43+
serializableMemoryMap.set(requestId, sd);
44+
resolve(
45+
Response.json({
46+
requestId: requestId,
47+
reason: reason,
48+
data: re.map((r) =>
49+
r === humanInteractionEvent
50+
? "request human in the loop"
51+
: "UNKNOWN",
52+
),
53+
}),
54+
);
55+
});
56+
});
57+
58+
// consume stream
59+
stream
60+
.until(stopEvent)
61+
.toArray()
62+
.then((events) => {
63+
const stopEvent = events.at(-1)!;
64+
resolve(Response.json(stopEvent.data));
65+
});
66+
});
67+
});
68+
2169
serve(app, ({ port }) => {
2270
console.log(`Server started at http://localhost:${port}`);
2371
});

demo/node/name-ask-readline.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { input } from "@inquirer/prompts";
2+
import {
3+
workflow,
4+
stopEvent,
5+
startEvent,
6+
humanInteractionEvent,
7+
} from "../workflows/human-in-the-loop";
8+
9+
const name = await input({
10+
message: "What is your name?",
11+
});
12+
const { onRequest, stream, sendEvent } = workflow.createContext();
13+
14+
sendEvent(startEvent.with(name));
15+
16+
onRequest(humanInteractionEvent, async (reason) => {
17+
console.log("Requesting human interaction...");
18+
const name = await input({
19+
message: JSON.parse(reason).message,
20+
});
21+
console.log("Human interaction completed.");
22+
sendEvent(humanInteractionEvent.with(name));
23+
});
24+
25+
stream.on(stopEvent, ({ data }) => {
26+
console.log("AI analysis: ", data);
27+
});
28+
29+
await stream.until(stopEvent).toArray();

demo/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"private": true,
55
"dependencies": {
66
"@hono/node-server": "^1.14.1",
7+
"@inquirer/prompts": "^7.5.0",
78
"@llama-flow/core": "latest",
89
"@modelcontextprotocol/sdk": "^1.10.1",
910
"hono": "^4.7.7",

demo/workflows/human-in-the-loop.ts

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import { withSnapshot, request } from "@llama-flow/core/middleware/snapshot";
2+
import { createWorkflow, workflowEvent, getContext } from "@llama-flow/core";
3+
import { OpenAI } from "openai";
4+
5+
const openai = new OpenAI();
6+
7+
const workflow = withSnapshot(createWorkflow());
8+
9+
const startEvent = workflowEvent<string>({
10+
debugLabel: "start",
11+
});
12+
const humanInteractionEvent = workflowEvent<string>({
13+
debugLabel: "humanInteraction",
14+
});
15+
const stopEvent = workflowEvent<string>({
16+
debugLabel: "stop",
17+
});
18+
19+
workflow.handle([startEvent], async ({ data }) => {
20+
const response = await openai.chat.completions.create({
21+
stream: false,
22+
model: "gpt-4.1",
23+
messages: [
24+
{
25+
role: "system",
26+
content: `You are a helpful assistant.
27+
If user doesn't provide his/her name, call ask_name tool to ask for user's name.
28+
Otherwise, analyze user's name with a good meaning and return the analysis.
29+
30+
For example, alex is from "Alexander the Great", who was a king of the ancient Greek kingdom of Macedon and one of history's greatest military minds.`,
31+
},
32+
{
33+
role: "user",
34+
content: data,
35+
},
36+
],
37+
tools: [
38+
{
39+
type: "function",
40+
function: {
41+
name: "ask_name",
42+
description: "Ask for user's name",
43+
parameters: {
44+
type: "object",
45+
properties: {
46+
message: {
47+
type: "string",
48+
description: "The message to ask for user's name",
49+
},
50+
},
51+
required: ["message"],
52+
},
53+
},
54+
},
55+
],
56+
});
57+
const tools = response.choices[0].message.tool_calls;
58+
if (tools && tools.length > 0) {
59+
const askName = tools.find((tool) => tool.function.name === "ask_name");
60+
if (askName) {
61+
return request(humanInteractionEvent, askName.function.arguments);
62+
}
63+
}
64+
return stopEvent.with(response.choices[0].message.content!);
65+
});
66+
67+
workflow.handle([humanInteractionEvent], async ({ data }) => {
68+
const { sendEvent } = getContext();
69+
// going back to the start event
70+
sendEvent(startEvent.with(data));
71+
});
72+
73+
export { workflow, startEvent, humanInteractionEvent, stopEvent };

packages/core/package.json

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@
6868
"types": "./middleware/validation.d.ts",
6969
"default": "./middleware/validation.js"
7070
},
71+
"./middleware/snapshot": {
72+
"types": "./middleware/snapshot.d.ts",
73+
"default": "./middleware/snapshot.js"
74+
},
7175
"./util/p-retry": {
7276
"types": "./util/p-retry.d.ts",
7377
"default": "./util/p-retry.js"

packages/core/src/core/context.ts

Lines changed: 46 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -156,11 +156,11 @@ export const createContext = ({
156156
// return value is a special event
157157
if (isPromiseLike(result)) {
158158
(handlerContext as any).async = true;
159-
(handlerContext as any).pending = result;
160-
result.then((event) => {
159+
(handlerContext as any).pending = result.then((event) => {
161160
if (isEventData(event)) {
162161
workflowContext.sendEvent(event);
163162
}
163+
return event;
164164
});
165165
} else if (isEventData(result)) {
166166
workflowContext.sendEvent(result);
@@ -196,44 +196,50 @@ export const createContext = ({
196196
};
197197
const createWorkflowContext = (
198198
handlerContext: HandlerContext,
199-
): WorkflowContext => ({
200-
get stream() {
201-
const subscribable = createSubscribable<
202-
[event: WorkflowEventData<any>],
203-
void
204-
>();
205-
rootWorkflowContext.__internal__call_send_event.subscribe(
206-
(newEvent: WorkflowEventData<any>) => {
207-
let currentEventContext = eventContextWeakMap.get(newEvent);
208-
while (currentEventContext) {
209-
if (currentEventContext === handlerContext) {
210-
subscribable.publish(newEvent);
211-
break;
212-
}
213-
currentEventContext = currentEventContext.prev;
214-
}
215-
},
216-
);
217-
return new WorkflowStream<WorkflowEventData<any>>(subscribable, null);
218-
},
219-
get signal() {
220-
return handlerContext.abortController.signal;
221-
},
222-
sendEvent: (...events) => {
223-
events.forEach((event) => {
224-
eventContextWeakMap.set(event, handlerContext);
225-
handlerContext.outputs.push(event);
226-
queue.push(event);
227-
rootWorkflowContext.__internal__call_send_event.publish(
228-
event,
229-
handlerContext,
230-
);
231-
queueUpdateCallback(handlerContext);
232-
});
233-
},
234-
__internal__call_context: createSubscribable(),
235-
__internal__call_send_event: createSubscribable(),
236-
});
199+
): WorkflowContext => {
200+
let lazyLoadStream: WorkflowStream | null = null;
201+
return {
202+
get stream() {
203+
if (!lazyLoadStream) {
204+
const subscribable = createSubscribable<
205+
[event: WorkflowEventData<any>],
206+
void
207+
>();
208+
rootWorkflowContext.__internal__call_send_event.subscribe(
209+
(newEvent: WorkflowEventData<any>) => {
210+
let currentEventContext = eventContextWeakMap.get(newEvent);
211+
while (currentEventContext) {
212+
if (currentEventContext === handlerContext) {
213+
subscribable.publish(newEvent);
214+
break;
215+
}
216+
currentEventContext = currentEventContext.prev;
217+
}
218+
},
219+
);
220+
lazyLoadStream = new WorkflowStream(subscribable, null);
221+
}
222+
return lazyLoadStream;
223+
},
224+
get signal() {
225+
return handlerContext.abortController.signal;
226+
},
227+
sendEvent: (...events) => {
228+
events.forEach((event) => {
229+
eventContextWeakMap.set(event, handlerContext);
230+
handlerContext.outputs.push(event);
231+
queue.push(event);
232+
rootWorkflowContext.__internal__call_send_event.publish(
233+
event,
234+
handlerContext,
235+
);
236+
queueUpdateCallback(handlerContext);
237+
});
238+
},
239+
__internal__call_context: createSubscribable(),
240+
__internal__call_send_event: createSubscribable(),
241+
};
242+
};
237243

238244
let rootAbortController = new AbortController();
239245
const handlerRootContext: HandlerContext = {

packages/core/src/core/stream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ export class WorkflowStream<R = any>
7676
#stream: ReadableStream<R>;
7777
#subscribable: Subscribable<[data: R], void>;
7878

79-
on(
80-
event: WorkflowEvent<any>,
81-
handler: (event: WorkflowEventData<any>) => void,
79+
on<T>(
80+
event: WorkflowEvent<T>,
81+
handler: (event: WorkflowEventData<T>) => void,
8282
): () => void {
8383
return this.#subscribable.subscribe((ev) => {
8484
if (event.include(ev)) {

packages/core/src/core/utils.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ export function flattenEvents(
2929

3030
export type Subscribable<Args extends any[], R> = {
3131
subscribe: (callback: (...args: Args) => R) => () => void;
32-
publish: (...args: Args) => void;
32+
publish: (...args: Args) => unknown[];
3333
};
3434

3535
const __internal__subscribesSourcemap = new WeakMap<
@@ -49,24 +49,30 @@ export function getSubscribers<Args extends any[], R>(
4949
/**
5050
* @internal
5151
*/
52-
export function createSubscribable<Args extends any[], R>(): Subscribable<
53-
Args,
54-
R
55-
> {
56-
const subscribers = new Set<(...args: Args) => R>();
52+
export function createSubscribable<
53+
FnOrArgs extends ((...args: any[]) => any) | any[],
54+
R = unknown,
55+
>(): FnOrArgs extends (...args: any[]) => any
56+
? Subscribable<Parameters<FnOrArgs>, ReturnType<FnOrArgs>>
57+
: FnOrArgs extends any[]
58+
? Subscribable<FnOrArgs, R>
59+
: never {
60+
const subscribers = new Set<(...args: any) => any>();
5761
const obj = {
58-
subscribe: (callback: (...args: Args) => R) => {
62+
subscribe: (callback: (...args: any) => any) => {
5963
subscribers.add(callback);
6064
return () => {
6165
subscribers.delete(callback);
6266
};
6367
},
64-
publish: (...args: Args) => {
68+
publish: (...args: any) => {
69+
const results: unknown[] = [];
6570
for (const callback of subscribers) {
66-
callback(...args);
71+
results.push(callback(...args));
6772
}
73+
return results;
6874
},
6975
};
7076
__internal__subscribesSourcemap.set(obj, subscribers);
71-
return obj;
77+
return obj as any;
7278
}

0 commit comments

Comments
 (0)