-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsse.go
More file actions
125 lines (110 loc) · 2.71 KB
/
sse.go
File metadata and controls
125 lines (110 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package opencode
import (
"bufio"
"bytes"
"encoding/json"
"io"
"net/http"
)
// Stream iterates over a Server-Sent Events (SSE) response, decoding the
// JSON data payload of each event into a value of type T.
//
// Typical use:
//
//	stream := NewSSEStream[GlobalEvent](resp, nil)
//	defer stream.Close()
//	for stream.Next() {
//		event := stream.Current()
//		// ...
//	}
//	if err := stream.Err(); err != nil {
//		// handle error
//	}
type Stream[T any] struct {
	// scanner splits the response body into lines.
	scanner *bufio.Scanner
	// rc is the response body; Close closes it.
	rc io.ReadCloser

	// current is the value decoded by the latest successful call to Next.
	current T
	// err is the error reported by Err.
	err error
	// done records that no further events will be produced.
	done bool
}
// NewSSEStream builds an SSE iterator over the body of resp.
//
// The (resp, err) pair is accepted as-is so the result of an HTTP call can be
// passed straight through:
//   - a non-nil err yields a finished stream whose Err returns that error;
//   - a nil resp or nil resp.Body yields a finished stream with no error;
//   - otherwise the body is scanned line by line, starting from a 64 KiB
//     buffer that may grow to hold lines of up to 32 MiB.
func NewSSEStream[T any](resp *http.Response, err error) *Stream[T] {
	// Longest single line the scanner accepts: 64 KiB << 9 = 32 MiB.
	const maxLineBytes = bufio.MaxScanTokenSize << 9

	switch {
	case err != nil:
		return &Stream[T]{err: err, done: true}
	case resp == nil || resp.Body == nil:
		return &Stream[T]{done: true}
	}

	lines := bufio.NewScanner(resp.Body)
	lines.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxLineBytes)
	return &Stream[T]{rc: resp.Body, scanner: lines}
}
// Next advances the stream to the next SSE event and unmarshals its data
// payload into T. It returns false when the stream is exhausted or an error
// occurs; call Current to retrieve the value and Err to inspect any error.
//
// Lines are read until a blank line terminates an event. The values of all
// "data" fields seen since the previous event are joined with newlines and
// decoded as JSON. Blank lines with no pending data are skipped; "event" and
// "id" fields, comment lines (starting with ":") and unknown fields are
// ignored. Data still pending when the input ends is dropped undecoded.
//
// Next may be called on a Stream that has no scanner (for example the zero
// value); it then reports exhaustion instead of dereferencing a nil scanner.
func (s *Stream[T]) Next() bool {
	if s.done || s.err != nil {
		return false
	}
	if s.scanner == nil {
		// No body was ever attached, so there is nothing to read.
		s.done = true
		return false
	}

	var data bytes.Buffer
	for s.scanner.Scan() {
		// line is only valid until the next Scan; it is copied into data below.
		line := s.scanner.Bytes()

		// A blank line dispatches the accumulated event, if there is one.
		if len(line) == 0 {
			if data.Len() == 0 {
				// Nothing accumulated yet — keep scanning.
				continue
			}
			var v T
			if err := json.Unmarshal(data.Bytes(), &v); err != nil {
				s.err = err
				return false
			}
			s.current = v
			return true
		}

		// Split "field: value". A line without a colon is a field name with
		// an empty value; a line starting with ":" has an empty name.
		name, value, _ := bytes.Cut(line, []byte(":"))
		if string(name) != "data" {
			// Only the data field contributes to the payload.
			continue
		}

		// Drop the single optional space that may follow the colon.
		value = bytes.TrimPrefix(value, []byte(" "))

		// bytes.Buffer writes never return a non-nil error.
		_, _ = data.Write(value)
		_ = data.WriteByte('\n')
	}

	if err := s.scanner.Err(); err != nil {
		s.err = err
	}
	s.done = true
	return false
}
// Current reports the value decoded by the latest call to Next that
// returned true. Before any such call it is the zero value of T.
func (s *Stream[T]) Current() T {
	return s.current
}
// Err reports the error that stopped the stream, or nil if none has occurred.
func (s *Stream[T]) Err() error {
	return s.err
}
// Close releases the underlying response body and returns the result of
// closing it. A stream that holds no body is a no-op and returns nil.
func (s *Stream[T]) Close() error {
	if body := s.rc; body != nil {
		return body.Close()
	}
	return nil
}