Skip to content

Commit 459c0a6

Browse files
authored
Merge pull request #10 from BrainbaseHQ/abhinav/bra2-997-chat-embed-streaming
Handle chunk SSE events for token-by-token streaming
2 parents 330c9fe + 585603a commit 459c0a6

3 files changed

Lines changed: 74 additions & 37 deletions

File tree

src/api/client.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import type {
44
SendMessageParams,
55
} from '../types';
66

7-
export const DEFAULT_ENGINE_URL = 'https://chat-embed-deployment.onrender.com';
8-
// export const DEFAULT_ENGINE_URL = 'http://localhost:8003';
7+
//export const DEFAULT_ENGINE_URL = 'https://chat-embed-deployment.onrender.com';
8+
export const DEFAULT_ENGINE_URL = 'http://localhost:8003';
99

1010
export function createAPIClient(
1111
engineBaseUrl: string = DEFAULT_ENGINE_URL

src/hooks/useChat.ts

Lines changed: 71 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ export function useChat(options: UseChatOptions): UseChatReturn {
5555

5656
const sessionStartTime = useRef<number>(0);
5757
const isInitialized = useRef(false);
58+
const streamBuffers = useRef<Record<string, string>>({});
5859

5960
// Initialize or restore session
6061
useEffect(() => {
@@ -99,6 +100,31 @@ export function useChat(options: UseChatOptions): UseChatReturn {
99100
return '';
100101
}, [config.embedId]);
101102

103+
const upsertAssistantMessage = useCallback((messageId: string, content: string) => {
104+
setMessages((prev) => {
105+
let found = false;
106+
const next = prev.map((m) => {
107+
if (m.id === messageId) {
108+
found = true;
109+
return { ...m, content, status: 'streaming' as const };
110+
}
111+
return m;
112+
});
113+
114+
if (!found) {
115+
next.push({
116+
id: messageId,
117+
role: 'assistant',
118+
content,
119+
timestamp: Date.now(),
120+
status: 'streaming',
121+
});
122+
}
123+
124+
return next;
125+
});
126+
}, []);
127+
102128
const handleSSEEvent = useCallback(
103129
(event: SSEEvent, messageId: string, updateSessionId: (id: string) => void) => {
104130
switch (event.type) {
@@ -115,16 +141,22 @@ export function useChat(options: UseChatOptions): UseChatReturn {
115141
break;
116142
}
117143
case 'message': {
118-
// Agent message
144+
// Agent message (complete message, e.g., from say())
119145
const data = event.data as { content: string; role?: string };
120146
if (data.content) {
121-
setMessages((prev) =>
122-
prev.map((m) =>
123-
m.id === messageId
124-
? { ...m, content: data.content, status: 'streaming' as const }
125-
: m
126-
)
127-
);
147+
streamBuffers.current[messageId] = data.content;
148+
upsertAssistantMessage(messageId, data.content);
149+
}
150+
break;
151+
}
152+
case 'chunk': {
153+
// Streaming chunk - append to existing message content
154+
const data = event.data as { content: string };
155+
if (data.content) {
156+
const current = streamBuffers.current[messageId] ?? '';
157+
const next = current + data.content;
158+
streamBuffers.current[messageId] = next;
159+
upsertAssistantMessage(messageId, next);
128160
}
129161
break;
130162
}
@@ -171,13 +203,9 @@ export function useChat(options: UseChatOptions): UseChatReturn {
171203

172204
// If tool call has content, also update the message
173205
if (data.content) {
174-
setMessages((prev) =>
175-
prev.map((m) =>
176-
m.id === messageId
177-
? { ...m, content: data.content as string, status: 'streaming' as const }
178-
: m
179-
)
180-
);
206+
const content = data.content as string;
207+
streamBuffers.current[messageId] = content;
208+
upsertAssistantMessage(messageId, content);
181209
}
182210
break;
183211
}
@@ -190,26 +218,31 @@ export function useChat(options: UseChatOptions): UseChatReturn {
190218
setMessages((prev) =>
191219
prev.map((m) => (m.id === messageId ? { ...m, status: 'sent' as const } : m))
192220
);
221+
delete streamBuffers.current[messageId];
193222
break;
194223
}
195224
case 'completed': {
196225
// Conversation ended by agent
197-
setMessages((prev) =>
198-
prev.map((m) => (m.id === messageId ? { ...m, status: 'sent' as const } : m))
199-
);
200-
// Mark session as completed
201-
if (sessionId) {
202-
storeSession(config.embedId, {
203-
sessionId,
204-
deploymentId: config.deploymentId,
205-
workerId: config.workerId,
206-
flowId: config.flowId,
207-
startTime: sessionStartTime.current,
208-
messages,
209-
toolCalls,
210-
status: 'completed',
211-
});
212-
}
226+
setMessages((prev) => {
227+
const updatedMessages = prev.map((m) =>
228+
m.id === messageId ? { ...m, status: 'sent' as const } : m
229+
);
230+
// Mark session as completed with current messages
231+
if (sessionId) {
232+
storeSession(config.embedId, {
233+
sessionId,
234+
deploymentId: config.deploymentId,
235+
workerId: config.workerId,
236+
flowId: config.flowId,
237+
startTime: sessionStartTime.current,
238+
messages: updatedMessages,
239+
toolCalls: [], // Tool calls stored separately
240+
status: 'completed',
241+
});
242+
}
243+
return updatedMessages;
244+
});
245+
delete streamBuffers.current[messageId];
213246
break;
214247
}
215248
case 'error': {
@@ -229,7 +262,7 @@ export function useChat(options: UseChatOptions): UseChatReturn {
229262
}
230263
}
231264
},
232-
[config, sessionId, messages, toolCalls, onSessionStart]
265+
[config, sessionId, onSessionStart, upsertAssistantMessage]
233266
);
234267

235268
const processSSEStream = useCallback(
@@ -246,9 +279,12 @@ export function useChat(options: UseChatOptions): UseChatReturn {
246279
buffer += decoder.decode(value, { stream: true });
247280

248281
// SSE messages are separated by double newlines
249-
while (buffer.includes('\n\n')) {
250-
const [message, rest] = buffer.split('\n\n', 2);
251-
buffer = rest;
282+
// Note: We use indexOf instead of split with limit because split(str, 2)
283+
// only returns 2 elements and discards the rest, causing dropped chunks
284+
let idx: number;
285+
while ((idx = buffer.indexOf('\n\n')) !== -1) {
286+
const message = buffer.slice(0, idx);
287+
buffer = buffer.slice(idx + 2);
252288

253289
// Parse SSE data lines
254290
for (const line of message.split('\n')) {

src/types/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ export interface ToolCall {
3636
export type SSEEventType =
3737
| 'session' // Session info (session_id, is_new)
3838
| 'message' // Agent text response (content, role)
39+
| 'chunk' // Streaming chunk (token-by-token content)
3940
| 'tool_call' // Tool/function execution
4041
| 'waiting' // Agent is processing
4142
| 'done' // Stream complete

0 commit comments

Comments
 (0)