Skip to content

Commit ad6d0e9

Browse files
authored
Merge pull request #709 from datazip-inc/staging
chore: releasing version v0.3.8
2 parents aab7eb0 + 8d83799 commit ad6d0e9

1 file changed

Lines changed: 6 additions & 3 deletions

File tree

destination/parquet/parquet.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type Parquet struct {
4141
basePath string // construct with streamNamespace/streamName
4242
partitionedFiles map[string]*FileMetadata // mapping of basePath/{regex} -> pqFiles
4343
s3Client *s3.S3
44+
s3Uploader *s3manager.Uploader
4445
schema typeutils.Fields
4546
}
4647

@@ -77,6 +78,8 @@ func (p *Parquet) initS3Writer() error {
7778
return fmt.Errorf("failed to create AWS session: %s", err)
7879
}
7980
p.s3Client = s3.New(sess)
81+
// Initialize uploader for multipart uploads (handles files > 5GB automatically)
82+
p.s3Uploader = s3manager.NewUploader(sess)
8083

8184
return nil
8285
}
@@ -282,14 +285,14 @@ func (p *Parquet) closePqFiles() error {
282285
}
283286
s3KeyPath = filepath.Join(s3KeyPath, parquetFile.fileName)
284287

285-
// Upload to S3
286-
_, err = p.s3Client.PutObject(&s3.PutObjectInput{
288+
// Upload to S3 using multipart upload (automatically handles files > 5GB)
289+
_, err = p.s3Uploader.Upload(&s3manager.UploadInput{
287290
Bucket: aws.String(p.config.Bucket),
288291
Key: aws.String(s3KeyPath),
289292
Body: file,
290293
})
291294
if err != nil {
292-
return fmt.Errorf("failed to put object into s3: %s", err)
295+
return fmt.Errorf("failed to upload object to s3: %s", err)
293296
}
294297

295298
// Remove local file after successful upload

0 commit comments

Comments
 (0)