Skip to content

Commit d89fc58

Browse files
authored
Merge pull request #2359 from dgageot/board/fix-for-docker-agent-issue-2349-a172981b
Add regression tests for SSE comment lines from OpenRouter
2 parents d7c623e + 9915e4e commit d89fc58

File tree

1 file changed

+195
-0
lines changed

1 file changed

+195
-0
lines changed
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package openai
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"strings"
9+
"testing"
10+
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/docker/docker-agent/pkg/chat"
15+
"github.com/docker/docker-agent/pkg/config/latest"
16+
"github.com/docker/docker-agent/pkg/environment"
17+
)
18+
19+
// writeSSEWithComments writes an SSE response prefixed with comment lines
20+
// (starting with ':'), as sent by providers like OpenRouter during initial
21+
// processing. Per the SSE spec, comment lines must be ignored by clients.
22+
// This is used to verify the fix for https://github.com/docker/docker-agent/issues/2349.
23+
func writeSSEWithComments(w http.ResponseWriter, sseLines []string) {
24+
w.Header().Set("Content-Type", "text/event-stream")
25+
flusher, _ := w.(http.Flusher)
26+
27+
// Comment lines like OpenRouter sends during processing
28+
_, _ = fmt.Fprint(w, ": OPENROUTER PROCESSING\n")
29+
_, _ = fmt.Fprint(w, ": OPENROUTER PROCESSING\n")
30+
31+
for _, line := range sseLines {
32+
_, _ = fmt.Fprint(w, line+"\n")
33+
}
34+
flusher.Flush()
35+
}
36+
37+
// TestCustomProvider_SSECommentLines_ChatCompletions is a regression test for
38+
// https://github.com/docker/docker-agent/issues/2349
39+
//
40+
// OpenRouter sends SSE comment lines (": OPENROUTER PROCESSING") before the
41+
// actual data events. This test verifies those comments don't cause
42+
// "unexpected end of JSON input" errors during streaming.
43+
func TestCustomProvider_SSECommentLines_ChatCompletions(t *testing.T) {
44+
t.Parallel()
45+
46+
chunks := []map[string]any{
47+
{
48+
"id": "gen-123", "object": "chat.completion.chunk", "model": "test",
49+
"choices": []map[string]any{{"index": 0, "delta": map[string]any{"role": "assistant", "content": ""}, "finish_reason": nil}},
50+
},
51+
{
52+
"id": "gen-123", "object": "chat.completion.chunk", "model": "test",
53+
"choices": []map[string]any{{"index": 0, "delta": map[string]any{"content": "hello"}, "finish_reason": nil}},
54+
},
55+
{
56+
"id": "gen-123", "object": "chat.completion.chunk", "model": "test",
57+
"choices": []map[string]any{{"index": 0, "delta": map[string]any{}, "finish_reason": "stop"}},
58+
},
59+
{
60+
"id": "gen-123", "object": "chat.completion.chunk", "model": "test",
61+
"choices": []map[string]any{}, "usage": map[string]any{"prompt_tokens": 10, "completion_tokens": 1, "total_tokens": 11},
62+
},
63+
}
64+
65+
var sseLines []string
66+
for _, chunk := range chunks {
67+
data, _ := json.Marshal(chunk)
68+
sseLines = append(sseLines, "data: "+string(data), "")
69+
}
70+
sseLines = append(sseLines, "data: [DONE]", "")
71+
72+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
73+
writeSSEWithComments(w, sseLines)
74+
}))
75+
defer server.Close()
76+
77+
cfg := &latest.ModelConfig{
78+
Provider: "openrouter",
79+
Model: "test-model",
80+
BaseURL: server.URL,
81+
TokenKey: "OPENROUTER_API_KEY",
82+
ProviderOpts: map[string]any{
83+
"api_type": "openai_chatcompletions",
84+
},
85+
}
86+
env := environment.NewMapEnvProvider(map[string]string{
87+
"OPENROUTER_API_KEY": "test-key",
88+
})
89+
90+
client, err := NewClient(t.Context(), cfg, env)
91+
require.NoError(t, err)
92+
93+
stream, err := client.CreateChatCompletionStream(
94+
t.Context(),
95+
[]chat.Message{{Role: chat.MessageRoleUser, Content: "hello"}},
96+
nil,
97+
)
98+
require.NoError(t, err)
99+
defer stream.Close()
100+
101+
var content strings.Builder
102+
for {
103+
chunk, err := stream.Recv()
104+
if err != nil {
105+
break
106+
}
107+
for _, choice := range chunk.Choices {
108+
content.WriteString(choice.Delta.Content)
109+
}
110+
}
111+
112+
assert.Equal(t, "hello", content.String())
113+
}
114+
115+
// TestCustomProvider_SSECommentLines_Responses is a regression test for
116+
// https://github.com/docker/docker-agent/issues/2349 using the Responses API
117+
// path (api_type: openai_responses), which is exactly what the issue reporter
118+
// was using with OpenRouter.
119+
func TestCustomProvider_SSECommentLines_Responses(t *testing.T) {
120+
t.Parallel()
121+
122+
events := []map[string]any{
123+
{"type": "response.output_text.delta", "delta": "hello", "item_id": "item-1"},
124+
{
125+
"type": "response.completed",
126+
"response": map[string]any{
127+
"id": "resp-123",
128+
"status": "completed",
129+
"output": []map[string]any{
130+
{"type": "message", "id": "item-1"},
131+
},
132+
"usage": map[string]any{
133+
"input_tokens": 10,
134+
"output_tokens": 1,
135+
"total_tokens": 11,
136+
"input_tokens_details": map[string]any{
137+
"cached_tokens": 0,
138+
},
139+
"output_tokens_details": map[string]any{
140+
"reasoning_tokens": 0,
141+
},
142+
},
143+
},
144+
},
145+
}
146+
147+
var sseLines []string
148+
for _, event := range events {
149+
data, _ := json.Marshal(event)
150+
eventType := event["type"].(string)
151+
sseLines = append(sseLines, "event: "+eventType, "data: "+string(data), "")
152+
}
153+
154+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
155+
writeSSEWithComments(w, sseLines)
156+
}))
157+
defer server.Close()
158+
159+
cfg := &latest.ModelConfig{
160+
Provider: "openrouter",
161+
Model: "test-model",
162+
BaseURL: server.URL,
163+
TokenKey: "OPENROUTER_API_KEY",
164+
ProviderOpts: map[string]any{
165+
"api_type": "openai_responses",
166+
},
167+
}
168+
env := environment.NewMapEnvProvider(map[string]string{
169+
"OPENROUTER_API_KEY": "test-key",
170+
})
171+
172+
client, err := NewClient(t.Context(), cfg, env)
173+
require.NoError(t, err)
174+
175+
stream, err := client.CreateChatCompletionStream(
176+
t.Context(),
177+
[]chat.Message{{Role: chat.MessageRoleUser, Content: "hello"}},
178+
nil,
179+
)
180+
require.NoError(t, err)
181+
defer stream.Close()
182+
183+
var content strings.Builder
184+
for {
185+
chunk, err := stream.Recv()
186+
if err != nil {
187+
break
188+
}
189+
for _, choice := range chunk.Choices {
190+
content.WriteString(choice.Delta.Content)
191+
}
192+
}
193+
194+
assert.Equal(t, "hello", content.String())
195+
}

0 commit comments

Comments
 (0)