diff --git a/cmd/tusd/cli/composer.go b/cmd/tusd/cli/composer.go index dd20bc1f1..0e8923733 100644 --- a/cmd/tusd/cli/composer.go +++ b/cmd/tusd/cli/composer.go @@ -13,8 +13,10 @@ import ( "github.com/tus/tusd/v2/pkg/azurestore" "github.com/tus/tusd/v2/pkg/filelocker" "github.com/tus/tusd/v2/pkg/filestore" + "github.com/tus/tusd/v2/pkg/fileidempotencystore" "github.com/tus/tusd/v2/pkg/gcsstore" "github.com/tus/tusd/v2/pkg/handler" + "github.com/tus/tusd/v2/pkg/memoryidempotencystore" "github.com/tus/tusd/v2/pkg/memorylocker" "github.com/tus/tusd/v2/pkg/s3store" @@ -83,6 +85,11 @@ func CreateComposer() { // Attach the metrics from S3 store to the global Prometheus registry store.RegisterMetrics(prometheus.DefaultRegisterer) + + if !Flags.DisableIdempotency { + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(Composer) + } } else if Flags.GCSBucket != "" { if Flags.GCSObjectPrefix != "" && strings.Contains(Flags.GCSObjectPrefix, "_") { stderr.Fatalf("gcs-object-prefix value (%s) can't contain underscore. "+ @@ -113,6 +120,11 @@ func CreateComposer() { locker := memorylocker.New() locker.UseIn(Composer) + + if !Flags.DisableIdempotency { + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(Composer) + } } else if Flags.AzStorage != "" { accountName := os.Getenv("AZURE_STORAGE_ACCOUNT") @@ -156,6 +168,11 @@ func CreateComposer() { locker := memorylocker.New() locker.UseIn(Composer) + + if !Flags.DisableIdempotency { + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(Composer) + } } else { dir, err := filepath.Abs(Flags.UploadDir) if err != nil { @@ -177,6 +194,13 @@ func CreateComposer() { locker.AcquirerPollInterval = Flags.FilelockAcquirerPollInterval locker.HolderPollInterval = Flags.FilelockHolderPollInterval locker.UseIn(Composer) + + if !Flags.DisableIdempotency { + idempotencyStore := fileidempotencystore.New(dir) + idempotencyStore.DirModePerm = os.FileMode(Flags.DirPerms) & os.ModePerm + idempotencyStore.FileModePerm = os.FileMode(Flags.FilePerms) & os.ModePerm + idempotencyStore.UseIn(Composer) + } } printStartupLog("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024) diff --git a/cmd/tusd/cli/flags.go b/cmd/tusd/cli/flags.go index d2733067f..28b1b0a7c 100644 --- a/cmd/tusd/cli/flags.go +++ b/cmd/tusd/cli/flags.go @@ -94,6 +94,7 @@ var Flags struct { DirPerms uint32 GracefulRequestCompletionTimeout time.Duration ExperimentalProtocol bool + DisableIdempotency bool } type ChmodPermsValue struct { @@ -142,6 +143,7 @@ func ParseFlags() { f.BoolVar(&Flags.DisableDownload, "disable-download", false, "Disable the download endpoint") f.BoolVar(&Flags.DisableTermination, "disable-termination", false, "Disable the termination endpoint") f.BoolVar(&Flags.DisableConcatenation, "disable-concatenation", false, "Disable support for the concatenation extension") + f.BoolVar(&Flags.DisableIdempotency, "disable-idempotency", false, "Disable support for the Idempotency-Key header to detect retried upload creation requests") f.Int64Var(&Flags.MaxSize, "max-size", 0, "Maximum size of a single upload in bytes") }) diff --git a/pkg/fileidempotencystore/fileidempotencystore.go b/pkg/fileidempotencystore/fileidempotencystore.go new file mode 100644 index 000000000..9a9bf97d3 --- /dev/null +++ b/pkg/fileidempotencystore/fileidempotencystore.go @@ -0,0 +1,124 @@ +// Package fileidempotencystore provides a disk-backed IdempotencyKeyStore. +// +// It persists idempotency key to upload ID mappings as small JSON files in a +// configurable directory (typically the same directory used for upload data). +// Each mapping is stored in a file named {sha256(key)}.idempotency-key. The +// SHA-256 hash ensures filenames are safe for any filesystem, and the full +// original key is stored inside the file to guard against hash collisions. +package fileidempotencystore + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "io/fs" + "os" + "path/filepath" + + "github.com/tus/tusd/v2/pkg/handler" +) + +type FileIdempotencyStore struct { + // Path is the directory in which .idempotency-key files are stored. + Path string + + // DirModePerm is the permission bits used when creating directories. + // If zero, defaults to 0775. + DirModePerm fs.FileMode + + // FileModePerm is the permission bits used when creating files. + // If zero, defaults to 0664. + FileModePerm fs.FileMode +} + +// New creates a new file-based idempotency key store. The directory specified +// will be used to read and write .idempotency-key files. This method does not +// check whether the path exists; use os.MkdirAll to ensure it does. +func New(path string) *FileIdempotencyStore { + return &FileIdempotencyStore{Path: path} +} + +// UseIn adds this store to the passed composer. +func (s *FileIdempotencyStore) UseIn(composer *handler.StoreComposer) { + composer.UseIdempotencyKeyStore(s) +} + +type keyMapping struct { + Key string `json:"key"` + UploadID string `json:"upload_id"` +} + +func (s *FileIdempotencyStore) FindUploadID(ctx context.Context, key string) (string, error) { + path := s.filePath(key) + + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return "", handler.ErrNotFound + } + return "", err + } + + var mapping keyMapping + if err := json.Unmarshal(data, &mapping); err != nil { + // File is corrupted (e.g. from a crash during write). Treat as + // missing so the handler falls through to create a new upload, + // which will overwrite this file via StoreUploadID. + return "", handler.ErrNotFound + } + + if mapping.Key != key { + // Hash collision: the stored key doesn't match the requested key. + return "", handler.ErrNotFound + } + + return mapping.UploadID, nil +} + +func (s *FileIdempotencyStore) StoreUploadID(ctx context.Context, key string, uploadID string) error { + mapping := keyMapping{ + Key: key, + UploadID: uploadID, + } + + data, err := json.Marshal(mapping) + if err != nil { + return err + } + + path := s.filePath(key) + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, s.dirPerm()); err != nil { + return err + } + + // Write to a temp file first, then rename atomically to prevent + // corrupted mapping files if the process crashes mid-write. + tmp := path + ".tmp" + if err := os.WriteFile(tmp, data, s.filePerm()); err != nil { + return err + } + return os.Rename(tmp, path) +} + +func (s *FileIdempotencyStore) filePath(key string) string { + hash := sha256.Sum256([]byte(key)) + name := hex.EncodeToString(hash[:]) + ".idempotency-key" + return filepath.Join(s.Path, name) +} + +func (s *FileIdempotencyStore) dirPerm() fs.FileMode { + if s.DirModePerm == 0 { + return 0775 + } + return s.DirModePerm +} + +func (s *FileIdempotencyStore) filePerm() fs.FileMode { + if s.FileModePerm == 0 { + return 0664 + } + return s.FileModePerm +} diff --git a/pkg/fileidempotencystore/fileidempotencystore_test.go b/pkg/fileidempotencystore/fileidempotencystore_test.go new file mode 100644 index 000000000..63882a884 --- /dev/null +++ b/pkg/fileidempotencystore/fileidempotencystore_test.go @@ -0,0 +1,75 @@ +package fileidempotencystore + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/tus/tusd/v2/pkg/handler" +) + +func TestFileIdempotencyStore(t *testing.T) { + dir, err := os.MkdirTemp("", "fileidempotencystore-test-*") + assert.NoError(t, err) + defer os.RemoveAll(dir) + + store := New(dir) + + ctx := context.Background() + + t.Run("FindReturnsErrNotFoundForMissingKey", func(t *testing.T) { + _, err := store.FindUploadID(ctx, "nonexistent-key") + assert.ErrorIs(t, err, handler.ErrNotFound) + }) + + t.Run("StoreAndFind", func(t *testing.T) { + err := store.StoreUploadID(ctx, "my-idempotency-key", "upload-123") + assert.NoError(t, err) + + uploadID, err := store.FindUploadID(ctx, "my-idempotency-key") + assert.NoError(t, err) + assert.Equal(t, "upload-123", uploadID) + }) + + t.Run("DifferentKeysAreSeparate", func(t *testing.T) { + err := store.StoreUploadID(ctx, "key-a", "upload-a") + assert.NoError(t, err) + + err = store.StoreUploadID(ctx, "key-b", "upload-b") + assert.NoError(t, err) + + id, err := store.FindUploadID(ctx, "key-a") + assert.NoError(t, err) + assert.Equal(t, "upload-a", id) + + id, err = store.FindUploadID(ctx, "key-b") + assert.NoError(t, err) + assert.Equal(t, "upload-b", id) + }) + + t.Run("OverwritesExistingMapping", func(t *testing.T) { + err := store.StoreUploadID(ctx, "overwrite-key", "first-id") + assert.NoError(t, err) + + err = store.StoreUploadID(ctx, "overwrite-key", "second-id") + assert.NoError(t, err) + + id, err := store.FindUploadID(ctx, "overwrite-key") + assert.NoError(t, err) + assert.Equal(t, "second-id", id) + }) + + t.Run("CorruptedFileReturnErrNotFound", func(t *testing.T) { + // Write garbage to simulate a crash during write. + err := store.StoreUploadID(ctx, "corrupt-key", "good-id") + assert.NoError(t, err) + + // Overwrite the file with invalid JSON. + path := store.filePath("corrupt-key") + assert.NoError(t, os.WriteFile(path, []byte("not json"), 0664)) + + _, err = store.FindUploadID(ctx, "corrupt-key") + assert.ErrorIs(t, err, handler.ErrNotFound) + }) +} diff --git a/pkg/handler/composer.go b/pkg/handler/composer.go index 437549fbe..79f651f4b 100644 --- a/pkg/handler/composer.go +++ b/pkg/handler/composer.go @@ -14,8 +14,10 @@ type StoreComposer struct { Concater ConcaterDataStore UsesLengthDeferrer bool LengthDeferrer LengthDeferrerDataStore - ContentServer ContentServerDataStore - UsesContentServer bool + ContentServer ContentServerDataStore + UsesContentServer bool + UsesIdempotencyKeyStore bool + IdempotencyKeyStore IdempotencyKeyStore } // NewStoreComposer creates a new and empty store composer. @@ -58,6 +60,12 @@ func (store *StoreComposer) Capabilities() string { } else { str += "✗" } + str += ` IdempotencyKeyStore: ` + if store.UsesIdempotencyKeyStore { + str += "✓" + } else { + str += "✗" + } return str } @@ -92,3 +100,8 @@ func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) { store.UsesContentServer = ext != nil store.ContentServer = ext } + +func (store *StoreComposer) UseIdempotencyKeyStore(ext IdempotencyKeyStore) { + store.UsesIdempotencyKeyStore = ext != nil + store.IdempotencyKeyStore = ext +} diff --git a/pkg/handler/config.go b/pkg/handler/config.go index 4efa43598..2b6e63263 100644 --- a/pkg/handler/config.go +++ b/pkg/handler/config.go @@ -145,7 +145,7 @@ var DefaultCorsConfig = CorsConfig{ AllowOrigin: regexp.MustCompile(".*"), AllowCredentials: false, AllowMethods: "POST, HEAD, PATCH, OPTIONS, GET, DELETE", - AllowHeaders: "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version", + AllowHeaders: "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key", MaxAge: "86400", ExposeHeaders: "Upload-Offset, Location, Upload-Length, Tus-Version, Tus-Resumable, Tus-Max-Size, Tus-Extension, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version", } diff --git a/pkg/handler/cors_test.go b/pkg/handler/cors_test.go index 3cda591fd..65a6d509c 100644 --- a/pkg/handler/cors_test.go +++ b/pkg/handler/cors_test.go @@ -23,7 +23,7 @@ func TestCORS(t *testing.T) { }, Code: http.StatusOK, ResHeader: map[string]string{ - "Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version", + "Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key", "Access-Control-Allow-Methods": "POST, HEAD, PATCH, OPTIONS, GET, DELETE", "Access-Control-Max-Age": "86400", "Access-Control-Allow-Origin": "https://tus.io", @@ -72,7 +72,7 @@ func TestCORS(t *testing.T) { }, Code: http.StatusOK, ResHeader: map[string]string{ - "Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version", + "Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key", "Access-Control-Allow-Methods": "POST, HEAD, PATCH, OPTIONS, GET, DELETE", "Access-Control-Max-Age": "86400", "Access-Control-Allow-Origin": "http://tus.io", diff --git a/pkg/handler/datastore.go b/pkg/handler/datastore.go index 5243834fd..23495e94d 100644 --- a/pkg/handler/datastore.go +++ b/pkg/handler/datastore.go @@ -193,6 +193,23 @@ type Lock interface { Unlock() error } +// IdempotencyKeyStore is the interface for persisting mappings from client-provided +// Idempotency-Key header values to upload IDs. This allows the handler to detect +// retried upload creation requests and return the existing upload instead of +// creating a duplicate. Implementations may store mappings in memory, on disk, +// or in an external service. +// +// See https://www.ietf.org/archive/id/draft-ietf-httpapi-idempotency-key-header-07.html +type IdempotencyKeyStore interface { + // FindUploadID returns the upload ID previously associated with the given + // idempotency key. If no mapping exists, ErrNotFound must be returned. + FindUploadID(ctx context.Context, key string) (string, error) + + // StoreUploadID persists a mapping from the given idempotency key to the + // given upload ID. + StoreUploadID(ctx context.Context, key string, uploadID string) error +} + type ServableUpload interface { // ServeContent serves the uploaded data as specified by the GET request. // It allows data stores to delegate the handling of range requests and conditional diff --git a/pkg/handler/idempotency_test.go b/pkg/handler/idempotency_test.go new file mode 100644 index 000000000..a717a4239 --- /dev/null +++ b/pkg/handler/idempotency_test.go @@ -0,0 +1,428 @@ +package handler_test + +import ( + "net/http" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + . "github.com/tus/tusd/v2/pkg/handler" + "github.com/tus/tusd/v2/pkg/memoryidempotencystore" +) + +func TestIdempotency(t *testing.T) { + SubTest(t, "ConcatRetryComplete", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + a := assert.New(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + Size: 10, + IsPartial: false, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + MetaData: make(map[string]string), + }).Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "concat-upload-1", + Size: 10, + IsPartial: false, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + MetaData: make(map[string]string), + }, nil), + store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC), + uploadC.EXPECT().ConcatUploads(gomock.Any(), []Upload{uploadA, uploadB}).Return(nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + NotifyCompleteUploads: true, + }) + + c := make(chan HookEvent, 1) + handler.CompleteUploads = c + + // First request: creates the concat upload and stores the idempotency key. + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + "Idempotency-Key": "concat-key-1", + }, + Code: http.StatusCreated, + }).Run(handler, t) + + event := <-c + a.Equal("concat-upload-1", event.Upload.ID) + + // Second request: same idempotency key, upload already completed. + // Should return existing upload without calling NewUpload or ConcatUploads again. + uploadCRetry := NewMockFullUpload(ctrl) + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "concat-upload-1").Return(uploadCRetry, nil), + uploadCRetry.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "concat-upload-1", + Size: 10, + Offset: 10, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + ) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + "Idempotency-Key": "concat-key-1", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/concat-upload-1", + }, + }).Run(handler, t) + }) + + SubTest(t, "ConcatRetryIncomplete", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + a := assert.New(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + // Pre-seed the idempotency store with a mapping to simulate a previous + // attempt that created the upload but didn't complete concat. + idempotencyStore.StoreUploadID(nil, "concat-key-retry", "concat-upload-2") + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + // Idempotency lookup finds the existing upload with offset 0 + store.EXPECT().GetUpload(gomock.Any(), "concat-upload-2").Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "concat-upload-2", + Size: 10, + Offset: 0, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + // Should retry the concatenation on the existing upload + store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC), + uploadC.EXPECT().ConcatUploads(gomock.Any(), []Upload{uploadA, uploadB}).Return(nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + NotifyCompleteUploads: true, + }) + + c := make(chan HookEvent, 1) + handler.CompleteUploads = c + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + "Idempotency-Key": "concat-key-retry", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/concat-upload-2", + }, + }).Run(handler, t) + + event := <-c + a.Equal("concat-upload-2", event.Upload.ID) + }) + + SubTest(t, "ConcatRetryCorrupted", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + idempotencyStore.StoreUploadID(nil, "concat-key-corrupted", "concat-upload-3") + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "concat-upload-3").Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "concat-upload-3", + Size: 10, + Offset: 3, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + "Idempotency-Key": "concat-key-corrupted", + }, + Code: http.StatusInternalServerError, + ResBody: "ERR_CONCAT_CORRUPTED: previous concatenation attempt was partially completed and left the upload in an inconsistent state\n", + }).Run(handler, t) + }) + + SubTest(t, "RegularUploadRetry", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + upload := NewMockFullUpload(ctrl) + uploadRetry := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + // First request: creates new upload + gomock.InOrder( + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + Size: 100, + MetaData: make(map[string]string), + }).Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "upload-abc", + Size: 100, + MetaData: make(map[string]string), + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Length": "100", + "Idempotency-Key": "upload-key-1", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/upload-abc", + }, + }).Run(handler, t) + + // Second request: same key, returns existing upload + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "upload-abc").Return(uploadRetry, nil), + uploadRetry.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "upload-abc", + Size: 100, + Offset: 50, + }, nil), + ) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Length": "100", + "Idempotency-Key": "upload-key-1", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/upload-abc", + "Upload-Offset": "50", + }, + }).Run(handler, t) + }) + + SubTest(t, "NoStoreConfigured", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + upload := NewMockFullUpload(ctrl) + + // No idempotency store on composer -- header should be ignored. + gomock.InOrder( + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + Size: 100, + MetaData: make(map[string]string), + }).Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "upload-xyz", + Size: 100, + MetaData: make(map[string]string), + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Length": "100", + "Idempotency-Key": "some-key", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/upload-xyz", + }, + }).Run(handler, t) + }) + + SubTest(t, "NoHeaderSent", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + upload := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + // Store is configured but no Idempotency-Key header -- normal flow. + gomock.InOrder( + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + Size: 100, + MetaData: make(map[string]string), + }).Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "upload-no-key", + Size: 100, + MetaData: make(map[string]string), + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Length": "100", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/upload-no-key", + }, + }).Run(handler, t) + }) + + SubTest(t, "DeletedUploadFallsThrough", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + upload := NewMockFullUpload(ctrl) + + idempotencyStore := memoryidempotencystore.New() + idempotencyStore.UseIn(composer) + + // Pre-seed with a mapping to a since-deleted upload + idempotencyStore.StoreUploadID(nil, "stale-key", "deleted-upload") + + gomock.InOrder( + // Idempotency lookup finds the key but the upload is gone + store.EXPECT().GetUpload(gomock.Any(), "deleted-upload").Return(nil, ErrNotFound), + // Falls through to create a new upload + store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + Size: 100, + MetaData: make(map[string]string), + }).Return(upload, nil), + upload.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: "new-upload-after-delete", + Size: 100, + MetaData: make(map[string]string), + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Length": "100", + "Idempotency-Key": "stale-key", + }, + Code: http.StatusCreated, + ResHeader: map[string]string{ + "Location": "http://tus.io/files/new-upload-after-delete", + }, + }).Run(handler, t) + }) +} diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 3053c7de6..f63e2693e 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -10,6 +10,7 @@ import ( "mime" "net/http" "regexp" + "slices" "strconv" "strings" "time" @@ -70,6 +71,7 @@ var ( // when the upload got interrupted. Most clients will not retry 4XX but only 5XX, so we responsd with 500 here. ErrReadTimeout = NewError("ERR_READ_TIMEOUT", "timeout while reading request body", http.StatusInternalServerError) ErrConnectionReset = NewError("ERR_CONNECTION_RESET", "TCP connection reset by peer", http.StatusInternalServerError) + ErrConcatCorrupted = NewError("ERR_CONCAT_CORRUPTED", "previous concatenation attempt was partially completed and left the upload in an inconsistent state", http.StatusInternalServerError) ) // UnroutedHandler exposes methods to handle requests as part of the tus protocol, @@ -390,16 +392,94 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) } } - upload, err := handler.composer.Core.NewUpload(c, info) - if err != nil { - handler.sendError(c, err) - return + // If an idempotency key store is configured, check for retried requests. + idempotencyKey := r.Header.Get("Idempotency-Key") + var upload Upload + existingFound := false + + if idempotencyKey != "" && handler.composer.UsesIdempotencyKeyStore { + existingUploadID, findErr := handler.composer.IdempotencyKeyStore.FindUploadID(c, idempotencyKey) + if findErr == nil { + existingUpload, getErr := handler.composer.Core.GetUpload(c, existingUploadID) + if getErr == nil { + existingInfo, infoErr := existingUpload.GetInfo(c) + if infoErr != nil { + handler.sendError(c, infoErr) + return + } + + if isFinal && existingInfo.IsFinal && slices.Equal(existingInfo.PartialUploads, partialUploadIDs) { + if existingInfo.Offset == existingInfo.Size { + // Concat already completed successfully. Return existing upload. + url := handler.absFileURL(r, existingInfo.ID) + resp.Header["Location"] = url + c.log = c.log.With("id", existingInfo.ID) + c.log.InfoContext(c, "UploadIdempotentReplay", "size", existingInfo.Size, "url", url) + + // Only run PreFinishResponseCallback for response headers. + // Do NOT re-emit finish events (metrics, CompleteUploads hook) + // since this upload already completed previously. + if handler.config.PreFinishResponseCallback != nil { + resp2, err := handler.config.PreFinishResponseCallback(newHookEvent(c, existingInfo)) + if err != nil { + handler.sendError(c, err) + return + } + resp = resp.MergeWith(resp2) + } + + handler.sendResp(c, resp) + return + } else if existingInfo.Offset == 0 { + // Upload was created but concat never completed. Retry it. + upload = existingUpload + info = existingInfo + existingFound = true + } else { + handler.sendError(c, ErrConcatCorrupted) + return + } + } else if !isFinal { + // Non-concat upload already exists. Return it for the client to resume via PATCH. + url := handler.absFileURL(r, existingInfo.ID) + resp.Header["Location"] = url + resp.Header["Upload-Offset"] = strconv.FormatInt(existingInfo.Offset, 10) + c.log = c.log.With("id", existingInfo.ID) + c.log.InfoContext(c, "UploadIdempotentReplay", "size", existingInfo.Size, "url", url) + + handler.sendResp(c, resp) + return + } + // If flags don't match (e.g. isFinal differs), fall through to new creation. + } else if !errors.Is(getErr, ErrNotFound) { + handler.sendError(c, getErr) + return + } + // Upload ID was stored but upload was since deleted/terminated. Fall through to create. + } else if !errors.Is(findErr, ErrNotFound) { + handler.sendError(c, findErr) + return + } } - info, err = upload.GetInfo(c) - if err != nil { - handler.sendError(c, err) - return + if !existingFound { + upload, err = handler.composer.Core.NewUpload(c, info) + if err != nil { + handler.sendError(c, err) + return + } + + info, err = upload.GetInfo(c) + if err != nil { + handler.sendError(c, err) + return + } + + if idempotencyKey != "" && handler.composer.UsesIdempotencyKeyStore { + if storeErr := handler.composer.IdempotencyKeyStore.StoreUploadID(c, idempotencyKey, info.ID); storeErr != nil { + c.log.WarnContext(c, "FailedToStoreIdempotencyKey", "error", storeErr) + } + } } id := info.ID @@ -409,12 +489,17 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) url := handler.absFileURL(r, id) resp.Header["Location"] = url - handler.Metrics.incUploadsCreated() - c.log = c.log.With("id", id) - c.log.InfoContext(c, "UploadCreated", "size", size, "url", url) + if !existingFound { + handler.Metrics.incUploadsCreated() + c.log = c.log.With("id", id) + c.log.InfoContext(c, "UploadCreated", "size", size, "url", url) - if handler.config.NotifyCreatedUploads { - handler.CreatedUploads <- newHookEvent(c, info) + if handler.config.NotifyCreatedUploads { + handler.CreatedUploads <- newHookEvent(c, info) + } + } else { + c.log = c.log.With("id", id) + c.log.InfoContext(c, "UploadIdempotentRetry", "size", size, "url", url) } if isFinal { diff --git a/pkg/memoryidempotencystore/memoryidempotencystore.go b/pkg/memoryidempotencystore/memoryidempotencystore.go new file mode 100644 index 000000000..36acc406e --- /dev/null +++ b/pkg/memoryidempotencystore/memoryidempotencystore.go @@ -0,0 +1,49 @@ +// Package memoryidempotencystore provides an in-memory IdempotencyKeyStore. +// +// It persists idempotency key to upload ID mappings in memory. Mappings will +// be lost when the process exits. This is suitable for cloud storage backends +// (S3, GCS, Azure) where no local disk is available for persistent storage. +package memoryidempotencystore + +import ( + "context" + "sync" + + "github.com/tus/tusd/v2/pkg/handler" +) + +type MemoryIdempotencyStore struct { + entries map[string]string + mutex sync.RWMutex +} + +// New creates a new in-memory idempotency key store. +func New() *MemoryIdempotencyStore { + return &MemoryIdempotencyStore{ + entries: make(map[string]string), + } +} + +// UseIn adds this store to the passed composer. +func (s *MemoryIdempotencyStore) UseIn(composer *handler.StoreComposer) { + composer.UseIdempotencyKeyStore(s) +} + +func (s *MemoryIdempotencyStore) FindUploadID(ctx context.Context, key string) (string, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + uploadID, ok := s.entries[key] + if !ok { + return "", handler.ErrNotFound + } + return uploadID, nil +} + +func (s *MemoryIdempotencyStore) StoreUploadID(ctx context.Context, key string, uploadID string) error { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.entries[key] = uploadID + return nil +}