Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions mem/buffer_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,18 @@ func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
_, err := wt.WriteTo(w)
return result, err
}

if lr, ok := r.(*io.LimitedReader); ok {
if wt, ok := lr.R.(io.WriterTo); ok {
// This is more optimal since wt knows the size of chunks it wants to
// write and, hence, we can allocate buffers of an optimal size to fit
// them. E.g. might be a single big chunk, and we wouldn't chop it
// into pieces.
w := NewWriter(&result, pool)
_, err := wt.WriteTo(w)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would circumvent the limit of the LimitedReader, wouldn't it? By directly accessing the underlying reader we are bypassing the limiter.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah but I'm not sure how to do a "LimitedWriter" if we could say it that way. Should we add something like https://github.com/nanmu42/limitio/blob/master/limitio.go to grpc-go code (as a library or our own impl)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @GiedriusS, one solution I can think of is to create your own wrapper struct that wraps a io.LimitReader and also implements io.WriteTo. This would allow your reader to control the size of the temporary buffer being used. Here's a example implementation could work.

type LimitWriterTo struct {
    Reader io.Reader // The underlying io.LimitReader
}

func (l *LimitWriterTo) WriteTo(w io.Writer) (n int64, err error) {
    // Define your custom buffer size here (e.g., 64K, 128K)
    buffer := make([]byte, 1024) // You could get this from a buffer pool.
    
    // Use io.CopyBuffer internally with your custom buffer
    // or implement the read/write loop manually for ultimate control.
    return io.CopyBuffer(w, l.Reader, buffer) 
}

Copy link
Author

@GiedriusS GiedriusS Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The core issue is that grpc-go does an assertion (and it wraps io.Reader inside of a io.LimitedReader itself) whether it's a io.Reader and io.LimitedReader is not a io.Reader so I think this path would never be hit.

Copy link
Contributor

@arjan-bal arjan-bal Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I overlooked the line in the PR description: "This happens if some max message size is set."

gRPC controls the the reader type and not external code. Let me think about it a little more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about we set the size of the copy buffer in the for loop below as min(readAllBufSize, LimitReader.N)?

Copy link
Author

@GiedriusS GiedriusS Nov 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is quite huge

defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
so it wouldn't help in our case because the messages are really small (<1KB). Would you accept adding a LimitedWriter to grpc-go code?

Copy link
Contributor

@arjan-bal arjan-bal Nov 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. Can you describe how you plan to use the new LimitedReader type to workaround this?

I have one approach in mind which involves having the LimitedReader conditionally implement io.WriterTo using a type assertion if the wrapped reader also implements io.WriterTo. I wanted to know if there's a simpler solution for this.

return result, err
}
}
nextBuffer:
for {
buf := pool.Get(readAllBufSize)
Expand Down
Loading