
Commit 6d1f5de

aadereiko authored

[OPIK-3515] [FE]: fix playground streaming with responses of unfinished lines; (#4488)

* [OPIK-3515]: fix playground streaming with responses of unfinished lines;
* fix eslint issues;

Co-authored-by: aadereiko <[email protected]>
1 parent ff9b730 commit 6d1f5de

2 files changed: +265 -1 lines changed
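In short, the commit adds a pure helper, processSSEChunk, to useCompletionProxyStreaming.ts: any trailing partial line left over from one network chunk is kept in a buffer and prepended to the next chunk, so SSE data lines that get split mid-JSON are reassembled instead of being parsed as fragments. Below is a minimal sketch (not part of the commit) of how the helper behaves, mirroring the new tests; the import path assumes a file sitting alongside useCompletionProxyStreaming.ts.

import { processSSEChunk } from "./useCompletionProxyStreaming";

// One SSE data line is split by the network across two chunks.
const first = processSSEChunk(
  'data: {"choices":[{"delta":{"content":"hello"}}]}\ndata: {"choi',
  "", // no leftover from a previous chunk yet
);
// first.lines     -> ['data: {"choices":[{"delta":{"content":"hello"}}]}']
// first.newBuffer -> 'data: {"choi'  (incomplete, held back)

const second = processSSEChunk(
  'ces":[{"delta":{"content":","}}]}\n',
  first.newBuffer, // the buffered fragment is prepended before splitting
);
// second.lines     -> ['data: {"choices":[{"delta":{"content":","}}]}']
// second.newBuffer -> ""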
Lines changed: 236 additions & 0 deletions
@@ -0,0 +1,236 @@
import { describe, expect, it } from "vitest";
import { processSSEChunk } from "./useCompletionProxyStreaming";

describe("processSSEChunk", () => {
  describe("basic line processing", () => {
    it("should process a single complete line", () => {
      const result = processSSEChunk(
        'data: {"choices":[{"delta":{"content":"hello"}}]}\n',
        "",
      );
      expect(result.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"hello"}}]}',
      ]);
      expect(result.newBuffer).toBe("");
    });

    it("should process multiple complete lines", () => {
      const result = processSSEChunk(
        'data: {"choices":[{"delta":{"content":"hello"}}]}\ndata: {"choices":[{"delta":{"content":" world"}}]}\n',
        "",
      );
      expect(result.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"hello"}}]}',
        'data: {"choices":[{"delta":{"content":" world"}}]}',
      ]);
      expect(result.newBuffer).toBe("");
    });

    it("should skip empty lines", () => {
      const result = processSSEChunk(
        'data: {"content":"a"}\n\n\ndata: {"content":"b"}\n',
        "",
      );
      expect(result.lines).toEqual([
        'data: {"content":"a"}',
        'data: {"content":"b"}',
      ]);
      expect(result.newBuffer).toBe("");
    });
  });

  describe("buffering incomplete lines", () => {
    it("should buffer an incomplete line (no trailing newline)", () => {
      const result = processSSEChunk(
        'data: {"choices":[{"delta":{"content":"hello"}}]}\ndata: {"choi',
        "",
      );
      expect(result.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"hello"}}]}',
      ]);
      expect(result.newBuffer).toBe('data: {"choi');
    });

    it("should prepend buffer to next chunk and complete the line", () => {
      // First chunk ends with incomplete line
      const result1 = processSSEChunk(
        'data: {"choices":[{"delta":{"content":"hello"}}]}\ndata: {"choi',
        "",
      );
      expect(result1.newBuffer).toBe('data: {"choi');

      // Second chunk completes the line
      const result2 = processSSEChunk(
        'ces":[{"delta":{"content":","}}]}\n',
        result1.newBuffer,
      );
      expect(result2.lines).toEqual([
        'data: {"choices":[{"delta":{"content":","}}]}',
      ]);
      expect(result2.newBuffer).toBe("");
    });

    it("should handle line split across multiple chunks", () => {
      // Simulate a JSON line split across 3 chunks
      const result1 = processSSEChunk('data: {"choices":[{"del', "");
      expect(result1.lines).toEqual([]);
      expect(result1.newBuffer).toBe('data: {"choices":[{"del');

      const result2 = processSSEChunk(
        'ta":{"content":"test',
        result1.newBuffer,
      );
      expect(result2.lines).toEqual([]);
      expect(result2.newBuffer).toBe(
        'data: {"choices":[{"delta":{"content":"test',
      );

      const result3 = processSSEChunk('"}}]}\n', result2.newBuffer);
      expect(result3.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"test"}}]}',
      ]);
      expect(result3.newBuffer).toBe("");
    });
  });

  describe("edge cases", () => {
    it("should handle empty chunk with buffer", () => {
      const result = processSSEChunk("", 'data: {"partial":');
      expect(result.lines).toEqual([]);
      expect(result.newBuffer).toBe('data: {"partial":');
    });

    it("should handle chunk that is only newlines", () => {
      const result = processSSEChunk("\n\n\n", "");
      expect(result.lines).toEqual([]);
      expect(result.newBuffer).toBe("");
    });

    it("should handle buffer that becomes complete with newline chunk", () => {
      const result = processSSEChunk(
        "\n",
        'data: {"choices":[{"delta":{"content":"x"}}]}',
      );
      expect(result.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"x"}}]}',
      ]);
      expect(result.newBuffer).toBe("");
    });

    it("should handle whitespace-only lines (should be filtered)", () => {
      const result = processSSEChunk(" \n\t\n", "");
      expect(result.lines).toEqual([]);
      expect(result.newBuffer).toBe("");
    });

    it("should handle chunk ending exactly at newline", () => {
      const result = processSSEChunk('data: {"content":"a"}\n', "");
      expect(result.lines).toEqual(['data: {"content":"a"}']);
      expect(result.newBuffer).toBe("");
    });
  });

  describe("real-world scenarios", () => {
    it("should preserve content when SSE message is split mid-JSON", () => {
      const chunk1 =
        'data: {"choices":[{"delta":{"content":"hello"}}]}\ndata: {"choi';
      const chunk2 = 'ces":[{"delta":{"content":","}}]}\n';

      const result1 = processSSEChunk(chunk1, "");
      expect(result1.lines).toEqual([
        'data: {"choices":[{"delta":{"content":"hello"}}]}',
      ]);
      expect(result1.newBuffer).toBe('data: {"choi');

      const result2 = processSSEChunk(chunk2, result1.newBuffer);
      expect(result2.lines).toEqual([
        'data: {"choices":[{"delta":{"content":","}}]}',
      ]);
      expect(result2.newBuffer).toBe("");
    });

    it("should handle streaming with usage info at the end", () => {
      const chunks = [
        'data: {"choices":[{"delta":{"content":"Hi"}}]}\ndata: {"choi',
        'ces":[{"delta":{"content":"!"}}]}\n',
        'data: {"choices":[],"usage":{"prompt_tokens":10,"completion_tokens":2}}\n',
      ];

      let buffer = "";
      const allLines: string[] = [];

      for (const chunk of chunks) {
        const result = processSSEChunk(chunk, buffer);
        allLines.push(...result.lines);
        buffer = result.newBuffer;
      }

      expect(allLines).toEqual([
        'data: {"choices":[{"delta":{"content":"Hi"}}]}',
        'data: {"choices":[{"delta":{"content":"!"}}]}',
        'data: {"choices":[],"usage":{"prompt_tokens":10,"completion_tokens":2}}',
      ]);
      expect(buffer).toBe("");
    });

    it("should handle [DONE] signal", () => {
      const result = processSSEChunk("data: [DONE]\n", "");
      expect(result.lines).toEqual(["data: [DONE]"]);
      expect(result.newBuffer).toBe("");
    });

    it("should preserve comma in JSON schema output - qwen model scenario", () => {
      const chunks = [
        'data: {"choices":[{"delta":{"content":"[\\"wedding surprise reactions|0.86\\""}}]}\n',
        'data: {"choices":[{"delta":{"content":","}}]}\ndata: {"ch',
        'oices":[{"delta":{"content":" \\"beach sunset|0.92\\"]"}}]}\n',
      ];

      let buffer = "";
      const allLines: string[] = [];

      for (const chunk of chunks) {
        const result = processSSEChunk(chunk, buffer);
        allLines.push(...result.lines);
        buffer = result.newBuffer;
      }

      expect(allLines).toHaveLength(3);
      expect(allLines[0]).toContain("wedding surprise reactions|0.86");
      expect(allLines[1]).toContain(",");
      expect(allLines[2]).toContain("beach sunset|0.92");
      expect(buffer).toBe("");
    });

    it("should handle JSON with pipe-separated values split across many chunks", () => {
      const chunks = [
        'data: {"choices":[{"delta":{"content":"wedding surprise reactions|0.86"}}]}\n',
        'data: {"choices":[{"delta":{"content":","}}]}\ndata: {"choices":[{"del',
        'ta":{"content":" beach|0.75"}}]}\n',
      ];

      let buffer = "";
      const allLines: string[] = [];

      for (const chunk of chunks) {
        const result = processSSEChunk(chunk, buffer);
        allLines.push(...result.lines);
        buffer = result.newBuffer;
      }

      expect(allLines).toHaveLength(3);
      expect(allLines[0]).toContain("wedding surprise reactions|0.86");
      expect(allLines[1]).toContain(",");
      expect(allLines[2]).toContain("beach|0.75");
      expect(buffer).toBe("");

      const contentParts = allLines.map((line) => {
        const match = line.match(/"content":"([^"]*)"/);
        return match ? match[1] : "";
      });
      const fullContent = contentParts.join("");

      expect(fullContent).toBe("wedding surprise reactions|0.86, beach|0.75");
    });
  });
});

apps/opik-frontend/src/api/playground/useCompletionProxyStreaming.ts

Lines changed: 29 additions & 1 deletion
@@ -18,6 +18,30 @@ import { ProviderMessageType } from "@/types/llm";
 
 const DATA_PREFIX = "data:";
 
+/**
+ * Processes SSE chunk data with buffering for incomplete lines.
+ * SSE messages can be split across network chunks, so we buffer incomplete
+ * lines and only process complete lines (those ending with newline).
+ */
+export const processSSEChunk = (
+  chunk: string,
+  buffer: string,
+): { lines: string[]; newBuffer: string } => {
+  const data = buffer + chunk;
+  const lines = data.split("\n");
+
+  // if the data doesn't end with newline, the last element is incomplete
+  // save it for the next iteration
+  let newBuffer = "";
+  if (!data.endsWith("\n")) {
+    newBuffer = lines.pop() || "";
+  }
+
+  const completeLines = lines.filter((line) => line.trim() !== "");
+
+  return { lines: completeLines, newBuffer };
+};
+
 const getNowUtcTimeISOString = (): string => {
   return dayjs().utc().toISOString();
 };

@@ -192,6 +216,9 @@ const useCompletionProxyStreaming = ({
      }
    };
 
+    // buffer to hold incomplete lines across chunks
+    let lineBuffer = "";
+
    // an analogue of true && reader
    // we need it to wait till the stream is closed
    while (reader) {

@@ -202,7 +229,8 @@
      }
 
      const chunk = decoder.decode(value, { stream: true });
-      const lines = chunk.split("\n").filter((line) => line.trim() !== "");
+      const { lines, newBuffer } = processSSEChunk(chunk, lineBuffer);
+      lineBuffer = newBuffer;
 
      for (const line of lines) {
        const JSONData = line.startsWith(DATA_PREFIX)
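For contrast, here is a brief sketch (not from the commit) of what the removed line did with the same kind of input: splitting each chunk in isolation turns a data line cut by the network into two fragments, and neither fragment is a complete "data:" line, which is how streamed content such as a lone "," delta could go missing before this fix (the scenario the "qwen model" test above covers). The variable names are illustrative only.

// Old per-chunk splitting, as in the removed line above (illustrative input).
const tail = 'data: {"choi'; // end of one network chunk
const head = 'ces":[{"delta":{"content":","}}]}\n'; // start of the next chunk

const oldLines = [
  ...tail.split("\n").filter((line) => line.trim() !== ""),
  ...head.split("\n").filter((line) => line.trim() !== ""),
];
// oldLines is ['data: {"choi', 'ces":[{"delta":{"content":","}}]}'] — the JSON
// payload is spread across two entries, so neither parses and the "," is lost.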

0 commit comments
