Skip to content

Commit 6dcf308

Browse files
committed
feat(nbhttp): Implement http.Flusher for streaming support
The existing `nbhttp.Response` did not implement the `http.Flusher` interface, preventing its use with `http.NewResponseController` for streaming data to the client. This was particularly problematic for long-running requests or server-sent events (SSE). This change introduces full support for `http.Flusher` and ensures true streaming is possible by automatically enabling chunked encoding when a `Content-Length` is not explicitly set. Key changes: - Added a `Flush()` method to `Response` to implement `http.Flusher`. - Modified `checkChunked()` to default to `Transfer-Encoding: chunked` for responses without a `Content-Length`, which is crucial for streaming. - Added a no-op `Push()` method to satisfy the `http.Pusher` interface, returning an error as HTTP/2 Push is not supported.
1 parent c26ae5e commit 6dcf308

File tree

1 file changed

+68
-12
lines changed

1 file changed

+68
-12
lines changed

nbhttp/response.go

Lines changed: 68 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -345,31 +345,87 @@ func (res *Response) ReadFrom(r io.Reader) (n int64, err error) {
345345
return io.Copy(c, r)
346346
}
347347

348+
// Push implements the http.Pusher interface.
349+
// It is not supported by nbhttp, so it always returns an error.
350+
//
351+
//go:norace
352+
func (res *Response) Push(target string, opts *http.PushOptions) error {
353+
return errors.New("http: server push not supported")
354+
}
355+
356+
// Flush implements http.Flusher. It sends any buffered data to the client.
357+
//
358+
//go:norace
359+
func (res *Response) Flush() {
360+
if res.hijacked {
361+
return
362+
}
363+
if res.Parser == nil || res.Parser.Conn == nil {
364+
return
365+
}
366+
367+
res.WriteHeader(http.StatusOK)
368+
res.checkChunked()
369+
res.eoncodeHead()
370+
371+
conn := res.Parser.Conn
372+
373+
if res.buffer != nil && len(*res.buffer) > 0 {
374+
_, err := conn.Write(*res.buffer)
375+
if err != nil {
376+
logging.Error("Response.Flush: buffer write failed: %v", err)
377+
mempool.Free(res.buffer)
378+
res.buffer = nil
379+
} else {
380+
*res.buffer = (*res.buffer)[:0]
381+
}
382+
}
383+
384+
if res.bodyBuffer != nil && len(*res.bodyBuffer) > 0 {
385+
_, err := conn.Write(*res.bodyBuffer)
386+
if err != nil {
387+
logging.Error("Response.Flush: bodyBuffer write failed: %v", err)
388+
mempool.Free(res.bodyBuffer)
389+
res.bodyBuffer = nil
390+
} else {
391+
*res.bodyBuffer = (*res.bodyBuffer)[:0]
392+
}
393+
}
394+
}
395+
348396
// checkChunked .
349397
//
350398
//go:norace
351399
func (res *Response) checkChunked() {
352400
if res.chunkChecked {
353401
return
354402
}
355-
356403
res.chunkChecked = true
357404

358-
if res.request.ProtoAtLeast(1, 1) {
359-
for _, v := range res.header[transferEncodingHeader] {
360-
if v == "chunked" {
361-
res.chunked = true
362-
}
405+
// 1. See if chunking is already set
406+
for _, v := range res.header[transferEncodingHeader] {
407+
if v == "chunked" {
408+
res.chunked = true
409+
delete(res.header, contentLengthHeader)
410+
return
363411
}
364-
if !res.chunked {
365-
if len(res.header[trailerHeader]) > 0 {
366-
res.chunked = true
367-
hs := res.header[transferEncodingHeader]
368-
res.header[transferEncodingHeader] = append(hs, "chunked")
369-
}
412+
}
413+
414+
// 2. See if we should fall back to chunking
415+
if res.request.ProtoAtLeast(1, 1) && res.header.Get(contentLengthHeader) == "" {
416+
// Don't chunk for responses that are forbidden from having a body
417+
if res.statusCode != http.StatusNoContent && res.statusCode != http.StatusNotModified {
418+
res.chunked = true
370419
}
371420
}
421+
422+
// 3. See if we need to chunk for trailers
423+
if !res.chunked && len(res.header[trailerHeader]) > 0 {
424+
res.chunked = true
425+
}
426+
372427
if res.chunked {
428+
res.header.Set(transferEncodingHeader, "chunked")
373429
delete(res.header, contentLengthHeader)
374430
}
375431
}

0 commit comments

Comments
 (0)