Skip to content

Commit 117d8d9

Browse files
committed
pool: fix deadlock with --max-memory and multipart transfers
Because a multipart transfer can need more than one buffer to complete a chunk, if the number of transfers was set very high it was possible for lots of multipart transfers to start, each grab fewer buffers than it needed for a whole chunk, and then deadlock because no more memory was available. This fixes the problem by introducing a reservation system, which the multipart transfer uses to ensure it can reserve all the memory for one chunk before starting.
1 parent 5050f42 commit 117d8d9

4 files changed

Lines changed: 78 additions & 8 deletions

File tree

fs/operations/multithread.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (mc *multiThreadCopyState) copyChunk(ctx context.Context, chunk int, writer
9292
rs = rc
9393
} else {
9494
// Read the chunk into buffered reader
95-
rw := multipart.NewRW()
95+
rw := multipart.NewRW().Reserve(size)
9696
defer fs.CheckClose(rw, &err)
9797
_, err = io.CopyN(rw, rc, size)
9898
if err != nil {

lib/multipart/multipart.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func UploadMultipart(ctx context.Context, src fs.ObjectInfo, in io.Reader, opt U
7373
for partNum := int64(0); !finished; partNum++ {
7474
// Get a block of memory from the pool and token which limits concurrency.
7575
tokens.Get()
76-
rw := NewRW()
76+
rw := NewRW().Reserve(chunkSize)
7777
if acc != nil {
7878
rw.SetAccounting(acc.AccountRead)
7979
}

lib/pool/pool.go

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -192,27 +192,62 @@ func (bp *Pool) release(mem int64) {
192192
totalMemory.Release(mem)
193193
}
194194

195+
// Reserve buffers for use. Blocks until they are free.
196+
//
197+
// Doesn't allocate any memory.
198+
//
199+
// Must be released by calling GetReserved() which releases 1 buffer or
200+
// Release() to release any number of buffers.
201+
func (bp *Pool) Reserve(buffers int) {
202+
waitTime := time.Millisecond
203+
for {
204+
err := bp.acquire(int64(buffers) * int64(bp.bufferSize))
205+
if err == nil {
206+
break
207+
}
208+
fs.Logf(nil, "Failed to get reservation for buffer, waiting for %v: %v", waitTime, err)
209+
time.Sleep(waitTime)
210+
waitTime *= 2
211+
}
212+
}
213+
214+
// Release previously Reserved buffers.
215+
//
216+
// Doesn't free any memory.
217+
func (bp *Pool) Release(buffers int) {
218+
bp.release(int64(buffers) * int64(bp.bufferSize))
219+
}
220+
195221
// getBlock gets a buffer from the pool or allocates one; if reserved is true it uses memory from an earlier Reserve instead of acquiring more
196-
func (bp *Pool) Get() []byte {
222+
func (bp *Pool) getBlock(reserved bool) []byte {
197223
bp.mu.Lock()
198224
var buf []byte
199225
waitTime := time.Millisecond
200226
for {
201227
if len(bp.cache) > 0 {
202228
buf = bp.get()
229+
if reserved {
230+
// If got reserved memory from the cache we
231+
// can release the reservation of one buffer.
232+
bp.release(int64(bp.bufferSize))
233+
}
203234
break
204235
} else {
205236
var err error
206-
bp.mu.Unlock()
207-
err = bp.acquire(int64(bp.bufferSize))
208-
bp.mu.Lock()
237+
if !reserved {
238+
bp.mu.Unlock()
239+
err = bp.acquire(int64(bp.bufferSize))
240+
bp.mu.Lock()
241+
}
209242
if err == nil {
210243
buf, err = bp.alloc(bp.bufferSize)
211244
if err == nil {
212245
bp.alloced++
213246
break
214247
}
215-
bp.release(int64(bp.bufferSize))
248+
if !reserved {
249+
bp.release(int64(bp.bufferSize))
250+
}
216251
}
217252
fs.Logf(nil, "Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
218253
bp.mu.Unlock()
@@ -227,6 +262,16 @@ func (bp *Pool) Get() []byte {
227262
return buf
228263
}
229264

265+
// Get a buffer from the pool or allocate one
266+
func (bp *Pool) Get() []byte {
267+
return bp.getBlock(false)
268+
}
269+
270+
// GetReserved gets a reserved buffer from the pool or allocates one.
271+
func (bp *Pool) GetReserved() []byte {
272+
return bp.getBlock(true)
273+
}
274+
230275
// freeBuffer returns mem to the os if required - call with lock held
231276
func (bp *Pool) freeBuffer(mem []byte) {
232277
err := bp.free(mem)

lib/pool/reader_writer.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ type RW struct {
3535
// Read side Variables
3636
out int // offset we are reading from
3737
reads int // count how many times the data has been read
38+
39+
reserved int // number of buffers reserved
3840
}
3941

4042
var (
@@ -59,6 +61,20 @@ func NewRW(pool *Pool) *RW {
5961
return rw
6062
}
6163

64+
// Reserve bytes of memory.
65+
//
66+
// Reserve, but don't allocate n bytes of memory.
67+
//
68+
// This is rounded up to the nearest buffer page size.
69+
func (rw *RW) Reserve(n int64) *RW {
70+
rw.mu.Lock()
71+
defer rw.mu.Unlock()
72+
buffers := int((n + int64(rw.pool.bufferSize) - 1) / int64(rw.pool.bufferSize))
73+
rw.pool.Reserve(buffers)
74+
rw.reserved += buffers
75+
return rw
76+
}
77+
6278
// SetAccounting should be provided with a function which will be
6379
// called after every read from the RW.
6480
//
@@ -200,7 +216,12 @@ func (rw *RW) writePage() (page []byte) {
200216
if len(rw.pages) > 0 && rw.lastOffset < rw.pool.bufferSize {
201217
return rw.pages[len(rw.pages)-1][rw.lastOffset:]
202218
}
203-
page = rw.pool.Get()
219+
if rw.reserved > 0 {
220+
page = rw.pool.GetReserved()
221+
rw.reserved--
222+
} else {
223+
page = rw.pool.Get()
224+
}
204225
rw.pages = append(rw.pages, page)
205226
rw.lastOffset = 0
206227
return page
@@ -321,6 +342,10 @@ func (rw *RW) Close() error {
321342
rw.pool.Put(page)
322343
}
323344
rw.pages = nil
345+
if rw.reserved > 0 {
346+
rw.pool.Release(rw.reserved)
347+
rw.reserved = 0
348+
}
324349
return nil
325350
}
326351

0 commit comments

Comments
 (0)