Skip to content

Commit 72fc069

Browse files
committed
Implement streaming for AI completions
Update system prompt with requirements and context formatting. Add eventStream async generator for client-side SSE parsing. Add ctx param to fetch handler for waitUntil caching.
1 parent 7c22b58 commit 72fc069

2 files changed

Lines changed: 52 additions & 9 deletions

File tree

packages/webapp/cf/index.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ const resInit = {
6868
};
6969

7070
export default {
71-
async fetch(req, env, _ctx): Promise<Response> {
71+
async fetch(req, env, ctx): Promise<Response> {
7272
if (req.method === 'OPTIONS') {
7373
return new Response(null, {
7474
status: 204,
@@ -129,26 +129,43 @@ export default {
129129
}
130130
case '/completions': {
131131
const cacheRes = await env.KV.get(req.url);
132-
if (cacheRes) return new Response(cacheRes, resInit);
133-
132+
if (cacheRes) {
133+
return new Response(cacheRes, {
134+
headers: { ...resInit.headers, 'Content-Type': 'text/event-stream' },
135+
});
136+
}
134137
const q = params.get('q') || '';
135138
const [values] = await embedding(env.AI, [q]);
136139
const res = await env.GAMES_SEARCH.query(values, { returnMetadata: true });
137140
const messages = [
138141
{
139142
role: 'system',
140-
content: `You are an application assistant and need to answer user questions in the target language(${req.headers.get('Accept-Language') ?? 'en'}) according to the following requirements::
141-
${res.matches.map((e) => (e.metadata as any).text).join('\n\n\n')}`,
143+
content: `You are an application assistant.
144+
Response user input based on the context and your existing knowledge.
145+
146+
Requirements:
147+
1. Language: respond in (${req.headers.get('Accept-Language') ?? 'en'}).
148+
2. Maximum 1000 words.
149+
150+
Context:
151+
${res.matches.map((e) => (e.metadata as any).text).join('\n\n---\n\n')}
152+
`,
142153
},
143154
{
144155
role: 'user',
145156
content: params.get('q') || '',
146157
},
147158
];
148-
const result = await env.AI.run('@cf/nvidia/nemotron-3-120b-a12b' as any, { messages });
149-
const body = { content: result?.choices?.at?.(0)?.message?.content };
150-
await env.KV.put(req.url, JSON.stringify(body), { expirationTtl: 60 * 60 });
151-
return Response.json(body, resInit);
159+
const stream = await env.AI.run('@cf/nvidia/nemotron-3-120b-a12b' as any, { messages, stream: true });
160+
const [stream1, stream2] = stream.tee();
161+
162+
ctx.waitUntil(
163+
new Response(stream1).text().then((text) => env.KV.put(req.url, text, { expirationTtl: 60 * 60 })),
164+
);
165+
166+
return new Response(stream2, {
167+
headers: { ...resInit.headers, 'Content-Type': 'text/event-stream' },
168+
});
152169
}
153170
default: {
154171
return new Response('Hello World!');

packages/webapp/src/services/index.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,3 +150,29 @@ export function subscribe<Result, InputVar = Record<string, any>>(
150150
},
151151
};
152152
}
153+
154+
export async function* eventStream(input: string | URL | Request, init?: RequestInit) {
155+
const res = await fetch(input, init);
156+
if (!res.body) throw new Error('invalid method');
157+
158+
const reader = res.body.getReader();
159+
const decoder = new TextDecoder();
160+
161+
let buffer = '';
162+
163+
while (true) {
164+
const { done, value } = await reader.read();
165+
if (done) return;
166+
167+
buffer += decoder.decode(value, { stream: true });
168+
const lines = buffer.split('\n');
169+
buffer = lines.pop() || '';
170+
171+
for (const line of lines) {
172+
if (!line.startsWith('data: ')) continue;
173+
const data = line.slice(6);
174+
if (data === '[DONE]') return;
175+
yield JSON.parse(data);
176+
}
177+
}
178+
}

0 commit comments

Comments
 (0)