Skip to content
24 changes: 24 additions & 0 deletions cmd/tusd/cli/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"github.com/tus/tusd/v2/pkg/azurestore"
"github.com/tus/tusd/v2/pkg/filelocker"
"github.com/tus/tusd/v2/pkg/filestore"
"github.com/tus/tusd/v2/pkg/fileidempotencystore"
"github.com/tus/tusd/v2/pkg/gcsstore"
"github.com/tus/tusd/v2/pkg/handler"
"github.com/tus/tusd/v2/pkg/memoryidempotencystore"
"github.com/tus/tusd/v2/pkg/memorylocker"
"github.com/tus/tusd/v2/pkg/s3store"

Expand Down Expand Up @@ -83,6 +85,11 @@ func CreateComposer() {

// Attach the metrics from S3 store to the global Prometheus registry
store.RegisterMetrics(prometheus.DefaultRegisterer)

if !Flags.DisableIdempotency {
idempotencyStore := memoryidempotencystore.New()
idempotencyStore.UseIn(Composer)
}
} else if Flags.GCSBucket != "" {
if Flags.GCSObjectPrefix != "" && strings.Contains(Flags.GCSObjectPrefix, "_") {
stderr.Fatalf("gcs-object-prefix value (%s) can't contain underscore. "+
Expand Down Expand Up @@ -113,6 +120,11 @@ func CreateComposer() {

locker := memorylocker.New()
locker.UseIn(Composer)

if !Flags.DisableIdempotency {
idempotencyStore := memoryidempotencystore.New()
idempotencyStore.UseIn(Composer)
}
} else if Flags.AzStorage != "" {

accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
Expand Down Expand Up @@ -156,6 +168,11 @@ func CreateComposer() {

locker := memorylocker.New()
locker.UseIn(Composer)

if !Flags.DisableIdempotency {
idempotencyStore := memoryidempotencystore.New()
idempotencyStore.UseIn(Composer)
}
} else {
dir, err := filepath.Abs(Flags.UploadDir)
if err != nil {
Expand All @@ -177,6 +194,13 @@ func CreateComposer() {
locker.AcquirerPollInterval = Flags.FilelockAcquirerPollInterval
locker.HolderPollInterval = Flags.FilelockHolderPollInterval
locker.UseIn(Composer)

if !Flags.DisableIdempotency {
idempotencyStore := fileidempotencystore.New(dir)
idempotencyStore.DirModePerm = os.FileMode(Flags.DirPerms) & os.ModePerm
idempotencyStore.FileModePerm = os.FileMode(Flags.FilePerms) & os.ModePerm
idempotencyStore.UseIn(Composer)
}
}

printStartupLog("Using %.2fMB as maximum size.\n", float64(Flags.MaxSize)/1024/1024)
Expand Down
2 changes: 2 additions & 0 deletions cmd/tusd/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var Flags struct {
DirPerms uint32
GracefulRequestCompletionTimeout time.Duration
ExperimentalProtocol bool
DisableIdempotency bool
}

type ChmodPermsValue struct {
Expand Down Expand Up @@ -142,6 +143,7 @@ func ParseFlags() {
f.BoolVar(&Flags.DisableDownload, "disable-download", false, "Disable the download endpoint")
f.BoolVar(&Flags.DisableTermination, "disable-termination", false, "Disable the termination endpoint")
f.BoolVar(&Flags.DisableConcatenation, "disable-concatenation", false, "Disable support for the concatenation extension")
f.BoolVar(&Flags.DisableIdempotency, "disable-idempotency", false, "Disable support for the Idempotency-Key header to detect retried upload creation requests")
f.Int64Var(&Flags.MaxSize, "max-size", 0, "Maximum size of a single upload in bytes")
})

Expand Down
124 changes: 124 additions & 0 deletions pkg/fileidempotencystore/fileidempotencystore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Package fileidempotencystore provides a disk-backed IdempotencyKeyStore.
//
// It persists idempotency key to upload ID mappings as small JSON files in a
// configurable directory (typically the same directory used for upload data).
// Each mapping is stored in a file named {sha256(key)}.idempotency-key. The
// SHA-256 hash ensures filenames are safe for any filesystem, and the full
// original key is stored inside the file to guard against hash collisions.
package fileidempotencystore

import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"io/fs"
"os"
"path/filepath"

"github.com/tus/tusd/v2/pkg/handler"
)

type FileIdempotencyStore struct {
// Path is the directory in which .idempotency-key files are stored.
Path string

// DirModePerm is the permission bits used when creating directories.
// If zero, defaults to 0775.
DirModePerm fs.FileMode

// FileModePerm is the permission bits used when creating files.
// If zero, defaults to 0664.
FileModePerm fs.FileMode
}

// New creates a new file-based idempotency key store. The directory specified
// will be used to read and write .idempotency-key files. This method does not
// check whether the path exists; use os.MkdirAll to ensure it does.
func New(path string) *FileIdempotencyStore {
return &FileIdempotencyStore{Path: path}
}

// UseIn adds this store to the passed composer.
func (s *FileIdempotencyStore) UseIn(composer *handler.StoreComposer) {
composer.UseIdempotencyKeyStore(s)
}

type keyMapping struct {
Key string `json:"key"`
UploadID string `json:"upload_id"`
}

func (s *FileIdempotencyStore) FindUploadID(ctx context.Context, key string) (string, error) {
path := s.filePath(key)

data, err := os.ReadFile(path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return "", handler.ErrNotFound
}
return "", err
}

var mapping keyMapping
if err := json.Unmarshal(data, &mapping); err != nil {
// File is corrupted (e.g. from a crash during write). Treat as
// missing so the handler falls through to create a new upload,
// which will overwrite this file via StoreUploadID.
return "", handler.ErrNotFound
}

if mapping.Key != key {
// Hash collision: the stored key doesn't match the requested key.
return "", handler.ErrNotFound
}

return mapping.UploadID, nil
}

func (s *FileIdempotencyStore) StoreUploadID(ctx context.Context, key string, uploadID string) error {
mapping := keyMapping{
Key: key,
UploadID: uploadID,
}

data, err := json.Marshal(mapping)
if err != nil {
return err
}

path := s.filePath(key)
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, s.dirPerm()); err != nil {
return err
}

// Write to a temp file first, then rename atomically to prevent
// corrupted mapping files if the process crashes mid-write.
tmp := path + ".tmp"
if err := os.WriteFile(tmp, data, s.filePerm()); err != nil {
return err
}
return os.Rename(tmp, path)
}

func (s *FileIdempotencyStore) filePath(key string) string {
hash := sha256.Sum256([]byte(key))
name := hex.EncodeToString(hash[:]) + ".idempotency-key"
return filepath.Join(s.Path, name)
}

func (s *FileIdempotencyStore) dirPerm() fs.FileMode {
if s.DirModePerm == 0 {
return 0775
}
return s.DirModePerm
}

func (s *FileIdempotencyStore) filePerm() fs.FileMode {
if s.FileModePerm == 0 {
return 0664
}
return s.FileModePerm
}
75 changes: 75 additions & 0 deletions pkg/fileidempotencystore/fileidempotencystore_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package fileidempotencystore

import (
"context"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/tus/tusd/v2/pkg/handler"
)

func TestFileIdempotencyStore(t *testing.T) {
dir, err := os.MkdirTemp("", "fileidempotencystore-test-*")
assert.NoError(t, err)
defer os.RemoveAll(dir)

store := New(dir)

ctx := context.Background()

t.Run("FindReturnsErrNotFoundForMissingKey", func(t *testing.T) {
_, err := store.FindUploadID(ctx, "nonexistent-key")
assert.ErrorIs(t, err, handler.ErrNotFound)
})

t.Run("StoreAndFind", func(t *testing.T) {
err := store.StoreUploadID(ctx, "my-idempotency-key", "upload-123")
assert.NoError(t, err)

uploadID, err := store.FindUploadID(ctx, "my-idempotency-key")
assert.NoError(t, err)
assert.Equal(t, "upload-123", uploadID)
})

t.Run("DifferentKeysAreSeparate", func(t *testing.T) {
err := store.StoreUploadID(ctx, "key-a", "upload-a")
assert.NoError(t, err)

err = store.StoreUploadID(ctx, "key-b", "upload-b")
assert.NoError(t, err)

id, err := store.FindUploadID(ctx, "key-a")
assert.NoError(t, err)
assert.Equal(t, "upload-a", id)

id, err = store.FindUploadID(ctx, "key-b")
assert.NoError(t, err)
assert.Equal(t, "upload-b", id)
})

t.Run("OverwritesExistingMapping", func(t *testing.T) {
err := store.StoreUploadID(ctx, "overwrite-key", "first-id")
assert.NoError(t, err)

err = store.StoreUploadID(ctx, "overwrite-key", "second-id")
assert.NoError(t, err)

id, err := store.FindUploadID(ctx, "overwrite-key")
assert.NoError(t, err)
assert.Equal(t, "second-id", id)
})

t.Run("CorruptedFileReturnErrNotFound", func(t *testing.T) {
// Write garbage to simulate a crash during write.
err := store.StoreUploadID(ctx, "corrupt-key", "good-id")
assert.NoError(t, err)

// Overwrite the file with invalid JSON.
path := store.filePath("corrupt-key")
assert.NoError(t, os.WriteFile(path, []byte("not json"), 0664))

_, err = store.FindUploadID(ctx, "corrupt-key")
assert.ErrorIs(t, err, handler.ErrNotFound)
})
}
17 changes: 15 additions & 2 deletions pkg/handler/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ type StoreComposer struct {
Concater ConcaterDataStore
UsesLengthDeferrer bool
LengthDeferrer LengthDeferrerDataStore
ContentServer ContentServerDataStore
UsesContentServer bool
ContentServer ContentServerDataStore
UsesContentServer bool
UsesIdempotencyKeyStore bool
IdempotencyKeyStore IdempotencyKeyStore
}

// NewStoreComposer creates a new and empty store composer.
Expand Down Expand Up @@ -58,6 +60,12 @@ func (store *StoreComposer) Capabilities() string {
} else {
str += "✗"
}
str += ` IdempotencyKeyStore: `
if store.UsesIdempotencyKeyStore {
str += "✓"
} else {
str += "✗"
}

return str
}
Expand Down Expand Up @@ -92,3 +100,8 @@ func (store *StoreComposer) UseContentServer(ext ContentServerDataStore) {
store.UsesContentServer = ext != nil
store.ContentServer = ext
}

func (store *StoreComposer) UseIdempotencyKeyStore(ext IdempotencyKeyStore) {
store.UsesIdempotencyKeyStore = ext != nil
store.IdempotencyKeyStore = ext
}
2 changes: 1 addition & 1 deletion pkg/handler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ var DefaultCorsConfig = CorsConfig{
AllowOrigin: regexp.MustCompile(".*"),
AllowCredentials: false,
AllowMethods: "POST, HEAD, PATCH, OPTIONS, GET, DELETE",
AllowHeaders: "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version",
AllowHeaders: "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key",
MaxAge: "86400",
ExposeHeaders: "Upload-Offset, Location, Upload-Length, Tus-Version, Tus-Resumable, Tus-Max-Size, Tus-Extension, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version",
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/handler/cors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestCORS(t *testing.T) {
},
Code: http.StatusOK,
ResHeader: map[string]string{
"Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version",
"Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key",
"Access-Control-Allow-Methods": "POST, HEAD, PATCH, OPTIONS, GET, DELETE",
"Access-Control-Max-Age": "86400",
"Access-Control-Allow-Origin": "https://tus.io",
Expand Down Expand Up @@ -72,7 +72,7 @@ func TestCORS(t *testing.T) {
},
Code: http.StatusOK,
ResHeader: map[string]string{
"Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version",
"Access-Control-Allow-Headers": "Authorization, Origin, X-Requested-With, X-Request-ID, X-HTTP-Method-Override, Content-Type, Upload-Length, Upload-Offset, Tus-Resumable, Upload-Metadata, Upload-Defer-Length, Upload-Concat, Upload-Incomplete, Upload-Complete, Upload-Draft-Interop-Version, Idempotency-Key",
"Access-Control-Allow-Methods": "POST, HEAD, PATCH, OPTIONS, GET, DELETE",
"Access-Control-Max-Age": "86400",
"Access-Control-Allow-Origin": "http://tus.io",
Expand Down
17 changes: 17 additions & 0 deletions pkg/handler/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,23 @@ type Lock interface {
Unlock() error
}

// IdempotencyKeyStore is the interface for persisting mappings from client-provided
// Idempotency-Key header values to upload IDs. This allows the handler to detect
// retried upload creation requests and return the existing upload instead of
// creating a duplicate. Implementations may store mappings in memory, on disk,
// or in an external service.
//
// See https://www.ietf.org/archive/id/draft-ietf-httpapi-idempotency-key-header-07.html
type IdempotencyKeyStore interface {
// FindUploadID returns the upload ID previously associated with the given
// idempotency key. If no mapping exists, ErrNotFound must be returned.
FindUploadID(ctx context.Context, key string) (string, error)

// StoreUploadID persists a mapping from the given idempotency key to the
// given upload ID.
StoreUploadID(ctx context.Context, key string, uploadID string) error
}

type ServableUpload interface {
// ServeContent serves the uploaded data as specified by the GET request.
// It allows data stores to delegate the handling of range requests and conditional
Expand Down
Loading