Skip to content

transport: Re-use slice buffer reader for a stream #8360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 35 additions & 20 deletions internal/transport/controlbuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,11 @@ type earlyAbortStream struct {
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }

type dataFrame struct {
streamID uint32
endStream bool
h []byte
reader mem.Reader
streamID uint32
endStream bool
h []byte
data mem.BufferSlice
processing bool
// onEachWrite is called every time
// a part of data is written out.
onEachWrite func()
Expand Down Expand Up @@ -234,6 +235,7 @@ type outStream struct {
itl *itemList
bytesOutStanding int
wq *writeQuota
reader mem.Reader

next *outStream
prev *outStream
Expand Down Expand Up @@ -461,7 +463,9 @@ func (c *controlBuffer) finish() {
v.onOrphaned(ErrConnClosing)
}
case *dataFrame:
_ = v.reader.Close()
if !v.processing {
v.data.Free()
}
}
}

Expand Down Expand Up @@ -650,10 +654,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {

func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
}
l.estdStreams[h.streamID] = str
}
Expand Down Expand Up @@ -685,10 +690,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
}
// Case 2: Client wants to originate stream.
str := &outStream{
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
id: h.streamID,
state: empty,
itl: &itemList{},
wq: h.wq,
reader: mem.BufferSlice{}.Reader(),
}
return l.originateStream(str, h)
}
Expand Down Expand Up @@ -790,10 +796,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// a RST_STREAM before stream initialization thus the stream might
// not be established yet.
delete(l.estdStreams, c.streamID)
str.reader.Close()
str.deleteSelf()
for head := str.itl.dequeueAll(); head != nil; head = head.next {
if df, ok := head.it.(*dataFrame); ok {
_ = df.reader.Close()
if !df.processing {
df.data.Free()
}
}
}
}
Expand Down Expand Up @@ -928,21 +937,27 @@ func (l *loopyWriter) processData() (bool, error) {
if str == nil {
return true, nil
}
reader := str.reader
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
if !dataItem.processing {
dataItem.processing = true
str.reader.Reset(dataItem.data)
dataItem.data.Free()
}
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and data
// that is the actual message. As an optimization to keep wire traffic low, data
// from data is copied to h to make as big as the maximum possible HTTP2 frame
// size.

if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
_ = dataItem.reader.Close()
_ = reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
Expand Down Expand Up @@ -971,8 +986,8 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
dSize := min(maxSize-hSize, dataItem.reader.Remaining())
remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
dSize := min(maxSize-hSize, reader.Remaining())
remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
size := hSize + dSize

var buf *[]byte
Expand All @@ -993,7 +1008,7 @@ func (l *loopyWriter) processData() (bool, error) {
defer pool.Put(buf)

copy((*buf)[:hSize], dataItem.h)
_, _ = dataItem.reader.Read((*buf)[hSize:])
_, _ = reader.Read((*buf)[hSize:])
}

// Now that outgoing flow controls are checked we can replenish str's write quota
Expand All @@ -1014,7 +1029,7 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem.h = dataItem.h[hSize:]

if remainingBytes == 0 { // All the data from that message was written out.
_ = dataItem.reader.Close()
_ = reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {
Expand Down
15 changes: 6 additions & 9 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,32 +1091,29 @@ func (t *http2Client) GracefulClose() {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
reader := data.Reader()

if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
_ = reader.Close()
return errStreamDone
}
} else if s.getState() != streamActive {
_ = reader.Close()
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
reader: reader,
data: data,
}
if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
dataLen := data.Len()
if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return err
}
}
data.Ref()
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
data.Free()
return err
}
t.incrMsgSent()
Expand Down
13 changes: 5 additions & 8 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1132,33 +1132,30 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
reader := data.Reader()

if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.writeHeader(s, nil); err != nil {
_ = reader.Close()
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
_ = reader.Close()
return t.streamContextErr(s)
}
}

df := &dataFrame{
streamID: s.id,
h: hdr,
reader: reader,
data: data,
onEachWrite: t.setResetPingStrikes,
}
if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
_ = reader.Close()
dataLen := data.Len()
if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return t.streamContextErr(s)
}
data.Ref()
if err := t.controlBuf.put(df); err != nil {
_ = reader.Close()
data.Free()
return err
}
t.incrMsgSent()
Expand Down
6 changes: 4 additions & 2 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *ServerStream)
}
}
data := newBufferSlice(p)
data.Ref()
conn.controlBuf.put(&dataFrame{
streamID: s.id,
h: nil,
reader: data.Reader(),
data: data,
onEachWrite: func() {},
})
sent += len(p)
Expand Down Expand Up @@ -1092,11 +1093,12 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
t.Fatalf("Failed to open stream: %v", err)
}
d := newBufferSlice(make([]byte, http2MaxFrameLen))
d.Ref()
ct.controlBuf.put(&dataFrame{
streamID: s.id,
endStream: false,
h: nil,
reader: d.Reader(),
data: d,
onEachWrite: func() {},
})
// Loop until the server side stream is created.
Expand Down
11 changes: 11 additions & 0 deletions mem/buffer_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ type Reader interface {
Close() error
// Remaining returns the number of unread bytes remaining in the slice.
Remaining() int
// Reset frees the currently held buffer slice and starts reading from the
// provided slice. This allows reusing the reader object.
Reset(s BufferSlice)
}

type sliceReader struct {
Expand All @@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int {
return r.len
}

func (r *sliceReader) Reset(s BufferSlice) {
r.data.Free()
s.Ref()
r.data = s
r.len = s.Len()
r.bufferIdx = 0
}

func (r *sliceReader) Close() error {
r.data.Free()
r.data = nil
Expand Down