Skip to content

Commit 6a839f2

Browse files
committed
feat(storage): enable parallel writes by using per-repo and per-digest locking
- lock per repo on pushes/pulls/retention, in short index operations - lock per digest when using multiple operations affecting the cachedb and storage (blob writes/deletes/moves/links in storage which need to be in accordance with cachedb content) Do not lock multiple repos at the same time in the same goroutine! It will cause deadlocks. Same applies to digests. Signed-off-by: Andrei Aaron <[email protected]>
1 parent 2a4edde commit 6a839f2

File tree

15 files changed

+895
-720
lines changed

15 files changed

+895
-720
lines changed

pkg/api/routes.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -956,12 +956,12 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
956956
if userCanMount {
957957
ok, blen, err = imgStore.CheckBlob(name, digest)
958958
} else {
959-
var lockLatency time.Time
959+
err = imgStore.WithRepoReadLock(name, func() error {
960+
var err error
961+
ok, blen, _, err = imgStore.StatBlob(name, digest)
960962

961-
imgStore.RLock(&lockLatency)
962-
defer imgStore.RUnlock(&lockLatency)
963-
964-
ok, blen, _, err = imgStore.StatBlob(name, digest)
963+
return err
964+
})
965965
}
966966

967967
if err != nil {

pkg/extensions/sync/destination.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"os"
1212
"path"
1313
"strings"
14-
"time"
1514

1615
"github.com/containers/image/v5/types"
1716
"github.com/opencontainers/go-digest"
@@ -99,8 +98,6 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
9998
registry.log.Info().Str("syncTempDir", path.Join(tempImageStore.RootDir(), repo)).Str("reference", reference).
10099
Msg("pushing synced local image to local registry")
101100

102-
var lockLatency time.Time
103-
104101
manifestBlob, manifestDigest, mediaType, err := tempImageStore.GetImageManifest(repo, reference)
105102
if err != nil {
106103
registry.log.Error().Str("errorType", common.TypeOf(err)).
@@ -136,10 +133,14 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
136133
}
137134

138135
for _, manifest := range indexManifest.Manifests {
139-
tempImageStore.RLock(&lockLatency)
140-
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
141-
tempImageStore.RUnlock(&lockLatency)
136+
var manifestBuf []byte
137+
138+
err := tempImageStore.WithRepoReadLock(repo, func() error {
139+
var err error
140+
manifestBuf, err = tempImageStore.GetBlobContent(repo, manifest.Digest)
142141

142+
return err
143+
})
143144
if err != nil {
144145
registry.log.Error().Str("errorType", common.TypeOf(err)).
145146
Err(err).Str("dir", path.Join(tempImageStore.RootDir(), repo)).Str("digest", manifest.Digest.String()).

pkg/meta/parse.go

+78-82
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"time"
87

98
godigest "github.com/opencontainers/go-digest"
109
ispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -108,66 +107,65 @@ func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []st
108107
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
109108
imageStore := storeController.GetImageStore(repo)
110109

111-
var lockLatency time.Time
110+
err := imageStore.WithRepoReadLock(repo, func() error {
111+
indexBlob, err := imageStore.GetIndexContent(repo)
112+
if err != nil {
113+
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
112114

113-
imageStore.RLock(&lockLatency)
114-
defer imageStore.RUnlock(&lockLatency)
115+
return err
116+
}
115117

116-
indexBlob, err := imageStore.GetIndexContent(repo)
117-
if err != nil {
118-
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
118+
var indexContent ispec.Index
119119

120-
return err
121-
}
120+
err = json.Unmarshal(indexBlob, &indexContent)
121+
if err != nil {
122+
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
122123

123-
var indexContent ispec.Index
124+
return err
125+
}
124126

125-
err = json.Unmarshal(indexBlob, &indexContent)
126-
if err != nil {
127-
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
127+
err = metaDB.ResetRepoReferences(repo)
128+
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
129+
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
128130

129-
return err
130-
}
131+
return err
132+
}
131133

132-
err = metaDB.ResetRepoReferences(repo)
133-
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
134-
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
134+
for _, manifest := range indexContent.Manifests {
135+
tag := manifest.Annotations[ispec.AnnotationRefName]
135136

136-
return err
137-
}
137+
if zcommon.IsReferrersTag(tag) {
138+
continue
139+
}
138140

139-
for _, manifest := range indexContent.Manifests {
140-
tag := manifest.Annotations[ispec.AnnotationRefName]
141+
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
142+
if err != nil {
143+
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
144+
Msg("failed to get blob for image")
141145

142-
if zcommon.IsReferrersTag(tag) {
143-
continue
144-
}
146+
return err
147+
}
145148

146-
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
147-
if err != nil {
148-
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
149-
Msg("failed to get blob for image")
149+
reference := tag
150150

151-
return err
152-
}
151+
if tag == "" {
152+
reference = manifest.Digest.String()
153+
}
153154

154-
reference := tag
155+
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
156+
imageStore, metaDB, log)
157+
if err != nil {
158+
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
159+
Msg("failed to set metadata for image")
155160

156-
if tag == "" {
157-
reference = manifest.Digest.String()
161+
return err
162+
}
158163
}
159164

160-
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
161-
imageStore, metaDB, log)
162-
if err != nil {
163-
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
164-
Msg("failed to set metadata for image")
165-
166-
return err
167-
}
168-
}
165+
return nil
166+
})
169167

170-
return nil
168+
return err
171169
}
172170

173171
func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
@@ -222,34 +220,33 @@ func getCosignSignatureLayersInfo(
222220
return layers, err
223221
}
224222

225-
var lockLatency time.Time
226-
227-
imageStore.RLock(&lockLatency)
228-
defer imageStore.RUnlock(&lockLatency)
223+
err := imageStore.WithRepoReadLock(repo, func() error {
224+
for _, layer := range manifestContent.Layers {
225+
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
226+
if err != nil {
227+
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
228+
Msg("failed to get cosign signature layer content")
229229

230-
for _, layer := range manifestContent.Layers {
231-
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
232-
if err != nil {
233-
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
234-
"failed to get cosign signature layer content")
230+
return err
231+
}
235232

236-
return layers, err
237-
}
233+
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
234+
if !ok {
235+
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
236+
Msg("failed to get specific annotation of cosign signature")
237+
}
238238

239-
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
240-
if !ok {
241-
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
242-
"failed to get specific annotation of cosign signature")
239+
layers = append(layers, mTypes.LayerInfo{
240+
LayerDigest: layer.Digest.String(),
241+
LayerContent: layerContent,
242+
SignatureKey: layerSigKey,
243+
})
243244
}
244245

245-
layers = append(layers, mTypes.LayerInfo{
246-
LayerDigest: layer.Digest.String(),
247-
LayerContent: layerContent,
248-
SignatureKey: layerSigKey,
249-
})
250-
}
246+
return nil
247+
})
251248

252-
return layers, nil
249+
return layers, err
253250
}
254251

255252
func getNotationSignatureLayersInfo(
@@ -279,28 +276,27 @@ func getNotationSignatureLayersInfo(
279276

280277
layer := manifestContent.Layers[0].Digest
281278

282-
var lockLatency time.Time
279+
err := imageStore.WithRepoReadLock(repo, func() error {
280+
layerContent, err := imageStore.GetBlobContent(repo, layer)
281+
if err != nil {
282+
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).
283+
Msg("failed to get notation signature blob content")
283284

284-
imageStore.RLock(&lockLatency)
285-
defer imageStore.RUnlock(&lockLatency)
285+
return err
286+
}
286287

287-
layerContent, err := imageStore.GetBlobContent(repo, layer)
288-
if err != nil {
289-
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).Msg(
290-
"failed to get notation signature blob content")
288+
layerSigKey := manifestContent.Layers[0].MediaType
291289

292-
return layers, err
293-
}
294-
295-
layerSigKey := manifestContent.Layers[0].MediaType
290+
layers = append(layers, mTypes.LayerInfo{
291+
LayerDigest: layer.String(),
292+
LayerContent: layerContent,
293+
SignatureKey: layerSigKey,
294+
})
296295

297-
layers = append(layers, mTypes.LayerInfo{
298-
LayerDigest: layer.String(),
299-
LayerContent: layerContent,
300-
SignatureKey: layerSigKey,
296+
return nil
301297
})
302298

303-
return layers, nil
299+
return layers, err
304300
}
305301

306302
// SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag

pkg/storage/common/common.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -871,9 +871,6 @@ type DedupeTaskGenerator struct {
871871
ImgStore storageTypes.ImageStore
872872
// storage dedupe value
873873
Dedupe bool
874-
// store blobs paths grouped by digest
875-
digest godigest.Digest
876-
duplicateBlobs []string
877874
/* store processed digest, used for iterating duplicateBlobs one by one
878875
and generating a task for each unprocessed one*/
879876
lastDigests []godigest.Digest
@@ -912,15 +909,15 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
912909
}
913910

914911
// get all blobs from storage.imageStore and group them by digest
915-
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
912+
digest, duplicateBlobs, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
916913
if err != nil {
917914
gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest")
918915

919916
return nil, err
920917
}
921918

922919
// if no digests left, then mark the task generator as done
923-
if gen.digest == "" {
920+
if digest == "" {
924921
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")
925922

926923
gen.done = true
@@ -929,10 +926,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
929926
}
930927

931928
// mark digest as processed before running its task
932-
gen.lastDigests = append(gen.lastDigests, gen.digest)
929+
gen.lastDigests = append(gen.lastDigests, digest)
933930

934931
// generate rebuild dedupe task for this digest
935-
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
932+
return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, gen.Log), nil
936933
}
937934

938935
func (gen *DedupeTaskGenerator) IsDone() bool {
@@ -945,9 +942,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
945942

946943
func (gen *DedupeTaskGenerator) Reset() {
947944
gen.lastDigests = []godigest.Digest{}
948-
gen.duplicateBlobs = []string{}
949945
gen.repos = []string{}
950-
gen.digest = ""
951946
gen.done = false
952947
}
953948

0 commit comments

Comments
 (0)