Skip to content

Commit 7da6bd3

Browse files
equals215NGTmeaty
andauthored
Use valyala/bytebufferpool for spooledTempFile (#141)
* working * removed old code comments --------- Co-authored-by: Jake L <NGTmeaty@users.noreply.github.com>
1 parent 7bf82e6 commit 7da6bd3

File tree

4 files changed

+25
-78
lines changed

4 files changed

+25
-78
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/spf13/cobra v1.9.1
1313
github.com/things-go/go-socks5 v0.0.6
1414
github.com/ulikunitz/xz v0.5.14
15+
github.com/valyala/bytebufferpool v1.0.0
1516
github.com/zeebo/blake3 v0.2.4
1617
go.uber.org/goleak v1.3.0
1718
golang.org/x/net v0.39.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ github.com/things-go/go-socks5 v0.0.6 h1:YjylIYZiND41szH4NzsVbx8aVDsS/Y8ps3QYPwQ
4040
github.com/things-go/go-socks5 v0.0.6/go.mod h1:RF6tRutwNWzISbPfiDEChH/o1aDfRv+cXDYn2a2qkK4=
4141
github.com/ulikunitz/xz v0.5.14 h1:uv/0Bq533iFdnMHZdRBTOlaNMdb1+ZxXIlHDZHIHcvg=
4242
github.com/ulikunitz/xz v0.5.14/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
43+
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
44+
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
4345
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
4446
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
4547
github.com/zeebo/assert v1.1.0 h1:hU1L1vLTHsnO8x8c9KAR5GmM5QscxHg5RNU5z5qbUWY=

pkg/spooledtempfile/spooled.go

Lines changed: 18 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"os"
1010
"sync"
1111
"time"
12+
13+
"github.com/valyala/bytebufferpool"
1214
)
1315

1416
const (
@@ -30,30 +32,8 @@ type globalMemoryCache struct {
3032

3133
var (
3234
memoryUsageCache = &globalMemoryCache{}
33-
spooledPool = sync.Pool{
34-
New: func() any {
35-
b := make([]byte, 0, InitialBufferSize)
36-
return &b
37-
},
38-
}
3935
)
4036

41-
// Get a zero-length slice backed by a pooled array (cap == InitialBufferSize).
42-
func getPooledBuf() []byte {
43-
p := spooledPool.Get().(*[]byte)
44-
return (*p)[:0]
45-
}
46-
47-
// Return a slice to the pool without retaining large arrays.
48-
// IMPORTANT: do NOT pass &s.buf directly; copy a header first.
49-
func putPooledBuf(b []byte) {
50-
if cap(b) == 0 || cap(b) > InitialBufferSize {
51-
return // don't retain big arrays
52-
}
53-
h := b[:0] // fresh header decoupled from callers
54-
spooledPool.Put(&h) // pointer-like -> SA6002 satisfied
55-
}
56-
5737
// ReaderAt is the interface for ReadAt - read at position, without moving pointer.
5838
type ReaderAt interface {
5939
ReadAt(p []byte, off int64) (n int, err error)
@@ -70,7 +50,7 @@ type ReadSeekCloser interface {
7050
// spooledTempFile writes to memory (or to disk if
7151
// over MaxInMemorySize) and deletes the file on Close
7252
type spooledTempFile struct {
73-
buf []byte // Use []byte instead of bytes.Buffer
53+
buf *bytebufferpool.ByteBuffer
7454
mem *bytes.Reader // Reader for in-memory data
7555
file *os.File
7656
filePrefix string
@@ -118,7 +98,7 @@ func NewSpooledTempFile(filePrefix string, tempDir string, threshold int, fullOn
11898
return &spooledTempFile{
11999
filePrefix: filePrefix,
120100
tempDir: tempDir,
121-
buf: getPooledBuf(),
101+
buf: bytebufferpool.Get(),
122102
maxInMemorySize: threshold,
123103
fullOnDisk: fullOnDisk,
124104
maxRAMUsageFraction: maxRAMUsageFraction,
@@ -142,7 +122,7 @@ func (s *spooledTempFile) prepareRead() error {
142122
return nil
143123
}
144124

145-
s.mem = bytes.NewReader(s.buf) // Create a reader from the []byte slice
125+
s.mem = bytes.NewReader(s.buf.Bytes())
146126
return nil
147127
}
148128

@@ -154,7 +134,7 @@ func (s *spooledTempFile) Len() int {
154134
}
155135
return int(fi.Size())
156136
}
157-
return len(s.buf) // Return the length of the []byte slice
137+
return s.buf.Len()
158138
}
159139

160140
func (s *spooledTempFile) Read(p []byte) (n int, err error) {
@@ -196,6 +176,7 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) {
196176
if s.closed {
197177
return 0, io.EOF
198178
}
179+
199180
if s.reading {
200181
panic("write after read")
201182
}
@@ -205,27 +186,26 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) {
205186
}
206187

207188
aboveRAMThreshold := s.isSystemMemoryUsageHigh()
208-
if aboveRAMThreshold || s.fullOnDisk || (len(s.buf)+len(p) > s.maxInMemorySize) {
189+
if aboveRAMThreshold || s.fullOnDisk || (s.buf.Len()+len(p) > s.maxInMemorySize) {
209190
// Switch to file if we haven't already
210191
s.file, err = os.CreateTemp(s.tempDir, s.filePrefix+"-")
211192
if err != nil {
212193
return 0, err
213194
}
214195

215196
// Copy what we already had in the buffer
216-
_, err = s.file.Write(s.buf)
197+
_, err = s.buf.WriteTo(s.file)
217198
if err != nil {
218199
s.file.Close()
219200
s.file = nil
220201
return 0, err
221202
}
222203

223204
// Release the buffer back to the pool
224-
if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
225-
putPooledBuf(s.buf)
205+
if s.buf != nil {
206+
bytebufferpool.Put(s.buf)
226207
}
227208
s.buf = nil
228-
s.mem = nil // Discard the bytes.Reader
229209

230210
// Write incoming bytes directly to file
231211
n, err = s.file.Write(p)
@@ -237,36 +217,22 @@ func (s *spooledTempFile) Write(p []byte) (n int, err error) {
237217
return n, nil
238218
}
239219

240-
// Grow the buffer if necessary, but never exceed MaxInMemorySize
241-
if len(s.buf)+len(p) > cap(s.buf) {
242-
newCap := min(len(s.buf)+len(p), s.maxInMemorySize)
243-
244-
// Allocate a new buffer with the increased capacity
245-
newBuf := make([]byte, len(s.buf), newCap)
246-
copy(newBuf, s.buf)
247-
248-
// Release the old buffer to the pool
249-
if s.buf != nil && cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
250-
putPooledBuf(s.buf)
251-
}
252-
s.buf = newBuf
253-
s.mem = nil // Discard the old bytes.Reader
254-
}
255-
256220
// Append data to the buffer
257-
s.buf = append(s.buf, p...)
221+
s.buf.Write(p)
258222
return len(p), nil
259223
}
260224

261225
func (s *spooledTempFile) Close() error {
262226
s.closed = true
263-
s.mem = nil
227+
228+
if s.mem != nil {
229+
s.mem.Reset([]byte{})
230+
s.mem = nil
231+
}
264232

265233
// Release the buffer back to the pool
266234
if s.buf != nil {
267-
if cap(s.buf) <= InitialBufferSize && cap(s.buf) > 0 {
268-
putPooledBuf(s.buf)
269-
}
235+
bytebufferpool.Put(s.buf)
270236
s.buf = nil
271237
}
272238

pkg/spooledtempfile/spooled_test.go

Lines changed: 4 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -461,8 +461,8 @@ func TestBufferGrowthWithinLimits(t *testing.T) {
461461

462462
// Check that the buffer grew
463463
spoolBuffer := spool.(*spooledTempFile)
464-
if cap(spoolBuffer.buf) <= InitialBufferSize {
465-
t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, cap(spoolBuffer.buf))
464+
if spoolBuffer.buf.Len() <= InitialBufferSize {
465+
t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, spoolBuffer.buf.Len())
466466
}
467467

468468
// Check that the buffer is still in memory and has grown
@@ -492,28 +492,15 @@ func TestPoolBehavior(t *testing.T) {
492492

493493
// Ensure the buffer has grown beyond InitialBufferSize
494494
spoolTempFile := spool.(*spooledTempFile)
495-
if cap(spoolTempFile.buf) <= InitialBufferSize {
496-
t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, cap(spoolTempFile.buf))
495+
if spoolTempFile.buf.Len() <= InitialBufferSize {
496+
t.Fatalf("Expected buffer capacity > %d, got %d", InitialBufferSize, spoolTempFile.buf.Len())
497497
}
498498

499499
// Close the spool to release the buffer
500500
err = spool.Close()
501501
if err != nil {
502502
t.Fatalf("Close error: %v", err)
503503
}
504-
505-
// Retrieve a buffer from the pool
506-
buf := getPooledBuf()
507-
508-
// Verify that the retrieved buffer has the expected initial capacity
509-
if cap(buf) != InitialBufferSize {
510-
t.Errorf("Expected buffer in pool to have capacity %d, got %d", InitialBufferSize, cap(buf))
511-
}
512-
513-
// Verify that the buffer is empty (reset)
514-
if len(buf) != 0 {
515-
t.Errorf("Expected buffer length to be 0, got %d", len(buf))
516-
}
517504
}
518505

519506
func TestBufferGrowthBeyondNewCap(t *testing.T) {
@@ -557,15 +544,6 @@ func TestBufferGrowthBeyondNewCap(t *testing.T) {
557544
if !bytes.Equal(out, expected) {
558545
t.Errorf("Data mismatch. Got %q, want %q", out, expected)
559546
}
560-
561-
// Verify that the buffer was released to the pool (if it meets the criteria)
562-
buf := getPooledBuf()
563-
if cap(buf) != InitialBufferSize {
564-
t.Errorf("Expected buffer in pool to have capacity %d, got %d", InitialBufferSize, cap(buf))
565-
}
566-
if len(buf) != 0 {
567-
t.Errorf("Expected buffer length to be 0, got %d", len(buf))
568-
}
569547
}
570548

571549
func TestSpoolingWhenIOCopy(t *testing.T) {

0 commit comments

Comments
 (0)