Skip to content

Commit 3866b57

Browse files
authored
feat: support http stream response (#4055)
1 parent 5fbe8ff commit 3866b57

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

rest/httpx/responses.go

+21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"io"
89
"net/http"
910
"sync"
1011

@@ -88,6 +89,26 @@ func SetOkHandler(handler func(context.Context, any) any) {
8889
okHandler = handler
8990
}
9091

92+
// Stream writes data into w with streaming mode.
93+
// The ctx is used to control the streaming loop, typically use r.Context().
94+
// The fn is called repeatedly until it returns false.
95+
func Stream(ctx context.Context, w http.ResponseWriter, fn func(w io.Writer) bool) {
96+
for {
97+
select {
98+
case <-ctx.Done():
99+
return
100+
default:
101+
hasMore := fn(w)
102+
if flusher, ok := w.(http.Flusher); ok {
103+
flusher.Flush()
104+
}
105+
if !hasMore {
106+
return
107+
}
108+
}
109+
}
110+
}
111+
91112
// WriteJson writes v as json string into w with code.
92113
func WriteJson(w http.ResponseWriter, code int, v any) {
93114
if err := doWriteJson(w, code, v); err != nil {

rest/httpx/responses_test.go

+57
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
package httpx
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
8+
"io"
79
"net/http"
10+
"net/http/httptest"
811
"strings"
912
"testing"
1013

@@ -239,6 +242,60 @@ func TestWriteJsonMarshalFailed(t *testing.T) {
239242
assert.Equal(t, http.StatusInternalServerError, w.code)
240243
}
241244

245+
func TestStream(t *testing.T) {
246+
t.Run("regular case", func(t *testing.T) {
247+
channel := make(chan string)
248+
go func() {
249+
defer close(channel)
250+
for index := 0; index < 5; index++ {
251+
channel <- fmt.Sprintf("%d", index)
252+
}
253+
}()
254+
255+
w := httptest.NewRecorder()
256+
Stream(context.Background(), w, func(w io.Writer) bool {
257+
output, ok := <-channel
258+
if !ok {
259+
return false
260+
}
261+
262+
outputBytes := bytes.NewBufferString(output)
263+
_, err := w.Write(append(outputBytes.Bytes(), []byte("\n")...))
264+
return err == nil
265+
})
266+
267+
assert.Equal(t, http.StatusOK, w.Code)
268+
assert.Equal(t, "0\n1\n2\n3\n4\n", w.Body.String())
269+
})
270+
271+
t.Run("context done", func(t *testing.T) {
272+
channel := make(chan string)
273+
go func() {
274+
defer close(channel)
275+
for index := 0; index < 5; index++ {
276+
channel <- fmt.Sprintf("num: %d", index)
277+
}
278+
}()
279+
280+
w := httptest.NewRecorder()
281+
ctx, cancel := context.WithCancel(context.Background())
282+
cancel()
283+
Stream(ctx, w, func(w io.Writer) bool {
284+
output, ok := <-channel
285+
if !ok {
286+
return false
287+
}
288+
289+
outputBytes := bytes.NewBufferString(output)
290+
_, err := w.Write(append(outputBytes.Bytes(), []byte("\n")...))
291+
return err == nil
292+
})
293+
294+
assert.Equal(t, http.StatusOK, w.Code)
295+
assert.Equal(t, "", w.Body.String())
296+
})
297+
}
298+
242299
type tracedResponseWriter struct {
243300
headers map[string][]string
244301
builder strings.Builder

0 commit comments

Comments
 (0)