Skip to content

Commit c54f7a2

Browse files
authored
Merge pull request #2335 from dgageot/board/fix-docker-agent-issue-2262-with-testing-130ce43e
fix: extract reasoning_content from DMR streaming responses
2 parents 5251add + 0dc2799 commit c54f7a2

File tree

2 files changed

+118
-3
lines changed

2 files changed

+118
-3
lines changed

pkg/model/provider/oaistream/adapter.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ This is a shared adapter for OpenAI-compatible streams.
55
*/
66

77
import (
8+
"encoding/json"
89
"io"
910

1011
"github.com/openai/openai-go/v3"
@@ -66,13 +67,24 @@ func (a *StreamAdapter) Recv() (chat.MessageStreamResponse, error) {
6667
a.lastFinishReason = finishReason
6768
}
6869

70+
// Extract reasoning_content from ExtraFields since the OpenAI SDK
71+
// does not yet have a dedicated field for it. Providers like DMR
72+
// send reasoning tokens as a "reasoning_content" JSON field in the
73+
// chat completion chunk delta.
74+
var reasoningContent string
75+
if ef, ok := choice.Delta.JSON.ExtraFields["reasoning_content"]; ok && ef.Raw() != "" {
76+
// ef.Raw() returns the raw JSON value (e.g. `"some text"`), so
77+
// we unmarshal it to get the plain Go string.
78+
_ = json.Unmarshal([]byte(ef.Raw()), &reasoningContent)
79+
}
80+
6981
response.Choices[i] = chat.MessageStreamChoice{
7082
Index: int(choice.Index),
7183
FinishReason: finishReason,
7284
Delta: chat.MessageDelta{
73-
Role: choice.Delta.Role,
74-
Content: choice.Delta.Content,
75-
// ReasoningContent not available in this SDK version
85+
Role: choice.Delta.Role,
86+
Content: choice.Delta.Content,
87+
ReasoningContent: reasoningContent,
7688
},
7789
}
7890

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package oaistream
2+
3+
import (
4+
"io"
5+
"net/http"
6+
"net/http/httptest"
7+
"testing"
8+
9+
"github.com/openai/openai-go/v3"
10+
"github.com/openai/openai-go/v3/packages/ssestream"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
// newTestStream creates an SSE stream from raw SSE event data served by a test HTTP server.
16+
func newTestStream(t *testing.T, sseData string) *ssestream.Stream[openai.ChatCompletionChunk] {
17+
t.Helper()
18+
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
19+
w.Header().Set("Content-Type", "text/event-stream")
20+
_, _ = w.Write([]byte(sseData))
21+
}))
22+
t.Cleanup(srv.Close)
23+
24+
resp, err := http.Get(srv.URL) //nolint:gosec,bodyclose // body is closed by the stream
25+
require.NoError(t, err)
26+
return ssestream.NewStream[openai.ChatCompletionChunk](ssestream.NewDecoder(resp), nil)
27+
}
28+
29+
func TestStreamAdapter_ReasoningContent(t *testing.T) {
30+
t.Parallel()
31+
32+
// Simulate SSE events with reasoning_content field in the delta,
33+
// as sent by DMR for reasoning models.
34+
sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"Let me think"},"finish_reason":null}]}
35+
36+
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"reasoning_content":" about this"},"finish_reason":null}]}
37+
38+
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"content":"Hello!"},"finish_reason":null}]}
39+
40+
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
41+
42+
data: [DONE]
43+
44+
`
45+
46+
stream := newTestStream(t, sseData)
47+
adapter := NewStreamAdapter(stream, false)
48+
defer adapter.Close()
49+
50+
// First chunk: reasoning content "Let me think"
51+
resp, err := adapter.Recv()
52+
require.NoError(t, err)
53+
require.Len(t, resp.Choices, 1)
54+
assert.Equal(t, "Let me think", resp.Choices[0].Delta.ReasoningContent)
55+
assert.Empty(t, resp.Choices[0].Delta.Content)
56+
57+
// Second chunk: reasoning content " about this"
58+
resp, err = adapter.Recv()
59+
require.NoError(t, err)
60+
require.Len(t, resp.Choices, 1)
61+
assert.Equal(t, " about this", resp.Choices[0].Delta.ReasoningContent)
62+
assert.Empty(t, resp.Choices[0].Delta.Content)
63+
64+
// Third chunk: regular content "Hello!"
65+
resp, err = adapter.Recv()
66+
require.NoError(t, err)
67+
require.Len(t, resp.Choices, 1)
68+
assert.Equal(t, "Hello!", resp.Choices[0].Delta.Content)
69+
assert.Empty(t, resp.Choices[0].Delta.ReasoningContent)
70+
71+
// Fourth chunk: finish reason stop
72+
resp, err = adapter.Recv()
73+
require.NoError(t, err)
74+
require.Len(t, resp.Choices, 1)
75+
assert.Equal(t, "stop", string(resp.Choices[0].FinishReason))
76+
77+
// Stream done
78+
_, err = adapter.Recv()
79+
assert.ErrorIs(t, err, io.EOF)
80+
}
81+
82+
func TestStreamAdapter_NoReasoningContent(t *testing.T) {
83+
t.Parallel()
84+
85+
// Simulate a normal stream without reasoning_content.
86+
sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}
87+
88+
data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
89+
90+
data: [DONE]
91+
92+
`
93+
94+
stream := newTestStream(t, sseData)
95+
adapter := NewStreamAdapter(stream, false)
96+
defer adapter.Close()
97+
98+
resp, err := adapter.Recv()
99+
require.NoError(t, err)
100+
require.Len(t, resp.Choices, 1)
101+
assert.Equal(t, "Hi", resp.Choices[0].Delta.Content)
102+
assert.Empty(t, resp.Choices[0].Delta.ReasoningContent)
103+
}

0 commit comments

Comments
 (0)