From f91398eb5feacbdfe4112b9206daf6234e465c31 Mon Sep 17 00:00:00 2001 From: Andrii Dmytrenko Date: Fri, 25 Jul 2025 12:34:29 +0100 Subject: [PATCH] Add Concater extension to GCS storage --- pkg/gcsstore/gcsstore.go | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pkg/gcsstore/gcsstore.go b/pkg/gcsstore/gcsstore.go index 9dd903ab1..a4b3667b8 100644 --- a/pkg/gcsstore/gcsstore.go +++ b/pkg/gcsstore/gcsstore.go @@ -54,6 +54,7 @@ func New(bucket string, service GCSAPI) GCSStore { func (store GCSStore) UseIn(composer *handler.StoreComposer) { composer.UseCore(store) composer.UseTerminater(store) + composer.UseConcater(store) } func (store GCSStore) NewUpload(ctx context.Context, info handler.FileInfo) (handler.Upload, error) { @@ -88,6 +89,10 @@ func (store GCSStore) AsTerminatableUpload(upload handler.Upload) handler.Termin return upload.(*gcsUpload) } +func (store GCSStore) AsConcatableUpload(upload handler.Upload) handler.ConcatableUpload { + return upload.(*gcsUpload) +} + func (upload gcsUpload) WriteChunk(ctx context.Context, offset int64, src io.Reader) (int64, error) { id := upload.id store := upload.store @@ -337,6 +342,32 @@ func (upload gcsUpload) GetReader(ctx context.Context) (io.ReadCloser, error) { return store.Service.ReadObject(ctx, params) } +func (upload gcsUpload) ConcatUploads(ctx context.Context, partialUploads []handler.Upload) error { + names := make([]string, len(partialUploads)) + store := upload.store + + for i, partialUpload := range partialUploads { + info, err := partialUpload.GetInfo(ctx) + if err != nil { + return err + } + names[i] = store.keyWithPrefix(info.ID) + } + + composeParams := GCSComposeParams{ + Bucket: store.Bucket, + Destination: store.keyWithPrefix(upload.id), + Sources: names, + } + + err := store.Service.ComposeObjects(ctx, composeParams) + if err != nil { + return err + } + + return nil +} + func (store GCSStore) keyWithPrefix(key string) string { prefix := store.ObjectPrefix if prefix != "" && !strings.HasSuffix(prefix, "/") {