Skip to content

Commit 246c4ae

Browse files
fix(providers): detect truncated Anthropic and OpenAI Responses streams (#33)
The Anthropic Messages streaming protocol guarantees message_stop as the final SSE event of every successful stream. Today the adapter treats any clean EOF (Stream.Err() == nil or io.EOF) as a successful Finish, even when the upstream body was cut off mid-response. This silently truncates the assistant's reply and commits the partial text as if it were the model's complete answer. Track whether message_stop was observed during the SSE loop. On clean EOF without it, yield StreamPartTypeError wrapping io.EOF so the failure surfaces as a retryable transport error rather than a phantom success. Existing transport errors continue to flow through the unchanged else branch; the event: error path keeps yielding via Stream.Err(). Tests cover happy path, EOF before message_stop, empty stream, and malformed stream (existing error path preserved). Also picks up a one-line gofmt fix in TestComputerUseToolJSON; the test file was not gofmt-clean at HEAD without it. * fix(providers/openai): require terminal Responses event before Finish The OpenAI Responses API emits terminal lifecycle events when a streamed response reaches its final state. The adapter currently yields Finish on any clean EOF, even if the stream ended before response.completed or response.incomplete. That has the same silent-truncation shape as the Anthropic message_stop bug in this PR. Track response.completed and response.incomplete before yielding Finish from both Stream and StreamObject. If the transport closes cleanly first, yield a StreamPartTypeError/ObjectStreamPartTypeError wrapping io.EOF so callers can retry instead of committing partial output. Also surface response.failed as an error event instead of falling through to Finish. Tests cover completed and incomplete terminal events, EOF before terminal event, empty streams, response.failed, malformed streams, and JSON-mode StreamObject truncation. Also fixes a pre-existing OpenAI test compile issue where one toResponsesPrompt call still expected two return values.
1 parent d0e6ce2 commit 246c4ae

4 files changed

Lines changed: 395 additions & 4 deletions

File tree

providers/anthropic/anthropic.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1244,6 +1244,7 @@ func (a languageModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.S
12441244

12451245
stream := a.client.Messages.NewStreaming(ctx, *params, reqOpts...)
12461246
acc := anthropic.Message{}
1247+
var sawMessageStop bool
12471248
return func(yield func(fantasy.StreamPart) bool) {
12481249
if len(warnings) > 0 {
12491250
if !yield(fantasy.StreamPart{
@@ -1448,11 +1449,22 @@ func (a languageModel) Stream(ctx context.Context, call fantasy.Call) (fantasy.S
14481449
}
14491450
}
14501451
case "message_stop":
1452+
sawMessageStop = true
14511453
}
14521454
}
14531455

14541456
err := stream.Err()
14551457
if err == nil || errors.Is(err, io.EOF) {
1458+
if !sawMessageStop {
1459+
if err == nil {
1460+
err = io.EOF
1461+
}
1462+
yield(fantasy.StreamPart{
1463+
Type: fantasy.StreamPartTypeError,
1464+
Error: fmt.Errorf("anthropic stream closed before message_stop: %w", err),
1465+
})
1466+
return
1467+
}
14561468
yield(fantasy.StreamPart{
14571469
Type: fantasy.StreamPartTypeFinish,
14581470
ID: acc.ID,

providers/anthropic/anthropic_test.go

Lines changed: 117 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"io"
89
"math"
910
"net/http"
1011
"net/http/httptest"
@@ -504,6 +505,97 @@ func TestStream_SendsOutputConfigEffort(t *testing.T) {
504505
requireAnthropicEffort(t, call.body, EffortHigh)
505506
}
506507

508+
func TestStream_RequiresMessageStopBeforeFinish(t *testing.T) {
509+
t.Parallel()
510+
511+
completeTextStream := []string{
512+
anthropicSSEEvent("message_start", `{"type":"message_start","message":{"id":"msg_complete","type":"message","role":"assistant","model":"claude-sonnet-4-20250514","content":[],"stop_reason":null,"usage":{"input_tokens":1,"output_tokens":0}}}`),
513+
anthropicSSEEvent("content_block_start", `{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}`),
514+
anthropicSSEEvent("content_block_delta", `{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"hello"}}`),
515+
anthropicSSEEvent("content_block_stop", `{"type":"content_block_stop","index":0}`),
516+
anthropicSSEEvent("message_delta", `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"input_tokens":1,"output_tokens":1}}`),
517+
anthropicSSEEvent("message_stop", `{"type":"message_stop"}`),
518+
}
519+
truncatedTextStream := completeTextStream[:len(completeTextStream)-1]
520+
521+
tests := []struct {
522+
name string
523+
chunks []string
524+
wantFinish bool
525+
wantEOF bool
526+
wantError string
527+
}{
528+
{
529+
name: "complete stream finishes",
530+
chunks: completeTextStream,
531+
wantFinish: true,
532+
},
533+
{
534+
name: "eof before message_stop returns EOF error",
535+
chunks: truncatedTextStream,
536+
wantEOF: true,
537+
},
538+
{
539+
name: "empty stream returns EOF error",
540+
wantEOF: true,
541+
},
542+
{
543+
name: "error event keeps existing error path",
544+
chunks: []string{
545+
anthropicSSEEvent("error", `{"type":"error","error":{"type":"overloaded_error","message":"stream down"}}`),
546+
},
547+
wantError: "stream down",
548+
},
549+
}
550+
551+
for _, tt := range tests {
552+
t.Run(tt.name, func(t *testing.T) {
553+
t.Parallel()
554+
555+
server, calls := newAnthropicStreamingServer(tt.chunks)
556+
defer server.Close()
557+
558+
provider, err := New(
559+
WithAPIKey("test-api-key"),
560+
WithBaseURL(server.URL),
561+
)
562+
require.NoError(t, err)
563+
564+
model, err := provider.LanguageModel(context.Background(), "claude-sonnet-4-20250514")
565+
require.NoError(t, err)
566+
567+
stream, err := model.Stream(context.Background(), fantasy.Call{
568+
Prompt: testPrompt(),
569+
})
570+
require.NoError(t, err)
571+
572+
parts := collectAnthropicStreamParts(stream)
573+
_ = awaitAnthropicCall(t, calls)
574+
575+
finishParts := streamPartsByType(parts, fantasy.StreamPartTypeFinish)
576+
errorParts := streamPartsByType(parts, fantasy.StreamPartTypeError)
577+
578+
if tt.wantFinish {
579+
require.Len(t, finishParts, 1)
580+
require.Empty(t, errorParts)
581+
require.Equal(t, fantasy.FinishReasonStop, finishParts[0].FinishReason)
582+
return
583+
}
584+
585+
require.Empty(t, finishParts)
586+
require.Len(t, errorParts, 1)
587+
require.Error(t, errorParts[0].Error)
588+
if tt.wantEOF {
589+
require.ErrorIs(t, errorParts[0].Error, io.EOF)
590+
require.Contains(t, errorParts[0].Error.Error(), "message_stop")
591+
} else {
592+
require.NotContains(t, errorParts[0].Error.Error(), "message_stop")
593+
require.Contains(t, errorParts[0].Error.Error(), tt.wantError)
594+
}
595+
})
596+
}
597+
}
598+
507599
type anthropicCall struct {
508600
method string
509601
path string
@@ -563,6 +655,29 @@ func newAnthropicStreamingServer(chunks []string) (*httptest.Server, <-chan anth
563655
return server, calls
564656
}
565657

658+
func anthropicSSEEvent(event, data string) string {
659+
return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data)
660+
}
661+
662+
func collectAnthropicStreamParts(stream fantasy.StreamResponse) []fantasy.StreamPart {
663+
var parts []fantasy.StreamPart
664+
stream(func(part fantasy.StreamPart) bool {
665+
parts = append(parts, part)
666+
return true
667+
})
668+
return parts
669+
}
670+
671+
func streamPartsByType(parts []fantasy.StreamPart, typ fantasy.StreamPartType) []fantasy.StreamPart {
672+
var matches []fantasy.StreamPart
673+
for _, part := range parts {
674+
if part.Type == typ {
675+
matches = append(matches, part)
676+
}
677+
}
678+
return matches
679+
}
680+
566681
func awaitAnthropicCall(t *testing.T, calls <-chan anthropicCall) anthropicCall {
567682
t.Helper()
568683

@@ -1574,7 +1689,8 @@ func TestComputerUseToolJSON(t *testing.T) {
15741689
}
15751690
_, err := computerUseToolJSON(pdt)
15761691
require.Error(t, err)
1577-
require.Contains(t, err.Error(), "tool_version arg is missing") })
1692+
require.Contains(t, err.Error(), "tool_version arg is missing")
1693+
})
15781694

15791695
t.Run("returns error for unsupported version", func(t *testing.T) {
15801696
t.Parallel()

0 commit comments

Comments
 (0)