Skip to content

Commit b63f89d

Browse files
committed
first stream test passing
1 parent 1770b0b commit b63f89d

File tree

5 files changed

+234
-13
lines changed

5 files changed

+234
-13
lines changed

lib/instrumentation/openai.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,9 @@ function instrumentStream({ agent, shim, request, response, segment, transaction
285285
err = streamErr
286286
throw err
287287
} finally {
288-
if (chunk.choices) {
288+
if (chunk?.choices) {
289289
chunk.choices[0].message = { role, content }
290-
} else if (chunk.response) {
290+
} else if (chunk?.response) {
291291
chunk = chunk.response
292292
}
293293

test/versioned/openai/chat-completions-v5.test.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,4 +296,35 @@ test('responses.create', async (t) => {
296296
end()
297297
})
298298
})
299+
300+
// Verifies that a streamed responses.create call is instrumented: response
// headers are stripped from the user-visible result, the final chunk carries
// the assembled assistant message, and the expected completion + external
// segments are recorded on the transaction trace.
await t.test('should create span on successful responses stream create', (t, end) => {
  const { client, agent, host, port } = t.nr
  helper.runInTransaction(agent, async (tx) => {
    const content = 'Streamed response'
    const stream = await client.responses.create({
      stream: true,
      input: content,
      model: 'gpt-4'
    })

    // Drain the stream; after iteration `chunk` holds the last event, which
    // the instrumentation's finally-block rewrites to the completed response.
    let chunk = {}
    for await (chunk of stream) {
      continue
    }
    assert.equal(chunk.headers, undefined, 'should remove response headers from user result')
    assert.equal(chunk.response.output[0].role, 'assistant')
    const expectedRes = responses.get(content)
    assert.equal(chunk.response.output[0].content[0].text, expectedRes.body.response.output[0].content[0].text)

    // exact: false — other segments may interleave with the two we require.
    assertSegments(
      tx.trace,
      tx.trace.root,
      [OPENAI.COMPLETION, `External/${host}:${port}/responses`],
      { exact: false }
    )

    tx.end()
    end()
  })
})
299330
})

test/versioned/openai/mock-responses-api-responses.js

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -118,24 +118,22 @@ responses.set('Streamed response', {
118118
code: 200,
119119
body: {
120120
type: 'response.completed',
121-
sequence_number: 100,
121+
sequence_number: 9,
122122
response: {
123-
id: 'resp_68420d9a5d4481a1bff5b86663299e3403b76731ee674f61',
123+
id: 'resp_684886977be881928c9db234e14ae7d80f8976796514dff9',
124124
object: 'response',
125125
created_at: 1749221320,
126126
model: 'gpt-4-0613',
127127
output: [{
128128
content: [{
129-
text: "A streamed response is a way of transmitting data from a server to a client (e.g. from a website to a user's computer or mobile device) in a continuous flow or stream, rather than all at one time. This means the client can start to process the data before all of it has been received, which can improve performance for large amounts of data or slow connections. Streaming is often used for real-time or near-real-time applications like video or audio playback.",
129+
text: 'Test stream',
130130
}],
131131
role: 'assistant',
132132
status: 'completed',
133-
id: 'msg_6843007469bc8192af5e145250c297db0374f342293105d9',
133+
id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
134134
}]
135135
}
136-
},
137-
// For testing purposes only
138-
streamData: "A streamed response is a way of transmitting data from a server to a client (e.g. from a website to a user's computer or mobile device) in a continuous flow or stream, rather than all at one time. This means the client can start to process the data before all of it has been received, which can improve performance for large amounts of data or slow connections. Streaming is often used for real-time or near-real-time applications like video or audio playback.",
136+
}
139137
})
140138

141139
responses.set('bad stream', {

test/versioned/openai/mock-server-v5.js

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ module.exports = openaiMockServer
99

1010
const http = require('node:http')
// node:crypto is used by randomStream() below; without this require the
// 'do random' streaming path throws ReferenceError: crypto is not defined.
const crypto = require('node:crypto')
const { Readable } = require('node:stream')
const RESPONSES = require('./mock-responses-api-responses')
const STREAM_CHUNKS = require('./stream-chunks-v5')
1214

1315
/**
1416
* Build a mock server that listens on a 127.0.0.1 and a random port that
@@ -64,23 +66,76 @@ function handler(req, res) {
6466
return
6567
}
6668

67-
const { headers, code, body } = RESPONSES.get(prompt)
69+
const { headers, code, body, streamData } = RESPONSES.get(prompt)
6870
for (const [key, value] of Object.entries(headers)) {
6971
res.setHeader(key, value)
7072
}
7173
res.statusCode = code
7274

7375
if (payload.stream === true) {
74-
res.statusCode = 500
75-
res.write('Streaming is not yet supported in this mock server.')
76-
res.end()
76+
let outStream
77+
if (streamData !== 'do random') {
78+
outStream = finiteStream({ ...body })
79+
} else {
80+
outStream = randomStream({ ...body })
81+
let streamChunkCount = 0
82+
outStream.on('data', () => {
83+
if (streamChunkCount >= 100) {
84+
outStream.destroy()
85+
res.destroy()
86+
}
87+
streamChunkCount += 1
88+
})
89+
}
90+
91+
outStream.pipe(res)
7792
} else {
7893
res.write(JSON.stringify(body))
7994
res.end()
8095
}
8196
})
8297
}
8398

99+
/**
 * Builds a stream that replays every object in the module-level
 * `STREAM_CHUNKS` array as an OpenAI v5 server-sent-event data message
 * (`data: {...}\n\n`), terminated by the literal `data: [DONE]` sentinel.
 * The stream sends a finite number of messages and then ends.
 *
 * @param {object} body Unused; accepted for signature parity with
 * `randomStream`, which receives the response body as its template.
 * @returns {Readable} A paused stream; callers must resume or pipe it.
 */
function finiteStream(body) {
  return new Readable({
    read() {
      // Replay each recorded chunk in order, framed the way OpenAI frames
      // its SSE stream.
      for (const chunk of STREAM_CHUNKS) {
        this.push(`data: ${JSON.stringify(chunk)}\n\n`)
      }
      // OpenAI terminates the stream with a literal [DONE] sentinel.
      this.push('data: [DONE]\n\n')
      this.push(null)
    }
  }).pause()
}
120+
121+
/**
 * Creates a stream that will stream an infinite number of OpenAI stream data
 * chunks. On every pull, `choices[0].delta.content` on the template is
 * overwritten with fresh random bytes (base64-encoded), so the payload never
 * repeats and the stream never ends on its own — callers must destroy it.
 *
 * NOTE: relies on the module-level `crypto` (node:crypto) binding.
 *
 * @param {object} chunkTemplate An object that is shaped like an OpenAI stream
 * data object; it is mutated on each read.
 * @returns {Readable} A paused stream; callers must resume or pipe it.
 */
function randomStream(chunkTemplate) {
  return new Readable({
    read(size = 16) {
      // New random payload on every pull keeps the stream unbounded and
      // non-repeating.
      const data = crypto.randomBytes(size)
      chunkTemplate.choices[0].delta.content = data.toString('base64')
      this.push(`data: ${JSON.stringify(chunkTemplate)}\n\n`)
    }
  }).pause()
}
}
138+
84139
function getShortenedPrompt(reqBody) {
85140
const prompt = reqBody.input?.[0]?.content || reqBody.input?.badContent || reqBody.input
86141

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2025 New Relic Corporation. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
'use strict'
7+
8+
const chunks = []
9+
module.exports = chunks
10+
11+
// Setup chunks
12+
chunks.push({
13+
response: {
14+
id: 'resp_684886977be881928c9db234e14ae7d80f8976796514dff9',
15+
model: 'gpt-4-0613',
16+
object: 'response',
17+
status: 'in_progress',
18+
output: []
19+
},
20+
type: 'response.created',
21+
sequence_number: 0
22+
})
23+
24+
chunks.push({
25+
response: {
26+
id: 'resp_684886977be881928c9db234e14ae7d80f8976796514dff9',
27+
model: 'gpt-4-0613',
28+
object: 'response',
29+
status: 'in_progress',
30+
output: []
31+
},
32+
type: 'response.in_progress',
33+
sequence_number: 1
34+
})
35+
36+
chunks.push({
37+
item: {
38+
id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
39+
role: 'assistant',
40+
status: 'in_progress',
41+
type: 'message'
42+
},
43+
output_index: 0,
44+
type: 'response.output_item.added',
45+
sequence_number: 2
46+
})
47+
48+
chunks.push({
49+
part: {
50+
type: 'output_text',
51+
text: ''
52+
},
53+
content_index: 0,
54+
output_index: 0,
55+
item_id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
56+
type: 'response.content_part.added',
57+
sequence_number: 3
58+
})
59+
60+
// Delta chunks for the actual text
61+
chunks.push({
62+
content_index: 0,
63+
delta: 'Test',
64+
item_id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
65+
output_index: 0,
66+
sequence_number: 4,
67+
type: 'response.output_text.delta',
68+
})
69+
70+
chunks.push({
71+
content_index: 0,
72+
delta: 'stream',
73+
item_id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
74+
output_index: 0,
75+
sequence_number: 5,
76+
type: 'response.output_text.delta',
77+
})
78+
79+
// Finishing up - summing deltas together
80+
chunks.push({
81+
content_index: 0,
82+
item_id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
83+
output_index: 0,
84+
sequence_number: 6,
85+
text: 'Test stream',
86+
type: 'response.output_text.done',
87+
})
88+
89+
chunks.push({
90+
content_index: 0,
91+
item_id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
92+
output_index: 0,
93+
part: {
94+
type: 'output_text',
95+
text: 'Test stream'
96+
},
97+
sequence_number: 7,
98+
type: 'response.content_part.done',
99+
})
100+
101+
chunks.push({
102+
item: {
103+
content: [{
104+
text: 'Test stream',
105+
type: 'output_text'
106+
}],
107+
id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
108+
role: 'assistant',
109+
status: 'completed',
110+
type: 'message'
111+
},
112+
output_index: 0,
113+
sequence_number: 8,
114+
type: 'response.output_item.done',
115+
})
116+
117+
chunks.push({
118+
response: {
119+
id: 'resp_684886977be881928c9db234e14ae7d80f8976796514dff9',
120+
object: 'response',
121+
output: [{
122+
content: [{ text: 'Test stream' }],
123+
id: 'msg_68488698f6088192a505b70393c560bc0f8976796514dff9',
124+
role: 'assistant',
125+
status: 'completed',
126+
type: 'message'
127+
}],
128+
status: 'completed',
129+
usage: {
130+
input_tokens: 13,
131+
output_tokens: 4,
132+
total_tokens: 17
133+
}
134+
},
135+
sequence_number: 9,
136+
type: 'response_completed',
137+
})

0 commit comments

Comments
 (0)