Skip to content

Commit 2a5992c

Browse files
committed
fix(publish): reload published inside task for source-management endpoints
Affected endpoints: apiPublishAddSource, apiPublishSetSources, apiPublishUpdateSource, apiPublishRemoveSource, apiPublishDropChanges. All five handlers shared the same flawed pattern: they loaded the published repo from the DB and mutated it (ObtainRevision / DropRevision) outside the task closure, before the task lock was acquired. Each task closure then just wrote back the already-mutated, pre-lock object. Because the task queue serialises tasks that share a resource key, two concurrent requests appear safe — but each task closure holds a stale copy of the object captured before the lock was taken: Request A loads published: revision = {} Request B loads published: revision = {} <- same DB state A mutates: revision = {main: snap1} B mutates: revision = {contrib: snap2} Task A runs: saves {main: snap1} OK Task B runs: saves {contrib: snap2} <- clobbers A's change Fix: perform only a shallow ByStoragePrefixDistribution outside the task (for the early 404 response, resource key, and task name). Inside the task closure a dedicated taskCollectionFactory is created, the published repo is re-read fresh from the DB (after the lock is acquired), and LoadComplete + all mutations + Update are executed against that authoritative copy.
1 parent 2827620 commit 2a5992c

1 file changed

Lines changed: 138 additions & 96 deletions

File tree

api/publish.go

Lines changed: 138 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -648,43 +648,52 @@ func apiPublishAddSource(c *gin.Context) {
648648
storage, prefix := deb.ParsePrefix(param)
649649
distribution := slashEscape(c.Params.ByName("distribution"))
650650

651+
if c.Bind(&b) != nil {
652+
return
653+
}
654+
651655
collectionFactory := context.NewCollectionFactory()
652656
collection := collectionFactory.PublishedRepoCollection()
653657

658+
// Load shallowly (no LoadComplete) to verify existence and obtain the
659+
// resource key and task name. The actual mutation is performed inside
660+
// the task on a freshly loaded copy to prevent lost-update races.
654661
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
655662
if err != nil {
656663
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to create: %s", err))
657664
return
658665
}
659666

660-
err = collection.LoadComplete(published, collectionFactory)
661-
if err != nil {
662-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to create: %s", err))
663-
return
664-
}
667+
resources := []string{string(published.Key())}
668+
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
669+
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
670+
taskCollectionFactory := context.NewCollectionFactory()
671+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
665672

666-
if c.Bind(&b) != nil {
667-
return
668-
}
673+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
674+
if err != nil {
675+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to create: %s", err)
676+
}
669677

670-
revision := published.ObtainRevision()
671-
sources := revision.Sources
678+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
679+
if err != nil {
680+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to create: %s", err)
681+
}
672682

673-
component := b.Component
674-
name := b.Name
683+
revision := published.ObtainRevision()
684+
sources := revision.Sources
675685

676-
_, exists := sources[component]
677-
if exists {
678-
AbortWithJSONError(c, http.StatusBadRequest, fmt.Errorf("unable to create: Component '%s' already exists", component))
679-
return
680-
}
686+
component := b.Component
687+
name := b.Name
681688

682-
sources[component] = name
689+
_, exists := sources[component]
690+
if exists {
691+
return &task.ProcessReturnValue{Code: http.StatusBadRequest, Value: nil}, fmt.Errorf("unable to create: Component '%s' already exists", component)
692+
}
683693

684-
resources := []string{string(published.Key())}
685-
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
686-
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
687-
err = collection.Update(published)
694+
sources[component] = name
695+
696+
err = taskCollection.Update(published)
688697
if err != nil {
689698
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
690699
}
@@ -766,39 +775,48 @@ func apiPublishSetSources(c *gin.Context) {
766775
storage, prefix := deb.ParsePrefix(param)
767776
distribution := slashEscape(c.Params.ByName("distribution"))
768777

778+
if c.Bind(&b) != nil {
779+
return
780+
}
781+
769782
collectionFactory := context.NewCollectionFactory()
770783
collection := collectionFactory.PublishedRepoCollection()
771784

785+
// Load shallowly for 404 check, resource key, and task name.
786+
// Full load and mutation happen inside the task.
772787
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
773788
if err != nil {
774789
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
775790
return
776791
}
777792

778-
err = collection.LoadComplete(published, collectionFactory)
779-
if err != nil {
780-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
781-
return
782-
}
793+
resources := []string{string(published.Key())}
794+
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
795+
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
796+
taskCollectionFactory := context.NewCollectionFactory()
797+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
783798

784-
if c.Bind(&b) != nil {
785-
return
786-
}
799+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
800+
if err != nil {
801+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
802+
}
803+
804+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
805+
if err != nil {
806+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
807+
}
787808

788-
revision := published.ObtainRevision()
789-
sources := make(map[string]string, len(b))
790-
revision.Sources = sources
809+
revision := published.ObtainRevision()
810+
sources := make(map[string]string, len(b))
811+
revision.Sources = sources
791812

792-
for _, source := range b {
793-
component := source.Component
794-
name := source.Name
795-
sources[component] = name
796-
}
813+
for _, source := range b {
814+
component := source.Component
815+
name := source.Name
816+
sources[component] = name
817+
}
797818

798-
resources := []string{string(published.Key())}
799-
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
800-
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
801-
err = collection.Update(published)
819+
err = taskCollection.Update(published)
802820
if err != nil {
803821
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
804822
}
@@ -831,24 +849,33 @@ func apiPublishDropChanges(c *gin.Context) {
831849
collectionFactory := context.NewCollectionFactory()
832850
collection := collectionFactory.PublishedRepoCollection()
833851

852+
// Load shallowly for 404 check, resource key, and task name.
853+
// Full load and DropRevision happen inside the task.
834854
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
835855
if err != nil {
836856
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
837857
return
838858
}
839859

840-
err = collection.LoadComplete(published, collectionFactory)
841-
if err != nil {
842-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
843-
return
844-
}
845-
846-
published.DropRevision()
847-
848860
resources := []string{string(published.Key())}
849861
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
850862
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
851-
err = collection.Update(published)
863+
taskCollectionFactory := context.NewCollectionFactory()
864+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
865+
866+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
867+
if err != nil {
868+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
869+
}
870+
871+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
872+
if err != nil {
873+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
874+
}
875+
876+
published.DropRevision()
877+
878+
err = taskCollection.Update(published)
852879
if err != nil {
853880
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
854881
}
@@ -884,51 +911,58 @@ func apiPublishUpdateSource(c *gin.Context) {
884911
param := slashEscape(c.Params.ByName("prefix"))
885912
storage, prefix := deb.ParsePrefix(param)
886913
distribution := slashEscape(c.Params.ByName("distribution"))
887-
component := slashEscape(c.Params.ByName("component"))
914+
urlComponent := slashEscape(c.Params.ByName("component"))
915+
916+
// Default component to the URL path segment; the body may rename it.
917+
b.Component = urlComponent
918+
if c.Bind(&b) != nil {
919+
return
920+
}
888921

889922
collectionFactory := context.NewCollectionFactory()
890923
collection := collectionFactory.PublishedRepoCollection()
891924

925+
// Load shallowly for 404 check, resource key, and task name.
926+
// Full load and mutation happen inside the task.
892927
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
893928
if err != nil {
894929
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
895930
return
896931
}
897932

898-
err = collection.LoadComplete(published, collectionFactory)
899-
if err != nil {
900-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
901-
return
902-
}
933+
resources := []string{string(published.Key())}
934+
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
935+
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
936+
taskCollectionFactory := context.NewCollectionFactory()
937+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
903938

904-
revision := published.ObtainRevision()
905-
sources := revision.Sources
939+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
940+
if err != nil {
941+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: %s", err)
942+
}
906943

907-
_, exists := sources[component]
908-
if !exists {
909-
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: Component '%s' does not exist", component))
910-
return
911-
}
944+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
945+
if err != nil {
946+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
947+
}
912948

913-
b.Component = component
914-
b.Name = revision.Sources[component]
949+
revision := published.ObtainRevision()
950+
sources := revision.Sources
915951

916-
if c.Bind(&b) != nil {
917-
return
918-
}
952+
_, exists := sources[urlComponent]
953+
if !exists {
954+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to update: Component '%s' does not exist", urlComponent)
955+
}
919956

920-
if b.Component != component {
921-
delete(sources, component)
922-
}
957+
if b.Component != urlComponent {
958+
delete(sources, urlComponent)
959+
}
923960

924-
component = b.Component
925-
name := b.Name
926-
sources[component] = name
961+
newComponent := b.Component
962+
name := b.Name
963+
sources[newComponent] = name
927964

928-
resources := []string{string(published.Key())}
929-
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
930-
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
931-
err = collection.Update(published)
965+
err = taskCollection.Update(published)
932966
if err != nil {
933967
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
934968
}
@@ -965,33 +999,41 @@ func apiPublishRemoveSource(c *gin.Context) {
965999
collectionFactory := context.NewCollectionFactory()
9661000
collection := collectionFactory.PublishedRepoCollection()
9671001

1002+
// Load shallowly for 404 check, resource key, and task name.
1003+
// Full load and mutation happen inside the task.
9681004
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
9691005
if err != nil {
9701006
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: %s", err))
9711007
return
9721008
}
9731009

974-
err = collection.LoadComplete(published, collectionFactory)
975-
if err != nil {
976-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to delete: %s", err))
977-
return
978-
}
1010+
resources := []string{string(published.Key())}
1011+
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
1012+
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
1013+
taskCollectionFactory := context.NewCollectionFactory()
1014+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
9791015

980-
revision := published.ObtainRevision()
981-
sources := revision.Sources
1016+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
1017+
if err != nil {
1018+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: %s", err)
1019+
}
9821020

983-
_, exists := sources[component]
984-
if !exists {
985-
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to delete: Component '%s' does not exist", component))
986-
return
987-
}
1021+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
1022+
if err != nil {
1023+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to delete: %s", err)
1024+
}
1025+
1026+
revision := published.ObtainRevision()
1027+
sources := revision.Sources
9881028

989-
delete(sources, component)
1029+
_, exists := sources[component]
1030+
if !exists {
1031+
return &task.ProcessReturnValue{Code: http.StatusNotFound, Value: nil}, fmt.Errorf("unable to delete: Component '%s' does not exist", component)
1032+
}
9901033

991-
resources := []string{string(published.Key())}
992-
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
993-
maybeRunTaskInBackground(c, taskName, resources, func(_ aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
994-
err = collection.Update(published)
1034+
delete(sources, component)
1035+
1036+
err = taskCollection.Update(published)
9951037
if err != nil {
9961038
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
9971039
}

0 commit comments

Comments
 (0)