Skip to content

Commit a08471e

Browse files
ajankovicmmatczuk
authored andcommitted
s3: limit upload concurrency
To allow concurrent FS operations, limiter is moved from function to the file system object level so it can limit all concurrency for the fs.
1 parent 2dea91d commit a08471e

File tree

1 file changed

+25
-21
lines changed

1 file changed

+25
-21
lines changed

backend/s3/s3.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -941,18 +941,19 @@ type Options struct {
941941

942942
// Fs represents a remote s3 server
943943
type Fs struct {
944-
name string // the name of the remote
945-
root string // root of the bucket - ignore all objects above this
946-
opt Options // parsed options
947-
features *fs.Features // optional features
948-
c *s3.S3 // the connection to the s3 server
949-
ses *session.Session // the s3 session
950-
rootBucket string // bucket part of root (if any)
951-
rootDirectory string // directory part of root (if any)
952-
cache *bucket.Cache // cache for bucket creation status
953-
pacer *fs.Pacer // To pace the API calls
954-
srv *http.Client // a plain http client
955-
pool *pool.Pool // memory pool
944+
name string // the name of the remote
945+
root string // root of the bucket - ignore all objects above this
946+
opt Options // parsed options
947+
features *fs.Features // optional features
948+
c *s3.S3 // the connection to the s3 server
949+
ses *session.Session // the s3 session
950+
rootBucket string // bucket part of root (if any)
951+
rootDirectory string // directory part of root (if any)
952+
cache *bucket.Cache // cache for bucket creation status
953+
pacer *fs.Pacer // To pace the API calls
954+
srv *http.Client // a plain http client
955+
tokens *pacer.TokenDispenser // upload concurency tokens
956+
pool *pool.Pool // memory pool
956957
}
957958

958959
// Object describes a s3 object
@@ -1238,6 +1239,11 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
12381239
return nil, err
12391240
}
12401241

1242+
concurrency := opt.UploadConcurrency
1243+
if concurrency < 1 {
1244+
concurrency = 1
1245+
}
1246+
12411247
f := &Fs{
12421248
name: name,
12431249
opt: *opt,
@@ -1252,6 +1258,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
12521258
opt.UploadConcurrency*fs.Config.Transfers,
12531259
opt.MemoryPoolUseMmap,
12541260
),
1261+
tokens: pacer.NewTokenDispenser(concurrency),
12551262
}
12561263

12571264
f.setRoot(root)
@@ -2161,13 +2168,6 @@ var warnStreamUpload sync.Once
21612168
func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, size int64, in io.Reader) (err error) {
21622169
f := o.fs
21632170

2164-
// make concurrency machinery
2165-
concurrency := f.opt.UploadConcurrency
2166-
if concurrency < 1 {
2167-
concurrency = 1
2168-
}
2169-
tokens := pacer.NewTokenDispenser(concurrency)
2170-
21712171
// calculate size of parts
21722172
partSize := int(f.opt.ChunkSize)
21732173

@@ -2241,7 +2241,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
22412241

22422242
for partNum := int64(1); !finished; partNum++ {
22432243
// Get a block of memory from the pool and token which limits concurrency.
2244-
tokens.Get()
2244+
o.fs.tokens.Get()
22452245
buf := memPool.Get()
22462246

22472247
// Fail fast, in case an errgroup managed function returns an error
@@ -2289,6 +2289,10 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
22892289
}
22902290
uout, err := f.c.UploadPartWithContext(gCtx, uploadPartReq)
22912291
if err != nil {
2292+
concurrency := f.opt.UploadConcurrency
2293+
if concurrency < 1 {
2294+
concurrency = 1
2295+
}
22922296
if partNum <= int64(concurrency) {
22932297
return f.shouldRetry(err)
22942298
}
@@ -2307,7 +2311,7 @@ func (o *Object) uploadMultipart(ctx context.Context, req *s3.PutObjectInput, si
23072311

23082312
// return the memory and token
23092313
memPool.Put(buf)
2310-
tokens.Put()
2314+
o.fs.tokens.Put()
23112315

23122316
if err != nil {
23132317
return errors.Wrap(err, "multipart upload failed to upload part")

0 commit comments

Comments
 (0)