Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dist/build/static/js/bundle.min.js

Large diffs are not rendered by default.

204 changes: 134 additions & 70 deletions src/chatWidget/chatWindow/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import React, { useEffect, useMemo, useRef, useState } from "react";
import { ChatMessageType } from "../../types/chatWidget";
import ChatMessage from "./chatMessage";
import { sendMessage } from "../../controllers";
import { sendMessageStreaming } from "../../controllers/streamMessage";
import ChatMessagePlaceholder from "../../chatPlaceholder";

export default function ChatWindow({
Expand Down Expand Up @@ -37,7 +38,8 @@ export default function ChatWindow({
height = 650,
tweaks,
sessionId,
additional_headers
additional_headers,
stream
}: {
api_key?: string;
output_type: string,
Expand Down Expand Up @@ -70,6 +72,7 @@ export default function ChatWindow({
height?: number;
sessionId: React.MutableRefObject<string>;
additional_headers?: { [key: string]: string } | string;
stream?: boolean;

}) {
const [value, setValue] = useState<string>("");
Expand All @@ -92,87 +95,148 @@ export default function ChatWindow({
/* Initial listener for loss of focus that refocuses User input after a small delay */

const [sendingMessage, setSendingMessage] = useState(false);
const [isStreaming, setIsStreaming] = useState(false);
const abortRef = useRef<AbortController | null>(null);

useEffect(() => {
return () => { abortRef.current?.abort(); };
}, []);

// Ensure additional_headers is always an object
const parsedHeaders = useMemo(() => parseAdditionalHeaders(additional_headers), [additional_headers]);

function handleClick() {
if (value && value.trim() !== "") {
addMessage({ message: value, isSend: true });
const currentValue = value;
addMessage({ message: currentValue, isSend: true });
setSendingMessage(true);
setValue("");
sendMessage(hostUrl, flowId, value, input_type, output_type, sessionId, output_component, tweaks, api_key, parsedHeaders)
.then((res) => {
if (
res.data &&
res.data.outputs &&
Object.keys(res.data.outputs).length > 0 &&
res.data.outputs[0].outputs && res.data.outputs[0].outputs.length > 0
) {
const flowOutputs: Array<any> = res.data.outputs[0].outputs;
if (output_component &&
flowOutputs.map(e => e.component_id).includes(output_component)) {
Object.values(flowOutputs.find(e => e.component_id === output_component).outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
});
})
} else if (
flowOutputs.length === 1

if (stream) {
// Streaming path
abortRef.current?.abort();
const controller = new AbortController();
abortRef.current = controller;

addMessage({ message: "", isSend: false });
setIsStreaming(true);
const accumulated = { current: "" };
let rafId: number | null = null;

const updateMessage = () => {
updateLastMessage({ message: accumulated.current, isSend: false });
rafId = null;
};

sendMessageStreaming(
hostUrl, flowId, currentValue, input_type, output_type,
sessionId.current,
{
onToken: (chunk) => {
accumulated.current += chunk;
if (!rafId) rafId = requestAnimationFrame(updateMessage);
},
onEnd: (data) => {
if (rafId) cancelAnimationFrame(rafId);
// Final update with accumulated text
if (accumulated.current) {
updateLastMessage({ message: accumulated.current, isSend: false });
}
// Capture session_id from end event result
if (data && data.session_id) {
sessionId.current = data.session_id;
}
setIsStreaming(false);
setSendingMessage(false);
},
onError: (error) => {
if (rafId) cancelAnimationFrame(rafId);
updateLastMessage({
message: accumulated.current || error,
isSend: false,
error: true,
});
setIsStreaming(false);
setSendingMessage(false);
},
},
output_component, tweaks, api_key, parsedHeaders as { [key: string]: string } | undefined,
controller.signal,
);
} else {
// Non-streaming path (existing behavior)
sendMessage(hostUrl, flowId, currentValue, input_type, output_type, sessionId, output_component, tweaks, api_key, parsedHeaders)
.then((res) => {
if (
res.data &&
res.data.outputs &&
Object.keys(res.data.outputs).length > 0 &&
res.data.outputs[0].outputs && res.data.outputs[0].outputs.length > 0
) {
Object.values(flowOutputs[0].outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
});
})
} else {
flowOutputs
.sort((a, b) => {
// Get the earliest timestamp from each flowOutput's outputs
const aTimestamp = Math.min(...Object.values(a.outputs).map((output: any) => Date.parse(output.message?.timestamp)));
const bTimestamp = Math.min(...Object.values(b.outputs).map((output: any) => Date.parse(output.message?.timestamp)));
return aTimestamp - bTimestamp; // Sort descending (newest first)
const flowOutputs: Array<any> = res.data.outputs[0].outputs;
if (output_component &&
flowOutputs.map(e => e.component_id).includes(output_component)) {
Object.values(flowOutputs.find(e => e.component_id === output_component).outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
});
})
} else if (
flowOutputs.length === 1
) {
Object.values(flowOutputs[0].outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
});
})
.forEach((flowOutput) => {
Object.values(flowOutput.outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
} else {
flowOutputs
.sort((a, b) => {
const aTimestamp = Math.min(...Object.values(a.outputs).map((output: any) => Date.parse(output.message?.timestamp)));
const bTimestamp = Math.min(...Object.values(b.outputs).map((output: any) => Date.parse(output.message?.timestamp)));
return aTimestamp - bTimestamp;
})
.forEach((flowOutput) => {
Object.values(flowOutput.outputs).forEach((output: any) => {
addMessage({
message: extractMessageFromOutput(output),
isSend: false,
});
});
});
});
}
}
if (res.data && res.data.session_id) {
sessionId.current = res.data.session_id;
}
}
if (res.data && res.data.session_id) {
sessionId.current = res.data.session_id;
}
setSendingMessage(false);
})
.catch((err) => {
const response = err.response;
if (err.code === "ERR_NETWORK") {
updateLastMessage({
message: "Network error",
isSend: false,
error: true,
});
} else if (
response &&
response.status === 500 &&
response.data &&
response.data.detail
) {
updateLastMessage({
message: response.data.detail,
isSend: false,
error: true,
});
}
console.error(err);
setSendingMessage(false);
});
setSendingMessage(false);
})
.catch((err) => {
const response = err.response;
if (err.code === "ERR_NETWORK") {
updateLastMessage({
message: "Network error",
isSend: false,
error: true,
});
} else if (
response &&
response.status === 500 &&
response.data &&
response.data.detail
) {
updateLastMessage({
message: response.data.detail,
isSend: false,
error: true,
});
}
console.error(err);
setSendingMessage(false);
});
}
}
}

Expand Down Expand Up @@ -232,7 +296,7 @@ export default function ChatWindow({
error={message.error}
/>
))}
{sendingMessage && (
{sendingMessage && !isStreaming && (
<ChatMessagePlaceholder bot_message_style={bot_message_style} />
)}
<div ref={lastMessage}></div>
Expand Down
3 changes: 3 additions & 0 deletions src/chatWidget/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export default function ChatWidget({
input_container_style,
additional_headers,
session_id,
stream,
start_open=false,
}: {
api_key?: string;
Expand Down Expand Up @@ -63,6 +64,7 @@ export default function ChatWidget({
tweaks?: { [key: string]: any };
additional_headers?: { [key: string]: string } | string;
session_id?: string;
stream?: boolean;
start_open?: boolean;
}) {
const [open, setOpen] = useState(start_open);
Expand Down Expand Up @@ -2184,6 +2186,7 @@ input::-ms-input-placeholder { /* Microsoft Edge */
position={chat_position}
sessionId={sessionId}
additional_headers={parsedAdditionalHeaders}
stream={stream}
/>
</div>
);
Expand Down
105 changes: 105 additions & 0 deletions src/controllers/streamMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
export interface StreamCallbacks {
onToken: (chunk: string) => void;
onEnd: (data: any) => void;
onError: (error: string) => void;
}

export async function sendMessageStreaming(
baseUrl: string,
flowId: string,
message: string,
input_type: string,
output_type: string,
sessionId: string,
callbacks: StreamCallbacks,
output_component?: string,
tweaks?: Object,
api_key?: string,
additional_headers?: { [key: string]: string },
signal?: AbortSignal,
): Promise<void> {
const data: any = { input_type, input_value: message, output_type };
if (tweaks) data.tweaks = tweaks;
if (output_component) data.output_component = output_component;
if (sessionId) data.session_id = sessionId;

const headers: { [key: string]: string } = { "Content-Type": "application/json" };
if (api_key) headers["x-api-key"] = api_key;
if (additional_headers) {
Object.keys(additional_headers).forEach((key) => {
headers[key] = String(additional_headers[key]);
});
}

let endOrErrorCalled = false;

try {
const response = await fetch(
`${baseUrl}/api/v1/run/${flowId}?stream=true`,
{ method: "POST", headers, body: JSON.stringify(data), signal }
);

if (!response.ok) {
const errorText = await response.text();
endOrErrorCalled = true;
callbacks.onError(errorText || `HTTP ${response.status}`);
return;
}

if (!response.body) {
endOrErrorCalled = true;
callbacks.onEnd(null);
return;
}

const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer: string[] = [];

while (true) {
const { done, value } = await reader.read();
if (done) break;

const chunk = decoder.decode(value, { stream: true });
const parts = chunk.split("\n\n");

for (const part of parts) {
if (!part.trim()) continue;

const accumulated = buffer.join("") + part;
if (accumulated.endsWith("}")) {
try {
const parsed = JSON.parse(accumulated);
buffer = [];

if (parsed.chunk !== undefined) {
callbacks.onToken(parsed.chunk);
} else if (parsed.result !== undefined) {
endOrErrorCalled = true;
callbacks.onEnd(parsed.result);
} else if (parsed.error !== undefined) {
endOrErrorCalled = true;
callbacks.onError(typeof parsed.error === "string" ? parsed.error : JSON.stringify(parsed.error));
}
} catch {
buffer.push(part);
}
} else {
buffer.push(part);
}
}
}
} catch (err: any) {
if (err.name === "AbortError") {
return;
}
endOrErrorCalled = true;
callbacks.onError(err.message || "Stream connection failed");
return;
}

// Stream completion safety: ensure widget always exits streaming state
if (!endOrErrorCalled) {
callbacks.onEnd(null);
}
}
1 change: 1 addition & 0 deletions src/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ customElements.define('langflow-chat', r2wc(ChatWidget, {
input_container_style:"json",
chat_position:"string",
additional_headers:"json",
stream:"boolean",
},
}));