
Commit 2f6e915

Add parallel buffered multipart upload (#1745)
Add a multipart upload mode that buffers a number of parts in memory and fills/uploads them in parallel. Use `ConcurrentStreamParts` to enable it and `NumThreads` to control the number of buffers; `PartSize` controls the size of each buffer.
1 parent 4eab739 commit 2f6e915
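A minimal usage sketch of the new option: the endpoint, credentials, bucket and object names below are placeholders, and stdin merely stands in for any non-seekable stream of unknown length (size `-1`).

```go
package main

import (
	"context"
	"log"
	"os"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func main() {
	// Placeholder endpoint and credentials.
	client, err := minio.New("play.min.io", &minio.Options{
		Creds:  credentials.NewStaticV4("ACCESS-KEY", "SECRET-KEY", ""),
		Secure: true,
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Unknown length (-1) forces the streaming multipart path; stdin is just an example input.
	info, err := client.PutObject(context.Background(), "my-bucket", "my-object", os.Stdin, -1,
		minio.PutObjectOptions{
			ConcurrentStreamParts: true,     // enable the parallel buffered uploader
			NumThreads:            4,        // number of part buffers filled and uploaded concurrently
			PartSize:              16 << 20, // 16 MiB per buffer/part
		})
	if err != nil {
		log.Fatalln(err)
	}
	log.Println("uploaded", info.Key, "-", info.Size, "bytes")
}
```

Per the option's doc comment, this mode helps most with non-seekable or slow-to-seek input; it takes effect only when `NumThreads` is greater than 1.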

File tree

  api-put-object-streaming.go
  api-put-object.go

2 files changed: +216 −2 lines changed

api-put-object-streaming.go (+207 −1)
@@ -28,6 +28,7 @@ import (
 	"net/url"
 	"sort"
 	"strings"
+	"sync"
 
 	"github.com/google/uuid"
 	"github.com/minio/minio-go/v7/pkg/s3utils"
@@ -44,7 +45,9 @@ import (
 func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
 	reader io.Reader, size int64, opts PutObjectOptions,
 ) (info UploadInfo, err error) {
-	if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
+	if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
+		info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
+	} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
 		// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
 		info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
 	} else {
@@ -446,6 +449,209 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
 	return uploadInfo, nil
 }
 
+// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
+// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
+func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
+	reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
+	// Input validation.
+	if err = s3utils.CheckValidBucketName(bucketName); err != nil {
+		return UploadInfo{}, err
+	}
+
+	if err = s3utils.CheckValidObjectName(objectName); err != nil {
+		return UploadInfo{}, err
+	}
+
+	if !opts.SendContentMd5 {
+		if opts.UserMetadata == nil {
+			opts.UserMetadata = make(map[string]string, 1)
+		}
+		opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
+	}
+
+	// Cancel all when an error occurs.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	// Calculate the optimal parts info for a given size.
+	totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
+	if err != nil {
+		return UploadInfo{}, err
+	}
+
+	// Initiates a new multipart request
+	uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
+	if err != nil {
+		return UploadInfo{}, err
+	}
+	delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")
+
+	// Aborts the multipart upload if the function returns
+	// any error, since we do not resume we should purge
+	// the parts which have been uploaded to relinquish
+	// storage space.
+	defer func() {
+		if err != nil {
+			c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
+		}
+	}()
+
+	// Create checksums
+	// CRC32C is ~50% faster on AMD64 @ 30GB/s
+	var crcBytes []byte
+	crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
+	md5Hash := c.md5Hasher()
+	defer md5Hash.Close()
+
+	// Total data read and written to server. should be equal to 'size' at the end of the call.
+	var totalUploadedSize int64
+
+	// Initialize parts uploaded map.
+	partsInfo := make(map[int]ObjectPart)
+
+	// Create a buffer.
+	nBuffers := int64(opts.NumThreads)
+	bufs := make(chan []byte, nBuffers)
+	all := make([]byte, nBuffers*partSize)
+	for i := int64(0); i < nBuffers; i++ {
+		bufs <- all[i*partSize : i*partSize+partSize]
+	}
+
+	var wg sync.WaitGroup
+	var mu sync.Mutex
+	errCh := make(chan error, opts.NumThreads)
+
+	reader = newHook(reader, opts.Progress)
+
+	// Part number always starts with '1'.
+	var partNumber int
+	for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
+		// Proceed to upload the part.
+		var buf []byte
+		select {
+		case buf = <-bufs:
+		case err = <-errCh:
+			cancel()
+			wg.Wait()
+			return UploadInfo{}, err
+		}
+
+		if int64(len(buf)) != partSize {
+			return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize)
+		}
+
+		length, rerr := readFull(reader, buf)
+		if rerr == io.EOF && partNumber > 1 {
+			// Done
+			break
+		}
+
+		if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
+			cancel()
+			wg.Wait()
+			return UploadInfo{}, rerr
+		}
+
+		// Calculate md5sum.
+		customHeader := make(http.Header)
+		if !opts.SendContentMd5 {
+			// Add CRC32C instead.
+			crc.Reset()
+			crc.Write(buf[:length])
+			cSum := crc.Sum(nil)
+			customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
+			crcBytes = append(crcBytes, cSum...)
+		}
+
+		wg.Add(1)
+		go func(partNumber int) {
+			// Avoid declaring variables in the for loop
+			var md5Base64 string
+
+			if opts.SendContentMd5 {
+				md5Hash.Reset()
+				md5Hash.Write(buf[:length])
+				md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
+			}
+
+			defer wg.Done()
+			p := uploadPartParams{
+				bucketName:   bucketName,
+				objectName:   objectName,
+				uploadID:     uploadID,
+				reader:       bytes.NewReader(buf[:length]),
+				partNumber:   partNumber,
+				md5Base64:    md5Base64,
+				size:         int64(length),
+				sse:          opts.ServerSideEncryption,
+				streamSha256: !opts.DisableContentSha256,
+				customHeader: customHeader,
+			}
+			objPart, uerr := c.uploadPart(ctx, p)
+			if uerr != nil {
+				errCh <- uerr
+			}
+
+			// Save successfully uploaded part metadata.
+			mu.Lock()
+			partsInfo[partNumber] = objPart
+			mu.Unlock()
+
+			// Send buffer back so it can be reused.
+			bufs <- buf
+		}(partNumber)
+
+		// Save successfully uploaded size.
+		totalUploadedSize += int64(length)
+	}
+	wg.Wait()
+
+	// Collect any error
+	select {
+	case err = <-errCh:
+		return UploadInfo{}, err
+	default:
+	}
+
+	// Complete multipart upload.
+	var complMultipartUpload completeMultipartUpload
+
+	// Loop over total uploaded parts to save them in
+	// Parts array before completing the multipart request.
+	for i := 1; i < partNumber; i++ {
+		part, ok := partsInfo[i]
+		if !ok {
+			return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
+		}
+		complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
+			ETag:           part.ETag,
+			PartNumber:     part.PartNumber,
+			ChecksumCRC32:  part.ChecksumCRC32,
+			ChecksumCRC32C: part.ChecksumCRC32C,
+			ChecksumSHA1:   part.ChecksumSHA1,
+			ChecksumSHA256: part.ChecksumSHA256,
+		})
+	}
+
+	// Sort all completed parts.
+	sort.Sort(completedParts(complMultipartUpload.Parts))
+
+	opts = PutObjectOptions{}
+	if len(crcBytes) > 0 {
+		// Add hash of hashes.
+		crc.Reset()
+		crc.Write(crcBytes)
+		opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
+	}
+	uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
+	if err != nil {
+		return UploadInfo{}, err
+	}
+
+	uploadInfo.Size = totalUploadedSize
+	return uploadInfo, nil
+}
+
 // putObject special function used Google Cloud Storage. This special function
 // is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
 func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
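As the doc comment on putObjectMultipartStreamParallel notes, the expected buffer footprint is roughly opts.PartSize * opts.NumThreads * (GOGC / 100) bytes. For example (illustrative values, not defaults): with a PartSize of 16 MiB, NumThreads of 4 and Go's default GOGC of 100, that works out to 16 MiB × 4 × 1 = 64 MiB of part buffers.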

api-put-object.go (+9 −1)
@@ -87,7 +87,12 @@ type PutObjectOptions struct {
 	SendContentMd5       bool
 	DisableContentSha256 bool
 	DisableMultipart     bool
-	Internal             AdvancedPutOptions
+
+	// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
+	// fill them serially and upload them in parallel.
+	// This can be used for faster uploads on non-seekable or slow-to-seek input.
+	ConcurrentStreamParts bool
+	Internal              AdvancedPutOptions
 }
 
 // getNumThreads - gets the number of threads to be used in the multipart
@@ -272,6 +277,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
 	if opts.DisableMultipart {
 		return UploadInfo{}, errors.New("no length provided and multipart disabled")
 	}
+	if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
+		return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
+	}
 	return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
 }
 