diff --git a/pkg/handler/concat_test.go b/pkg/handler/concat_test.go index 9846d9cec..108dad66e 100644 --- a/pkg/handler/concat_test.go +++ b/pkg/handler/concat_test.go @@ -112,6 +112,8 @@ func TestConcat(t *testing.T) { uploadB := NewMockFullUpload(ctrl) uploadC := NewMockFullUpload(ctrl) + concatID := "concat-7e18f737311b2dc3b2f269dd78396b03" + gomock.InOrder( store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ @@ -125,7 +127,10 @@ func TestConcat(t *testing.T) { Size: 5, Offset: 5, }, nil), + // Idempotency check: look up the deterministic concat ID + store.EXPECT().GetUpload(gomock.Any(), concatID).Return(nil, ErrNotFound), store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + ID: concatID, Size: 10, IsPartial: false, IsFinal: true, @@ -133,7 +138,7 @@ func TestConcat(t *testing.T) { MetaData: make(map[string]string), }).Return(uploadC, nil), uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ - ID: "foo", + ID: concatID, Size: 10, IsPartial: false, IsFinal: true, @@ -149,7 +154,7 @@ func TestConcat(t *testing.T) { StoreComposer: composer, NotifyCompleteUploads: true, PreFinishResponseCallback: func(hook HookEvent) (HTTPResponse, error) { - a.Equal("foo", hook.Upload.ID) + a.Equal(concatID, hook.Upload.ID) return HTTPResponse{ Header: HTTPHeader{ "X-Custom-Resp-Header": "hello", @@ -179,7 +184,7 @@ func TestConcat(t *testing.T) { event := <-c info := event.Upload - a.Equal("foo", info.ID) + a.Equal(concatID, info.ID) a.EqualValues(10, info.Size) a.EqualValues(10, info.Offset) a.False(info.IsPartial) @@ -338,6 +343,184 @@ func TestConcat(t *testing.T) { }).Run(handler, t) }) + // Test idempotent retry when concat already completed successfully. + SubTest(t, "IdempotentRetryComplete", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + a := assert.New(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + concatID := "concat-7e18f737311b2dc3b2f269dd78396b03" + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + // Idempotency check: upload already exists and is complete + store.EXPECT().GetUpload(gomock.Any(), concatID).Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: concatID, + Size: 10, + Offset: 10, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + // No NewUpload or ConcatUploads should be called + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + NotifyCompleteUploads: true, + }) + + c := make(chan HookEvent, 1) + handler.CompleteUploads = c + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + }, + Code: http.StatusCreated, + }).Run(handler, t) + + event := <-c + info := event.Upload + a.Equal(concatID, info.ID) + a.EqualValues(10, info.Size) + a.EqualValues(10, info.Offset) + a.True(info.IsFinal) + }) + + // Test idempotent retry when upload was created but concat didn't complete. + SubTest(t, "IdempotentRetryIncomplete", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + a := assert.New(t) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + concatID := "concat-7e18f737311b2dc3b2f269dd78396b03" + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + // Idempotency check: upload exists but concat hasn't happened yet + store.EXPECT().GetUpload(gomock.Any(), concatID).Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: concatID, + Size: 10, + Offset: 0, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + // Should retry the concatenation + store.EXPECT().AsConcatableUpload(uploadC).Return(uploadC), + uploadC.EXPECT().ConcatUploads(gomock.Any(), []Upload{uploadA, uploadB}).Return(nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + NotifyCompleteUploads: true, + }) + + c := make(chan HookEvent, 1) + handler.CompleteUploads = c + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + }, + Code: http.StatusCreated, + }).Run(handler, t) + + event := <-c + info := event.Upload + a.Equal(concatID, info.ID) + a.EqualValues(10, info.Size) + a.EqualValues(10, info.Offset) + a.True(info.IsFinal) + }) + + // Test that a partially corrupted concat (0 < offset < size) returns an error. + SubTest(t, "IdempotentRetryCorrupted", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + uploadA := NewMockFullUpload(ctrl) + uploadB := NewMockFullUpload(ctrl) + uploadC := NewMockFullUpload(ctrl) + + concatID := "concat-7e18f737311b2dc3b2f269dd78396b03" + + gomock.InOrder( + store.EXPECT().GetUpload(gomock.Any(), "a").Return(uploadA, nil), + uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + store.EXPECT().GetUpload(gomock.Any(), "b").Return(uploadB, nil), + uploadB.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + IsPartial: true, + Size: 5, + Offset: 5, + }, nil), + // Idempotency check: upload exists with partial data (corrupted) + store.EXPECT().GetUpload(gomock.Any(), concatID).Return(uploadC, nil), + uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ + ID: concatID, + Size: 10, + Offset: 3, + IsFinal: true, + PartialUploads: []string{"a", "b"}, + }, nil), + ) + + handler, _ := NewHandler(Config{ + BasePath: "files", + StoreComposer: composer, + }) + + (&httpTest{ + Method: "POST", + ReqHeader: map[string]string{ + "Tus-Resumable": "1.0.0", + "Upload-Concat": "final;http://tus.io/files/a /files/b", + }, + Code: http.StatusInternalServerError, + ResBody: "ERR_CONCAT_CORRUPTED: previous concatenation attempt was partially completed and left the upload in an inconsistent state\n", + }).Run(handler, t) + }) + // Test that we can concatenate uploads, whose IDs contain slashes. SubTest(t, "UploadIDsWithSlashes", func(t *testing.T, store *MockFullDataStore, composer *StoreComposer) { ctrl := gomock.NewController(t) @@ -346,6 +529,8 @@ func TestConcat(t *testing.T) { uploadB := NewMockFullUpload(ctrl) uploadC := NewMockFullUpload(ctrl) + concatID := "concat-0f2c19317a7803781021b9b987dc84e7" + gomock.InOrder( store.EXPECT().GetUpload(gomock.Any(), "aaa/123").Return(uploadA, nil), uploadA.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ @@ -359,7 +544,10 @@ func TestConcat(t *testing.T) { Size: 5, Offset: 5, }, nil), + // Idempotency check: look up the deterministic concat ID + store.EXPECT().GetUpload(gomock.Any(), concatID).Return(nil, ErrNotFound), store.EXPECT().NewUpload(gomock.Any(), FileInfo{ + ID: concatID, Size: 10, IsPartial: false, IsFinal: true, @@ -367,7 +555,7 @@ func TestConcat(t *testing.T) { MetaData: make(map[string]string), }).Return(uploadC, nil), uploadC.EXPECT().GetInfo(gomock.Any()).Return(FileInfo{ - ID: "foo", + ID: concatID, Size: 10, IsPartial: false, IsFinal: true, diff --git a/pkg/handler/unrouted_handler.go b/pkg/handler/unrouted_handler.go index 3053c7de6..f608fdf4e 100644 --- a/pkg/handler/unrouted_handler.go +++ b/pkg/handler/unrouted_handler.go @@ -2,7 +2,9 @@ package handler import ( "context" + "crypto/sha256" "encoding/base64" + "encoding/hex" "errors" "fmt" "io" @@ -10,6 +12,7 @@ import ( "mime" "net/http" "regexp" + "slices" "strconv" "strings" "time" @@ -70,6 +73,7 @@ var ( // when the upload got interrupted. Most clients will not retry 4XX but only 5XX, so we responsd with 500 here. ErrReadTimeout = NewError("ERR_READ_TIMEOUT", "timeout while reading request body", http.StatusInternalServerError) ErrConnectionReset = NewError("ERR_CONNECTION_RESET", "TCP connection reset by peer", http.StatusInternalServerError) + ErrConcatCorrupted = NewError("ERR_CONCAT_CORRUPTED", "previous concatenation attempt was partially completed and left the upload in an inconsistent state", http.StatusInternalServerError) ) // UnroutedHandler exposes methods to handle requests as part of the tus protocol, @@ -390,16 +394,80 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) } } - upload, err := handler.composer.Core.NewUpload(c, info) - if err != nil { - handler.sendError(c, err) - return + // For final uploads (concatenation), derive a deterministic upload ID from + // the partial upload IDs. This makes the concat operation idempotent: if the + // request is interrupted (e.g. by a WAF timeout) and retried, the same final + // upload ID will be used, allowing us to detect and resume from the previous + // attempt rather than creating a duplicate. + if isFinal && info.ID == "" { + info.ID = concatDeterministicID(partialUploadIDs) } - info, err = upload.GetInfo(c) - if err != nil { - handler.sendError(c, err) - return + // For final uploads with a known ID, check if a previous concat attempt + // already created this upload. If so, we can skip creation and resume from + // wherever the previous attempt left off. + var upload Upload + existingFound := false + if isFinal { + existingUpload, getErr := handler.composer.Core.GetUpload(c, info.ID) + if getErr == nil { + existingInfo, infoErr := existingUpload.GetInfo(c) + if infoErr != nil { + handler.sendError(c, infoErr) + return + } + + // Verify this is actually the same concat operation (same partials in same order) + if existingInfo.IsFinal && slices.Equal(existingInfo.PartialUploads, partialUploadIDs) { + if existingInfo.Offset == existingInfo.Size { + // Concatenation already completed. Return the existing upload. + id := existingInfo.ID + url := handler.absFileURL(r, id) + resp.Header["Location"] = url + + c.log = c.log.With("id", id) + c.log.InfoContext(c, "UploadConcatAlreadyComplete", "size", existingInfo.Size, "url", url) + + resp, err = handler.emitFinishEvents(c, resp, existingInfo) + if err != nil { + handler.sendError(c, err) + return + } + + handler.sendResp(c, resp) + return + } else if existingInfo.Offset == 0 { + // Upload was created but concat never completed. Retry it. + upload = existingUpload + info = existingInfo + existingFound = true + } else { + // Partial concat (0 < offset < size): inconsistent state. + handler.sendError(c, ErrConcatCorrupted) + return + } + } + // If IsFinal is false or partials don't match, fall through to NewUpload + // which will fail with a storage-level conflict for the duplicate ID. + } else if !errors.Is(getErr, ErrNotFound) { + handler.sendError(c, getErr) + return + } + // ErrNotFound: no previous attempt exists, proceed to create. + } + + if !existingFound { + upload, err = handler.composer.Core.NewUpload(c, info) + if err != nil { + handler.sendError(c, err) + return + } + + info, err = upload.GetInfo(c) + if err != nil { + handler.sendError(c, err) + return + } } id := info.ID @@ -409,12 +477,17 @@ func (handler *UnroutedHandler) PostFile(w http.ResponseWriter, r *http.Request) url := handler.absFileURL(r, id) resp.Header["Location"] = url - handler.Metrics.incUploadsCreated() - c.log = c.log.With("id", id) - c.log.InfoContext(c, "UploadCreated", "size", size, "url", url) + if !existingFound { + handler.Metrics.incUploadsCreated() + c.log = c.log.With("id", id) + c.log.InfoContext(c, "UploadCreated", "size", size, "url", url) - if handler.config.NotifyCreatedUploads { - handler.CreatedUploads <- newHookEvent(c, info) + if handler.config.NotifyCreatedUploads { + handler.CreatedUploads <- newHookEvent(c, info) + } + } else { + c.log = c.log.With("id", id) + c.log.InfoContext(c, "UploadConcatRetrying", "size", size, "url", url) } if isFinal { @@ -1710,6 +1783,20 @@ func getRequestId(r *http.Request) string { return reqId } +// concatDeterministicID derives a deterministic upload ID from the ordered list of +// partial upload IDs. This ensures that retried concat requests for the same set of +// partials (in the same order) produce the same final upload ID, enabling idempotency. +func concatDeterministicID(partialUploadIDs []string) string { + h := sha256.New() + for i, id := range partialUploadIDs { + if i > 0 { + h.Write([]byte("\n")) + } + h.Write([]byte(id)) + } + return "concat-" + hex.EncodeToString(h.Sum(nil))[:32] +} + // validateUploadId checks whether an ID included in a FileInfoChanges struct is allowed. func validateUploadId(newId string) error { if newId == "" {