Skip to content

Commit 5cef451

Browse files
authored
Merge pull request #4 from ethpandaops/refactor/provider-audit-cutover
Add SDK audit envelopes and fix ReceiveResponse hang
2 parents 380a150 + d95f7d7 commit 5cef451

23 files changed

+1538
-479
lines changed

_typos.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[default.extend-words]
2+
# prompt_tokens_details / completion_tokens_details abbreviations
3+
ptd = "ptd"
4+
ctd = "ctd"

client_impl_test.go

Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
package openroutersdk
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
"time"
8+
9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
12+
"github.com/ethpandaops/openrouter-agent-sdk-go/internal/message"
13+
)
14+
15+
func TestReceiveResponse_WaitsForProducerShutdownAfterResult(
16+
t *testing.T,
17+
) {
18+
msgs := make(chan message.Message, 1)
19+
errs := make(chan error)
20+
21+
c := &clientImpl{
22+
connected: true,
23+
currentMsgs: msgs,
24+
currentErrs: errs,
25+
}
26+
27+
done := make(chan struct{})
28+
29+
go func() {
30+
defer close(done)
31+
for range c.ReceiveResponse(context.Background()) {
32+
}
33+
}()
34+
35+
msgs <- &message.ResultMessage{
36+
Type: "result",
37+
Subtype: "success",
38+
SessionID: "default",
39+
}
40+
41+
select {
42+
case <-done:
43+
t.Fatal("ReceiveResponse returned before producer shutdown; post-result cleanup can be skipped")
44+
case <-time.After(50 * time.Millisecond):
45+
}
46+
47+
close(msgs)
48+
close(errs)
49+
50+
select {
51+
case <-done:
52+
case <-time.After(time.Second):
53+
t.Fatal("ReceiveResponse did not finish after producer shutdown")
54+
}
55+
}
56+
57+
// TestReceiveResponse_DrainsMessagesAfterResult is a regression test
58+
// proving that messages arriving AFTER a ResultMessage are silently
59+
// drained rather than blocking the producer goroutine. Without the
60+
// drain-after-result fix, the producer would be blocked forever
61+
// trying to send on a full channel (goroutine leak).
62+
func TestReceiveResponse_DrainsMessagesAfterResult(t *testing.T) {
63+
// Use buffered channels to simulate a producer that sends
64+
// additional messages after the result.
65+
msgs := make(chan message.Message, 5)
66+
errs := make(chan error, 1)
67+
68+
c := &clientImpl{
69+
connected: true,
70+
currentMsgs: msgs,
71+
currentErrs: errs,
72+
}
73+
74+
// Producer sends: text message, result, then 3 more messages.
75+
msgs <- &message.AssistantMessage{
76+
Content: []message.ContentBlock{
77+
&message.TextBlock{Text: "thinking..."},
78+
},
79+
}
80+
msgs <- &message.ResultMessage{
81+
Type: "result",
82+
Subtype: "success",
83+
SessionID: "default",
84+
}
85+
msgs <- &message.AssistantMessage{
86+
Content: []message.ContentBlock{
87+
&message.TextBlock{Text: "post-result-1"},
88+
},
89+
}
90+
msgs <- &message.AssistantMessage{
91+
Content: []message.ContentBlock{
92+
&message.TextBlock{Text: "post-result-2"},
93+
},
94+
}
95+
msgs <- &message.AssistantMessage{
96+
Content: []message.ContentBlock{
97+
&message.TextBlock{Text: "post-result-3"},
98+
},
99+
}
100+
close(msgs)
101+
close(errs)
102+
103+
// Consumer reads all yielded messages.
104+
var received []message.Message
105+
106+
for msg, err := range c.ReceiveResponse(context.Background()) {
107+
if err != nil {
108+
t.Fatalf("unexpected error: %v", err)
109+
}
110+
111+
received = append(received, msg)
112+
}
113+
114+
// Consumer should only see messages up to and including the
115+
// ResultMessage. Post-result messages should be drained silently.
116+
require.Len(t, received, 2,
117+
"consumer should receive exactly 2 messages: "+
118+
"assistant text + result; post-result messages "+
119+
"should be drained silently")
120+
121+
_, isResult := received[1].(*message.ResultMessage)
122+
assert.True(t, isResult,
123+
"second yielded message should be the ResultMessage")
124+
}
125+
126+
// TestReceiveResponse_DrainsErrorsAfterResult proves that errors
127+
// arriving after the ResultMessage is consumed are drained without
128+
// being yielded to the consumer.
129+
func TestReceiveResponse_DrainsErrorsAfterResult(t *testing.T) {
130+
// Use unbuffered msgs so we control the ordering precisely.
131+
msgs := make(chan message.Message)
132+
errs := make(chan error, 2)
133+
134+
c := &clientImpl{
135+
connected: true,
136+
currentMsgs: msgs,
137+
currentErrs: errs,
138+
}
139+
140+
done := make(chan struct{})
141+
142+
var receivedMsgs []message.Message
143+
144+
var receivedErrs []error
145+
146+
go func() {
147+
defer close(done)
148+
149+
for msg, err := range c.ReceiveResponse(
150+
context.Background(),
151+
) {
152+
if err != nil {
153+
receivedErrs = append(receivedErrs, err)
154+
}
155+
if msg != nil {
156+
receivedMsgs = append(receivedMsgs, msg)
157+
}
158+
}
159+
}()
160+
161+
// Send result on the msgs channel — blocks until consumer reads it.
162+
msgs <- &message.ResultMessage{
163+
Type: "result",
164+
Subtype: "success",
165+
SessionID: "default",
166+
}
167+
168+
// Now that the result has been consumed (the send above unblocked),
169+
// inject errors. These should be drained, not yielded.
170+
errs <- errors.New("post-result transport error")
171+
errs <- errors.New("another post-result error")
172+
close(errs)
173+
close(msgs)
174+
175+
select {
176+
case <-done:
177+
case <-time.After(2 * time.Second):
178+
t.Fatal("ReceiveResponse did not finish")
179+
}
180+
181+
require.Len(t, receivedMsgs, 1,
182+
"should receive exactly the ResultMessage")
183+
assert.Empty(t, receivedErrs,
184+
"errors after result should be drained, not yielded")
185+
}
186+
187+
// TestReceiveResponse_ContextCancelStopsIteration proves that
188+
// cancelling the context causes ReceiveResponse to stop and
189+
// yield a context error, even while draining.
190+
func TestReceiveResponse_ContextCancelStopsIteration(
191+
t *testing.T,
192+
) {
193+
msgs := make(chan message.Message)
194+
errs := make(chan error)
195+
196+
c := &clientImpl{
197+
connected: true,
198+
currentMsgs: msgs,
199+
currentErrs: errs,
200+
}
201+
202+
ctx, cancel := context.WithCancel(context.Background())
203+
204+
done := make(chan struct{})
205+
206+
go func() {
207+
defer close(done)
208+
209+
for _, err := range c.ReceiveResponse(ctx) {
210+
if err != nil {
211+
return
212+
}
213+
}
214+
}()
215+
216+
// Cancel before sending anything.
217+
cancel()
218+
219+
select {
220+
case <-done:
221+
// Good — iterator exited due to context cancel.
222+
case <-time.After(2 * time.Second):
223+
t.Fatal(
224+
"ReceiveResponse should exit promptly on context cancel",
225+
)
226+
}
227+
}

0 commit comments

Comments
 (0)