Skip to content

Commit b7969c7

Browse files
committed
fix(publish): reload published inside task for update/switch endpoints
Affected endpoints: apiPublishUpdateSwitch (PUT), apiPublishUpdate (POST). Both handlers loaded the published repo and mutated scalar fields (Label, Origin, SkipContents, SkipBz2, AcquireByHash, SignedBy, MultiDist, Version) outside the task closure, before the lock was acquired. Inside the task, LoadComplete only refreshed sourceItems — it did not reload scalar fields or the Revision. Two concurrent requests therefore each operated on a stale base: Request A loads published (Label="old"), sets Label="A" Request B loads published (Label="old"), sets Label="B" Task A runs: Update() + Publish() + collection.Update() -> saves Label="A" Task B runs: Update() on B's stale copy -> saves Label="B", silently discarding A's Label change and potentially reconciling a Revision built against the pre-A state. Fix: remove all field mutations and the LoadComplete call from the HTTP handler. Inside the task, a fresh taskCollectionFactory is created, the published repo is re-read via ByStoragePrefixDistribution + LoadComplete (obtaining the current DB state after the lock is held), and then all field mutations are applied before Update / Publish / collection.Update.
1 parent 2a5992c commit b7969c7

1 file changed

Lines changed: 85 additions & 78 deletions

File tree

api/publish.go

Lines changed: 85 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -492,46 +492,50 @@ func apiPublishUpdateSwitch(c *gin.Context) {
492492
return
493493
}
494494

495-
if b.SkipContents != nil {
496-
published.SkipContents = *b.SkipContents
497-
}
498-
499-
if b.SkipBz2 != nil {
500-
published.SkipBz2 = *b.SkipBz2
501-
}
502-
503-
if b.AcquireByHash != nil {
504-
published.AcquireByHash = *b.AcquireByHash
505-
}
506-
507-
if b.SignedBy != nil {
508-
published.SignedBy = *b.SignedBy
509-
}
510-
511-
if b.MultiDist != nil {
512-
published.MultiDist = *b.MultiDist
513-
}
514-
515-
if b.Label != nil {
516-
published.Label = *b.Label
517-
}
518-
519-
if b.Origin != nil {
520-
published.Origin = *b.Origin
521-
}
522-
523-
if b.Version != nil {
524-
published.Version = *b.Version
525-
}
526-
495+
// Field mutations and fresh DB load are deferred to inside the task so
496+
// they always operate on a consistent state after the lock is held.
527497
resources := []string{string(published.Key())}
528498
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
529499
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
530-
err = collection.LoadComplete(published, collectionFactory)
500+
taskCollectionFactory := context.NewCollectionFactory()
501+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
502+
503+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
504+
if err != nil {
505+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
506+
}
507+
508+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
531509
if err != nil {
532510
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
533511
}
534512

513+
// Apply field mutations on the freshly loaded object.
514+
if b.SkipContents != nil {
515+
published.SkipContents = *b.SkipContents
516+
}
517+
if b.SkipBz2 != nil {
518+
published.SkipBz2 = *b.SkipBz2
519+
}
520+
if b.AcquireByHash != nil {
521+
published.AcquireByHash = *b.AcquireByHash
522+
}
523+
if b.SignedBy != nil {
524+
published.SignedBy = *b.SignedBy
525+
}
526+
if b.MultiDist != nil {
527+
published.MultiDist = *b.MultiDist
528+
}
529+
if b.Label != nil {
530+
published.Label = *b.Label
531+
}
532+
if b.Origin != nil {
533+
published.Origin = *b.Origin
534+
}
535+
if b.Version != nil {
536+
published.Version = *b.Version
537+
}
538+
535539
revision := published.ObtainRevision()
536540
sources := revision.Sources
537541

@@ -543,25 +547,25 @@ func apiPublishUpdateSwitch(c *gin.Context) {
543547
}
544548
}
545549

546-
result, err := published.Update(collectionFactory, out)
550+
result, err := published.Update(taskCollectionFactory, out)
547551
if err != nil {
548552
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
549553
}
550554

551-
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
555+
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
552556
if err != nil {
553557
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
554558
}
555559

556-
err = collection.Update(published)
560+
err = taskCollection.Update(published)
557561
if err != nil {
558562
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
559563
}
560564

561565
if b.SkipCleanup == nil || !*b.SkipCleanup {
562566
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
563567
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
564-
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
568+
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
565569
if err != nil {
566570
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
567571
}
@@ -1105,72 +1109,75 @@ func apiPublishUpdate(c *gin.Context) {
11051109
collectionFactory := context.NewCollectionFactory()
11061110
collection := collectionFactory.PublishedRepoCollection()
11071111

1112+
// Load shallowly for 404 check, resource key, and task name.
1113+
// Full load and field mutations happen inside the task.
11081114
published, err := collection.ByStoragePrefixDistribution(storage, prefix, distribution)
11091115
if err != nil {
11101116
AbortWithJSONError(c, http.StatusNotFound, fmt.Errorf("unable to update: %s", err))
11111117
return
11121118
}
11131119

1114-
err = collection.LoadComplete(published, collectionFactory)
1115-
if err != nil {
1116-
AbortWithJSONError(c, http.StatusInternalServerError, fmt.Errorf("unable to update: %s", err))
1117-
return
1118-
}
1119-
1120-
if b.SkipContents != nil {
1121-
published.SkipContents = *b.SkipContents
1122-
}
1123-
1124-
if b.SkipBz2 != nil {
1125-
published.SkipBz2 = *b.SkipBz2
1126-
}
1127-
1128-
if b.AcquireByHash != nil {
1129-
published.AcquireByHash = *b.AcquireByHash
1130-
}
1131-
1132-
if b.SignedBy != nil {
1133-
published.SignedBy = *b.SignedBy
1134-
}
1135-
1136-
if b.MultiDist != nil {
1137-
published.MultiDist = *b.MultiDist
1138-
}
1120+
resources := []string{string(published.Key())}
1121+
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
1122+
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
1123+
taskCollectionFactory := context.NewCollectionFactory()
1124+
taskCollection := taskCollectionFactory.PublishedRepoCollection()
11391125

1140-
if b.Label != nil {
1141-
published.Label = *b.Label
1142-
}
1126+
published, err := taskCollection.ByStoragePrefixDistribution(storage, prefix, distribution)
1127+
if err != nil {
1128+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
1129+
}
11431130

1144-
if b.Origin != nil {
1145-
published.Origin = *b.Origin
1146-
}
1131+
err = taskCollection.LoadComplete(published, taskCollectionFactory)
1132+
if err != nil {
1133+
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
1134+
}
11471135

1148-
if b.Version != nil {
1149-
published.Version = *b.Version
1150-
}
1136+
// Apply field mutations on the freshly loaded object.
1137+
if b.SkipContents != nil {
1138+
published.SkipContents = *b.SkipContents
1139+
}
1140+
if b.SkipBz2 != nil {
1141+
published.SkipBz2 = *b.SkipBz2
1142+
}
1143+
if b.AcquireByHash != nil {
1144+
published.AcquireByHash = *b.AcquireByHash
1145+
}
1146+
if b.SignedBy != nil {
1147+
published.SignedBy = *b.SignedBy
1148+
}
1149+
if b.MultiDist != nil {
1150+
published.MultiDist = *b.MultiDist
1151+
}
1152+
if b.Label != nil {
1153+
published.Label = *b.Label
1154+
}
1155+
if b.Origin != nil {
1156+
published.Origin = *b.Origin
1157+
}
1158+
if b.Version != nil {
1159+
published.Version = *b.Version
1160+
}
11511161

1152-
resources := []string{string(published.Key())}
1153-
taskName := fmt.Sprintf("Update published %s repository %s/%s", published.SourceKind, published.StoragePrefix(), published.Distribution)
1154-
maybeRunTaskInBackground(c, taskName, resources, func(out aptly.Progress, _ *task.Detail) (*task.ProcessReturnValue, error) {
1155-
result, err := published.Update(collectionFactory, out)
1162+
result, err := published.Update(taskCollectionFactory, out)
11561163
if err != nil {
11571164
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
11581165
}
11591166

1160-
err = published.Publish(context.PackagePool(), context, collectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
1167+
err = published.Publish(context.PackagePool(), context, taskCollectionFactory, signer, out, b.ForceOverwrite, context.SkelPath())
11611168
if err != nil {
11621169
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
11631170
}
11641171

1165-
err = collection.Update(published)
1172+
err = taskCollection.Update(published)
11661173
if err != nil {
11671174
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
11681175
}
11691176

11701177
if b.SkipCleanup == nil || !*b.SkipCleanup {
11711178
cleanComponents := make([]string, 0, len(result.UpdatedSources)+len(result.RemovedSources))
11721179
cleanComponents = append(append(cleanComponents, result.UpdatedComponents()...), result.RemovedComponents()...)
1173-
err = collection.CleanupPrefixComponentFiles(context, published, cleanComponents, collectionFactory, out)
1180+
err = taskCollection.CleanupPrefixComponentFiles(context, published, cleanComponents, taskCollectionFactory, out)
11741181
if err != nil {
11751182
return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
11761183
}

0 commit comments

Comments
 (0)