Skip to content

Commit 3c8d1b4

Browse files
authored
Merge pull request #18961 from Budibase/fix/channel-drift
fix: RAG drift within Teams / Slack channels vs preview chat
2 parents 5c80f9b + f8d4912 commit 3c8d1b4

2 files changed

Lines changed: 330 additions & 91 deletions

File tree

packages/server/src/api/controllers/ai/chatConversations.ts

Lines changed: 78 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,10 @@ import {
2323
} from "@budibase/types"
2424
import {
2525
consumeStream,
26+
readUIMessageStream,
2627
type LanguageModelUsage,
27-
type StreamTextResult,
28-
type ToolSet,
28+
type UIMessage,
29+
type UIMessageChunk,
2930
} from "ai"
3031
import sdk from "../../../sdk"
3132
import {
@@ -270,10 +271,35 @@ const resolveChatStreamRequest = async (
270271
}
271272
}
272273

273-
export type WebhookAssistantStream = StreamTextResult<
274-
ToolSet,
275-
never
276-
>["fullStream"]
274+
export type WebhookAssistantStream = AsyncIterable<string>
275+
276+
const getAssistantMessageText = (assistantMessage?: UIMessage) =>
277+
assistantMessage?.parts
278+
?.flatMap(part => (part.type === "text" ? [part.text] : []))
279+
.join("") || ""
280+
281+
const createAssistantTextStream = async function* (
282+
stream: ReadableStream<UIMessageChunk>
283+
): AsyncGenerator<string, void, void> {
284+
let previousText = ""
285+
for await (const assistantMessage of readUIMessageStream({ stream })) {
286+
const currentText = getAssistantMessageText(assistantMessage)
287+
if (!currentText || currentText === previousText) {
288+
continue
289+
}
290+
291+
if (currentText.startsWith(previousText)) {
292+
const delta = currentText.slice(previousText.length)
293+
if (delta) {
294+
yield delta
295+
}
296+
} else {
297+
yield currentText
298+
}
299+
300+
previousText = currentText
301+
}
302+
}
277303

278304
export async function webhookChat({
279305
chat,
@@ -320,15 +346,37 @@ export async function webhookChat({
320346

321347
const result = await run.stream()
322348

323-
const streamTask = onAssistantStream
324-
? onAssistantStream(result.fullStream)
325-
: Promise.resolve()
349+
const uiMessageStream = result.toUIMessageStream({
350+
generateMessageId: v4,
351+
sendReasoning: true,
352+
})
353+
let assistantStreamForCapture: ReadableStream<UIMessageChunk> =
354+
uiMessageStream
355+
let streamTask: Promise<void> = Promise.resolve()
356+
if (onAssistantStream) {
357+
const [deliveryStream, captureStream] = uiMessageStream.tee()
358+
assistantStreamForCapture = captureStream
359+
streamTask = onAssistantStream(createAssistantTextStream(deliveryStream))
360+
}
361+
362+
const assistantMessageTask = (async () => {
363+
let assistantMessage: ChatConversation["messages"][number] | undefined
364+
for await (const message of readUIMessageStream<
365+
ChatConversation["messages"][number]
366+
>({
367+
stream: assistantStreamForCapture,
368+
})) {
369+
assistantMessage = message
370+
}
371+
return assistantMessage
372+
})()
326373

327-
const [textResult, responseResult, streamOutcome] = await Promise.allSettled([
328-
result.text,
329-
result.response,
330-
streamTask,
331-
])
374+
const [assistantMessageResult, responseResult, streamOutcome] =
375+
await Promise.allSettled([
376+
assistantMessageTask,
377+
result.response,
378+
streamTask,
379+
])
332380

333381
if (streamOutcome.status === "rejected") {
334382
console.error("Chat webhook stream delivery failed", streamOutcome.reason)
@@ -346,19 +394,19 @@ export async function webhookChat({
346394
run.sessionLogIndexer.addRequestId(requestId)
347395
await run.sessionLogIndexer.index()
348396

349-
if (textResult.status === "rejected") {
397+
if (assistantMessageResult.status === "rejected") {
350398
console.error("Agent streaming error", {
351399
agentId,
352400
chatAppId,
353401
sessionId,
354-
error: textResult.reason,
402+
error: assistantMessageResult.reason,
355403
})
356404
events.action.aiAgentFailed({
357405
agentId,
358406
reason: ActionFailureReason.ERROR,
359-
errorMessage: getErrorMessage(textResult.reason),
407+
errorMessage: getErrorMessage(assistantMessageResult.reason),
360408
})
361-
throw textResult.reason
409+
throw assistantMessageResult.reason
362410
}
363411
if (responseResult.status === "rejected") {
364412
console.error("Agent response metadata error", {
@@ -376,12 +424,18 @@ export async function webhookChat({
376424
}
377425

378426
events.action.aiAgentExecuted({ agentId })
379-
380-
const assistantText = textResult.value
427+
const ragSources = run.getUsedKnowledgeSourcesMetadata()
428+
429+
const finalAssistantMessage =
430+
assistantMessageResult.value ||
431+
({
432+
id: v4(),
433+
role: "assistant",
434+
parts: [{ type: "text", text: "" }],
435+
} satisfies ChatConversation["messages"][number])
436+
const assistantText = getAssistantMessageText(finalAssistantMessage)
381437
const assistantMessage: ChatConversation["messages"][number] = {
382-
id: v4(),
383-
role: "assistant",
384-
parts: [{ type: "text", text: assistantText || "" }],
438+
...finalAssistantMessage,
385439
}
386440

387441
return {
@@ -391,6 +445,7 @@ export async function webhookChat({
391445
allowKnowledgeSourceDownload:
392446
run.selectedOperation?.allowKnowledgeSourceDownload,
393447
title,
448+
...(ragSources?.length ? { ragSources } : {}),
394449
}
395450
}
396451

0 commit comments

Comments
 (0)