diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go
index ef72fbb3a016..ca3d72faff4f 100644
--- a/internal/transport/controlbuf.go
+++ b/internal/transport/controlbuf.go
@@ -146,10 +146,11 @@ type earlyAbortStream struct {
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
type dataFrame struct {
- streamID uint32
- endStream bool
- h []byte
- reader mem.Reader
+ streamID uint32
+ endStream bool
+ h []byte
+ data mem.BufferSlice
+ processing bool
// onEachWrite is called every time
// a part of data is written out.
onEachWrite func()
@@ -234,6 +235,7 @@ type outStream struct {
itl *itemList
bytesOutStanding int
wq *writeQuota
+ reader mem.Reader
next *outStream
prev *outStream
@@ -461,7 +463,9 @@ func (c *controlBuffer) finish() {
v.onOrphaned(ErrConnClosing)
}
case *dataFrame:
- _ = v.reader.Close()
+ if !v.processing {
+ v.data.Free()
+ }
}
}
@@ -650,10 +654,11 @@ func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
func (l *loopyWriter) registerStreamHandler(h *registerStream) {
str := &outStream{
- id: h.streamID,
- state: empty,
- itl: &itemList{},
- wq: h.wq,
+ id: h.streamID,
+ state: empty,
+ itl: &itemList{},
+ wq: h.wq,
+ reader: mem.BufferSlice{}.Reader(),
}
l.estdStreams[h.streamID] = str
}
@@ -685,10 +690,11 @@ func (l *loopyWriter) headerHandler(h *headerFrame) error {
}
// Case 2: Client wants to originate stream.
str := &outStream{
- id: h.streamID,
- state: empty,
- itl: &itemList{},
- wq: h.wq,
+ id: h.streamID,
+ state: empty,
+ itl: &itemList{},
+ wq: h.wq,
+ reader: mem.BufferSlice{}.Reader(),
}
return l.originateStream(str, h)
}
@@ -790,10 +796,13 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
// a RST_STREAM before stream initialization thus the stream might
// not be established yet.
delete(l.estdStreams, c.streamID)
+ str.reader.Close()
str.deleteSelf()
for head := str.itl.dequeueAll(); head != nil; head = head.next {
if df, ok := head.it.(*dataFrame); ok {
- _ = df.reader.Close()
+ if !df.processing {
+ df.data.Free()
+ }
}
}
}
@@ -928,7 +937,13 @@ func (l *loopyWriter) processData() (bool, error) {
if str == nil {
return true, nil
}
+ reader := str.reader
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
+ if !dataItem.processing {
+ dataItem.processing = true
+ str.reader.Reset(dataItem.data)
+ dataItem.data.Free()
+ }
// A data item is represented by a dataFrame, since it later translates into
// multiple HTTP2 data frames.
// Every dataFrame has two buffers; h that keeps grpc-message header and data
@@ -936,13 +951,13 @@ func (l *loopyWriter) processData() (bool, error) {
// from data is copied to h to make as big as the maximum possible HTTP2 frame
// size.
- if len(dataItem.h) == 0 && dataItem.reader.Remaining() == 0 { // Empty data frame
+ if len(dataItem.h) == 0 && reader.Remaining() == 0 { // Empty data frame
// Client sends out empty data frame with endStream = true
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
return false, err
}
str.itl.dequeue() // remove the empty data item from stream
- _ = dataItem.reader.Close()
+ _ = reader.Close()
if str.itl.isEmpty() {
str.state = empty
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
@@ -971,8 +986,8 @@ func (l *loopyWriter) processData() (bool, error) {
}
// Compute how much of the header and data we can send within quota and max frame length
hSize := min(maxSize, len(dataItem.h))
- dSize := min(maxSize-hSize, dataItem.reader.Remaining())
- remainingBytes := len(dataItem.h) + dataItem.reader.Remaining() - hSize - dSize
+ dSize := min(maxSize-hSize, reader.Remaining())
+ remainingBytes := len(dataItem.h) + reader.Remaining() - hSize - dSize
size := hSize + dSize
var buf *[]byte
@@ -993,7 +1008,7 @@ func (l *loopyWriter) processData() (bool, error) {
defer pool.Put(buf)
copy((*buf)[:hSize], dataItem.h)
- _, _ = dataItem.reader.Read((*buf)[hSize:])
+ _, _ = reader.Read((*buf)[hSize:])
}
// Now that outgoing flow controls are checked we can replenish str's write quota
@@ -1014,7 +1029,7 @@ func (l *loopyWriter) processData() (bool, error) {
dataItem.h = dataItem.h[hSize:]
if remainingBytes == 0 { // All the data from that message was written out.
- _ = dataItem.reader.Close()
+ _ = reader.Close()
str.itl.dequeue()
}
if str.itl.isEmpty() {
diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go
index ef56592b944e..1b1847252885 100644
--- a/internal/transport/http2_client.go
+++ b/internal/transport/http2_client.go
@@ -1091,32 +1091,29 @@ func (t *http2Client) GracefulClose() {
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
// should proceed only if Write returns nil.
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
- reader := data.Reader()
-
if opts.Last {
// If it's the last message, update stream state.
if !s.compareAndSwapState(streamActive, streamWriteDone) {
- _ = reader.Close()
return errStreamDone
}
} else if s.getState() != streamActive {
- _ = reader.Close()
return errStreamDone
}
df := &dataFrame{
streamID: s.id,
endStream: opts.Last,
h: hdr,
- reader: reader,
+ data: data,
}
- if hdr != nil || df.reader.Remaining() != 0 { // If it's not an empty data frame, check quota.
- if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
- _ = reader.Close()
+ dataLen := data.Len()
+ if hdr != nil || dataLen != 0 { // If it's not an empty data frame, check quota.
+ if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return err
}
}
+ data.Ref()
if err := t.controlBuf.put(df); err != nil {
- _ = reader.Close()
+ data.Free()
return err
}
t.incrMsgSent()
diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go
index e4c3731bdb0a..e2c52179d95a 100644
--- a/internal/transport/http2_server.go
+++ b/internal/transport/http2_server.go
@@ -1132,17 +1132,13 @@ func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
// is returns if it fails (e.g., framing error, transport error).
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
- reader := data.Reader()
-
if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.writeHeader(s, nil); err != nil {
- _ = reader.Close()
return err
}
} else {
// Writing headers checks for this condition.
if s.getState() == streamDone {
- _ = reader.Close()
return t.streamContextErr(s)
}
}
@@ -1150,15 +1146,16 @@ func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _
df := &dataFrame{
streamID: s.id,
h: hdr,
- reader: reader,
+ data: data,
onEachWrite: t.setResetPingStrikes,
}
- if err := s.wq.get(int32(len(hdr) + df.reader.Remaining())); err != nil {
- _ = reader.Close()
+ dataLen := data.Len()
+ if err := s.wq.get(int32(len(hdr) + dataLen)); err != nil {
return t.streamContextErr(s)
}
+ data.Ref()
if err := t.controlBuf.put(df); err != nil {
- _ = reader.Close()
+ data.Free()
return err
}
t.incrMsgSent()
diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go
index a5f79eb52051..8ffb785c4a46 100644
--- a/internal/transport/transport_test.go
+++ b/internal/transport/transport_test.go
@@ -203,10 +203,11 @@ func (h *testStreamHandler) handleStreamMisbehave(t *testing.T, s *ServerStream)
}
}
data := newBufferSlice(p)
+ data.Ref()
conn.controlBuf.put(&dataFrame{
streamID: s.id,
h: nil,
- reader: data.Reader(),
+ data: data,
onEachWrite: func() {},
})
sent += len(p)
@@ -1092,11 +1093,12 @@ func (s) TestServerContextCanceledOnClosedConnection(t *testing.T) {
t.Fatalf("Failed to open stream: %v", err)
}
d := newBufferSlice(make([]byte, http2MaxFrameLen))
+ d.Ref()
ct.controlBuf.put(&dataFrame{
streamID: s.id,
endStream: false,
h: nil,
- reader: d.Reader(),
+ data: d,
onEachWrite: func() {},
})
// Loop until the server side stream is created.
diff --git a/mem/buffer_slice.go b/mem/buffer_slice.go
index 65002e2cc851..af510d20c5ab 100644
--- a/mem/buffer_slice.go
+++ b/mem/buffer_slice.go
@@ -137,6 +137,9 @@ type Reader interface {
Close() error
// Remaining returns the number of unread bytes remaining in the slice.
Remaining() int
+ // Reset frees the currently held buffer slice and starts reading from the
+ // provided slice. This allows reusing the reader object.
+ Reset(s BufferSlice)
}
type sliceReader struct {
@@ -150,6 +153,14 @@ func (r *sliceReader) Remaining() int {
return r.len
}
+func (r *sliceReader) Reset(s BufferSlice) {
+ r.data.Free()
+ s.Ref()
+ r.data = s
+ r.len = s.Len()
+ r.bufferIdx = 0
+}
+
func (r *sliceReader) Close() error {
r.data.Free()
r.data = nil