From 60262885772f3c5e59ef415721394ed7cbcd6d5f Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Mon, 26 May 2025 19:36:25 +0530 Subject: [PATCH 1/2] Re-use slice buffer reader for a stream --- internal/transport/controlbuf.go | 55 ++++++++++++++++++---------- internal/transport/http2_client.go | 15 +++----- internal/transport/http2_server.go | 13 +++---- internal/transport/transport_test.go | 6 ++- mem/buffer_slice.go | 11 ++++++ 5 files changed, 61 insertions(+), 39 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ef72fbb3a016..ca3d72faff4f 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -146,10 +146,11 @@ type earlyAbortStream struct { func (*earlyAbortStream) isTransportResponseFrame() bool { return false } type dataFrame struct { - streamID uint32 - endStream bool - h []byte - reader mem.Reader + streamID uint32 + endStream bool + h []byte + data mem.BufferSlice + processing bool // onEachWrite is called every time // a part of data is written out. onEachWrite func() @@ -234,6 +235,7 @@ type outStream struct { itl *itemList bytesOutStanding int wq *writeQuota + reader mem.Reader next *outStream prev *outStream @@ -461,7 +463,9 @@ func (c *controlBuffer) finish() { v.onOrphaned(ErrConnClosing) } case *dataFrame: - _ = v.reader.Close() + if !v.processing { + v.data.Free() + } } } @@ -650,10 +654,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error { func (l *loopyWriter) registerStreamHandler(h *registerStream) { str := &outStream{ - id: h.streamID, - state: empty, - itl: &itemList{}, - wq: h.wq, + id: h.streamID, + state: empty, + itl: &itemList{}, + wq: h.wq, + reader: mem.BufferSlice{}.Reader(), } l.estdStreams[h.streamID] = str } @@ -685,10 +690,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error { } // Case 2: Client wants to originate stream. str := &outStream{ - id: h.streamID, - state: empty, - itl: &itemList{}, - wq: h.wq, + id: h.streamID, + state: empty, + itl: &itemList{}, + wq: h.wq, + reader: mem.BufferSlice{}.Reader(), } return l.originateStream(str, h) } @@ -790,10 +796,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error { // a RST_STREAM before stream initialization thus the stream might // not be established yet. delete(l.estdStreams, c.streamID) + str.reader.Close() str.deleteSelf() for head := str.itl.dequeueAll(); head != nil; head = head.next { if df, ok := head.it.(*dataFrame); ok { - _ = df.reader.Close() + if !df.processing { + df.data.Free() + } } } } @@ -928,7 +937,13 @@ func (l *loopyWriter) processData() (bool, error) { if str == nil { return true, nil } + reader := str.reader dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream. + if !dataItem.processing { + dataItem.processing = true + str.reader.Reset(dataItem.data) + dataItem.data.Free() + } // A data item is represented by a dataFrame, since it later translates into // multiple HTTP2 data frames. // Every dataFrame has two buffers; h that keeps grpc-message header and data @@ -936,13 +951,13 @@ func (l *loopyWriter) processData() (bool, error) { // from data is copied to h to make as big as the maximum possible HTTP2 frame // size. - if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame + if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame // Client sends out empty data frame with endStream = true if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil { return false, err } str.itl.dequeue() // remove the empty data item from stream - _ = dataItem.reader.Close() + _ = reader.Close() if str.itl.isEmpty() { str.state = empty } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers. @@ -971,8 +986,8 @@ func (l *loopyWriter) processData() (bool, error) { } // Compute how much of the header and data we can send within quota and max frame length hSize := min(maxSize, len(dataItem.h)) - dSize := min(maxSize-hSize, dataItem.reader.Remaining()) - remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize + dSize := min(maxSize-hSize, reader.Remaining()) + remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize size := hSize + dSize var buf *[]byte @@ -993,7 +1008,7 @@ func (l *loopyWriter) processData() (bool, error) { defer pool.Put(buf) copy((*buf)[:hSize], dataItem.h) - _, _ = dataItem.reader.Read((*buf)[hSize:]) + _, _ = reader.Read((*buf)[hSize:]) } // Now that outgoing flow controls are checked we can replenish str's write quota @@ -1014,7 +1029,7 @@ func (l *loopyWriter) processData() (bool, error) { dataItem.h = dataItem.h[hSize:] if remainingBytes == 0 { // All the data from that message was written out. - _ = dataItem.reader.Close() + _ = reader.Close() str.itl.dequeue() } if str.itl.isEmpty() { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index ef56592b944e..1b1847252885 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1091,32 +1091,29 @@ func (t *http2Client) GracefulClose() { // Write formats the data into HTTP2 data frame(s) and sends it out. The caller // should proceed only if Write returns nil. func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error { - reader := data.Reader() - if opts.Last { // If it's the last message, update stream state. if !s.compareAndSwapState(streamActive, streamWriteDone) { - _ = reader.Close() return errStreamDone } } else if s.getState() != streamActive { - _ = reader.Close() return errStreamDone } df := &dataFrame{ streamID: s.id, endStream: opts.Last, h: hdr, - reader: reader, + data: data, } - if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota. - if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil { - _ = reader.Close() + dataLen := data.Len() + if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota. + if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil { return err } } + data.Ref() if err := t.controlBuf.put(df); err != nil { - _ = reader.Close() + data.Free() return err } t.incrMsgSent() diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index e4c3731bdb0a..e2c52179d95a 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -1132,17 +1132,13 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error { - reader := data.Reader() - if !s.isHeaderSent() { // Headers haven't been written yet. if err := t.writeHeader(s, nil); err != nil { - _ = reader.Close() return err } } else { // Writing headers checks for this condition. if s.getState() == streamDone { - _ = reader.Close() return t.streamContextErr(s) } } @@ -1150,15 +1146,16 @@ func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ df := &dataFrame{ streamID: s.id, h: hdr, - reader: reader, + data: data, onEachWrite: t.setResetPingStrikes, } - if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil { - _ = reader.Close() + dataLen := data.Len() + if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil { return t.streamContextErr(s) } + data.Ref() if err := t.controlBuf.put(df); err != nil { - _ = reader.Close() + data.Free() return err } t.incrMsgSent() diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index a5f79eb52051..8ffb785c4a46 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -203,10 +203,11 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *ServerStream) } } data := newBufferSlice(p) + data.Ref() conn.controlBuf.put(&dataFrame{ streamID: s.id, h: nil, - reader: data.Reader(), + data: data, onEachWrite: func() {}, }) sent += len(p) @@ -1092,11 +1093,12 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) { t.Fatalf("Failed to open stream: %v", err) } d := newBufferSlice(make([]byte, http2MaxFrameLen)) + d.Ref() ct.controlBuf.put(&dataFrame{ streamID: s.id, endStream: false, h: nil, - reader: d.Reader(), + data: d, onEachWrite: func() {}, }) // Loop until the server side stream is created. diff --git a/mem/buffer_slice.go b/mem/buffer_slice.go index 65002e2cc851..e2d82ab977dd 100644 --- a/mem/buffer_slice.go +++ b/mem/buffer_slice.go @@ -137,6 +137,9 @@ type Reader interface { Close() error // Remaining returns the number of unread bytes remaining in the slice. Remaining() int + // Reset frees the currently held buffer slice and starts reading from the + // provided slice. This allows reusing the reader object. + Reset(s BufferSlice) } type sliceReader struct { @@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int { return r.len } +func (r *sliceReader) Reset(s BufferSlice) { + r.Close() + s.Ref() + r.data = s + r.len = s.Len() + r.bufferIdx = 0 +} + func (r *sliceReader) Close() error { r.data.Free() r.data = nil From e18b02907c0191674fcd97753a8fa2b4bafa7ceb Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Thu, 29 May 2025 00:15:56 +0530 Subject: [PATCH 2/2] Avoid redundant work in reset --- mem/buffer_slice.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mem/buffer_slice.go b/mem/buffer_slice.go index e2d82ab977dd..af510d20c5ab 100644 --- a/mem/buffer_slice.go +++ b/mem/buffer_slice.go @@ -154,7 +154,7 @@ func (r *sliceReader) Remaining() int { } func (r *sliceReader) Reset(s BufferSlice) { - r.Close() + r.data.Free() s.Ref() r.data = s r.len = s.Len()