Skip to content

Commit 96cd38f

Browse files
committed
fix(cz-git): add tests and support for non-stream OpenAI chat completion responses
1 parent 9e5b3b5 commit 96cd38f

2 files changed

Lines changed: 127 additions & 15 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { Readable } from 'node:stream'
2+
import { describe, expect, it } from 'vitest'
3+
import { readChatCompletionStreamToSubjects } from '../src/shared/utils/stream'
4+
5+
function asStream(body: string): NodeJS.ReadableStream {
6+
return Readable.from([body])
7+
}
8+
9+
describe('readChatCompletionStreamToSubjects', () => {
10+
it('parses non-stream JSON when no SSE choice deltas appear', async () => {
11+
const json = JSON.stringify({
12+
choices: [{ index: 0, message: { role: 'assistant', content: 'fix login redirect' } }],
13+
})
14+
const subjects = await readChatCompletionStreamToSubjects(asStream(json), 1)
15+
expect(subjects).toEqual(['fix login redirect'])
16+
})
17+
18+
it('parses SSE data lines as before', async () => {
19+
const sse = [
20+
'data: {"choices":[{"index":0,"delta":{"content":"hello"}}]}',
21+
'',
22+
'data: {"choices":[{"index":0,"delta":{"content":" world"}}]}',
23+
'',
24+
'data: [DONE]',
25+
'',
26+
].join('\n')
27+
const subjects = await readChatCompletionStreamToSubjects(asStream(sse), 1)
28+
expect(subjects).toEqual(['hello world'])
29+
})
30+
31+
it('throws when body has neither SSE choices nor non-stream completion', async () => {
32+
await expect(readChatCompletionStreamToSubjects(asStream('not json'), 1)).rejects.toThrow(
33+
/no streamed choice deltas/,
34+
)
35+
})
36+
37+
it('does not return a single empty subject when stream is empty', async () => {
38+
await expect(readChatCompletionStreamToSubjects(asStream(''), 1)).rejects.toThrow()
39+
})
40+
})

packages/cz-git/src/shared/utils/stream.ts

Lines changed: 87 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
* @license MIT
55
*/
66

7-
import readline from 'node:readline'
7+
import { Buffer } from 'node:buffer'
88
import { Readable } from 'node:stream'
99
import type { ReadableStream as WebReadableStream } from 'node:stream/web'
1010

1111
/**
12-
* Normalize `fetch` response body to a Node.js readable stream for `readline`.
12+
* Normalize `fetch` response body to a Node.js readable stream.
1313
*/
1414
export function bodyToNodeReadable(body: unknown): NodeJS.ReadableStream {
1515
if (body == null)
@@ -47,24 +47,68 @@ export function appendVisibleDelta(acc: string, delta: { content?: unknown } | u
4747

4848
interface StreamChoiceChunk { index?: number, delta?: { content?: unknown } }
4949

50+
interface NonStreamChoice { index?: number, message?: { content?: unknown } }
51+
52+
async function readableToUtf8String(stream: NodeJS.ReadableStream): Promise<string> {
53+
const chunks: Buffer[] = []
54+
for await (const chunk of stream as AsyncIterable<string | Buffer>) {
55+
if (Buffer.isBuffer(chunk))
56+
chunks.push(chunk)
57+
else if (typeof chunk === 'string')
58+
chunks.push(Buffer.from(chunk))
59+
else
60+
chunks.push(Buffer.from(String(chunk)))
61+
}
62+
return Buffer.concat(chunks as readonly Uint8Array[]).toString('utf8')
63+
}
64+
5065
/**
51-
* Read an SSE stream and return one finished string per completion choice.
52-
* Buckets by `choices[].index` up to `choiceCount` (requested `n`).
53-
* Returned length matches how many indices actually appeared in the stream (capped by `choiceCount`),
54-
* mirroring non-stream `json.choices.length` when the provider returns fewer parallel completions.
66+
* Parse a non-streaming `chat/completions` JSON body when `stream: true` was ignored.
67+
* @returns subjects slice, or `undefined` if the body is not a usable completion object.
5568
*/
56-
export async function readChatCompletionStreamToSubjects(
57-
input: NodeJS.ReadableStream,
69+
function trySubjectsFromNonStreamCompletionJson(
70+
body: string,
5871
choiceCount: number,
59-
): Promise<string[]> {
60-
if (choiceCount < 1)
61-
throw new Error('choiceCount must be at least 1')
72+
): string[] | undefined {
73+
const t = body.trim()
74+
if (!t.startsWith('{'))
75+
return undefined
76+
let json: unknown
77+
try {
78+
json = JSON.parse(t)
79+
}
80+
catch {
81+
return undefined
82+
}
83+
if (!json || typeof json !== 'object')
84+
return undefined
85+
const o = json as { choices?: NonStreamChoice[], error?: { message?: string } }
86+
if (o.error)
87+
throw new Error(o.error.message || 'OpenAI API error')
88+
if (!Array.isArray(o.choices))
89+
return undefined
6290

6391
const buffers = Array.from({ length: choiceCount }, () => '')
6492
let maxIndexSeen = -1
65-
const rl = readline.createInterface({ input, crlfDelay: Infinity })
93+
for (const ch of o.choices) {
94+
const idx = typeof ch.index === 'number' ? ch.index : 0
95+
if (idx >= 0 && idx < choiceCount) {
96+
buffers[idx] = appendVisibleDelta('', { content: ch.message?.content })
97+
maxIndexSeen = Math.max(maxIndexSeen, idx)
98+
}
99+
}
100+
if (maxIndexSeen < 0)
101+
return undefined
102+
return buffers.slice(0, maxIndexSeen + 1)
103+
}
66104

67-
for await (const line of rl) {
105+
function collectSubjectsFromSseLines(
106+
body: string,
107+
choiceCount: number,
108+
): { buffers: string[], maxIndexSeen: number } {
109+
const buffers = Array.from({ length: choiceCount }, () => '')
110+
let maxIndexSeen = -1
111+
for (const line of body.split(/\r?\n/)) {
68112
const trimmed = line.trim()
69113
if (!trimmed.startsWith('data:'))
70114
continue
@@ -93,7 +137,35 @@ export async function readChatCompletionStreamToSubjects(
93137
throw e
94138
}
95139
}
140+
return { buffers, maxIndexSeen }
141+
}
142+
143+
/**
144+
* Read an OpenAI-style `chat/completions` response body and return one finished string per choice.
145+
* Primary path: SSE lines (`data: {...}`) with `choices[].delta`, bucketed by `choices[].index`
146+
* up to `choiceCount` (requested `n`). Returned length is `maxSeenIndex + 1` (capped by `choiceCount`),
147+
* mirroring non-stream `choices.length` when fewer parallel completions appear.
148+
* Fallback: if no choice index ever appears (e.g. provider ignores `stream: true` and returns one JSON object),
149+
* the full body is parsed as a non-streaming completion using `choices[].message.content`.
150+
*/
151+
export async function readChatCompletionStreamToSubjects(
152+
input: NodeJS.ReadableStream,
153+
choiceCount: number,
154+
): Promise<string[]> {
155+
if (choiceCount < 1)
156+
throw new Error('choiceCount must be at least 1')
157+
158+
const body = await readableToUtf8String(input)
159+
const { buffers, maxIndexSeen } = collectSubjectsFromSseLines(body, choiceCount)
160+
161+
if (maxIndexSeen >= 0)
162+
return buffers.slice(0, maxIndexSeen + 1)
163+
164+
const fromJson = trySubjectsFromNonStreamCompletionJson(body, choiceCount)
165+
if (fromJson !== undefined)
166+
return fromJson
96167

97-
const effectiveLen = maxIndexSeen < 0 ? 1 : maxIndexSeen + 1
98-
return buffers.slice(0, effectiveLen)
168+
throw new Error(
169+
'Chat completions response had no streamed choice deltas and is not a parseable non-streaming JSON body with choices (or choices were empty).',
170+
)
99171
}

0 commit comments

Comments
 (0)