Skip to content

Commit 8db4c06

Browse files
committed
feat(storage): enable parallel writes by using per-repo and per-digest locking
- lock per repo on pushes/pulls/retention, in short index operations - lock per digest when using multiple operations affecting the cachedb and storage (blob writes/deletes/moves/links in storage which need to be in accordance with cachedb content) Do not lock multiple repos at the same time in the same goroutine! It will cause deadlocks. Same applies to digests. Signed-off-by: Andrei Aaron <[email protected]>
1 parent 983dc7f commit 8db4c06

File tree

15 files changed

+895
-720
lines changed

15 files changed

+895
-720
lines changed

pkg/api/routes.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -956,12 +956,12 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
956956
if userCanMount {
957957
ok, blen, err = imgStore.CheckBlob(name, digest)
958958
} else {
959-
var lockLatency time.Time
959+
err = imgStore.WithRepoReadLock(name, func() error {
960+
var err error
961+
ok, blen, _, err = imgStore.StatBlob(name, digest)
960962

961-
imgStore.RLock(&lockLatency)
962-
defer imgStore.RUnlock(&lockLatency)
963-
964-
ok, blen, _, err = imgStore.StatBlob(name, digest)
963+
return err
964+
})
965965
}
966966

967967
if err != nil {

pkg/extensions/sync/destination.go

+7-6
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"os"
1212
"path"
1313
"strings"
14-
"time"
1514

1615
"github.com/containers/image/v5/types"
1716
"github.com/opencontainers/go-digest"
@@ -99,8 +98,6 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
9998
registry.log.Info().Str("syncTempDir", path.Join(tempImageStore.RootDir(), repo)).Str("reference", reference).
10099
Msg("pushing synced local image to local registry")
101100

102-
var lockLatency time.Time
103-
104101
manifestBlob, manifestDigest, mediaType, err := tempImageStore.GetImageManifest(repo, reference)
105102
if err != nil {
106103
registry.log.Error().Str("errorType", common.TypeOf(err)).
@@ -136,10 +133,14 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
136133
}
137134

138135
for _, manifest := range indexManifest.Manifests {
139-
tempImageStore.RLock(&lockLatency)
140-
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
141-
tempImageStore.RUnlock(&lockLatency)
136+
var manifestBuf []byte
137+
138+
err := tempImageStore.WithRepoReadLock(repo, func() error {
139+
var err error
140+
manifestBuf, err = tempImageStore.GetBlobContent(repo, manifest.Digest)
142141

142+
return err
143+
})
143144
if err != nil {
144145
registry.log.Error().Str("errorType", common.TypeOf(err)).
145146
Err(err).Str("dir", path.Join(tempImageStore.RootDir(), repo)).Str("digest", manifest.Digest.String()).

pkg/meta/parse.go

+78-82
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7-
"time"
87

98
godigest "github.com/opencontainers/go-digest"
109
ispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -108,66 +107,65 @@ func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []st
108107
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
109108
imageStore := storeController.GetImageStore(repo)
110109

111-
var lockLatency time.Time
110+
err := imageStore.WithRepoReadLock(repo, func() error {
111+
indexBlob, err := imageStore.GetIndexContent(repo)
112+
if err != nil {
113+
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
112114

113-
imageStore.RLock(&lockLatency)
114-
defer imageStore.RUnlock(&lockLatency)
115+
return err
116+
}
115117

116-
indexBlob, err := imageStore.GetIndexContent(repo)
117-
if err != nil {
118-
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
118+
var indexContent ispec.Index
119119

120-
return err
121-
}
120+
err = json.Unmarshal(indexBlob, &indexContent)
121+
if err != nil {
122+
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
122123

123-
var indexContent ispec.Index
124+
return err
125+
}
124126

125-
err = json.Unmarshal(indexBlob, &indexContent)
126-
if err != nil {
127-
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
127+
err = metaDB.ResetRepoReferences(repo)
128+
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
129+
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
128130

129-
return err
130-
}
131+
return err
132+
}
131133

132-
err = metaDB.ResetRepoReferences(repo)
133-
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
134-
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
134+
for _, manifest := range indexContent.Manifests {
135+
tag := manifest.Annotations[ispec.AnnotationRefName]
135136

136-
return err
137-
}
137+
if zcommon.IsReferrersTag(tag) {
138+
continue
139+
}
138140

139-
for _, manifest := range indexContent.Manifests {
140-
tag := manifest.Annotations[ispec.AnnotationRefName]
141+
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
142+
if err != nil {
143+
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
144+
Msg("failed to get blob for image")
141145

142-
if zcommon.IsReferrersTag(tag) {
143-
continue
144-
}
146+
return err
147+
}
145148

146-
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
147-
if err != nil {
148-
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
149-
Msg("failed to get blob for image")
149+
reference := tag
150150

151-
return err
152-
}
151+
if tag == "" {
152+
reference = manifest.Digest.String()
153+
}
153154

154-
reference := tag
155+
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
156+
imageStore, metaDB, log)
157+
if err != nil {
158+
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
159+
Msg("failed to set metadata for image")
155160

156-
if tag == "" {
157-
reference = manifest.Digest.String()
161+
return err
162+
}
158163
}
159164

160-
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
161-
imageStore, metaDB, log)
162-
if err != nil {
163-
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
164-
Msg("failed to set metadata for image")
165-
166-
return err
167-
}
168-
}
165+
return nil
166+
})
169167

170-
return nil
168+
return err
171169
}
172170

173171
func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
@@ -222,34 +220,33 @@ func getCosignSignatureLayersInfo(
222220
return layers, err
223221
}
224222

225-
var lockLatency time.Time
226-
227-
imageStore.RLock(&lockLatency)
228-
defer imageStore.RUnlock(&lockLatency)
223+
err := imageStore.WithRepoReadLock(repo, func() error {
224+
for _, layer := range manifestContent.Layers {
225+
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
226+
if err != nil {
227+
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
228+
Msg("failed to get cosign signature layer content")
229229

230-
for _, layer := range manifestContent.Layers {
231-
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
232-
if err != nil {
233-
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
234-
"failed to get cosign signature layer content")
230+
return err
231+
}
235232

236-
return layers, err
237-
}
233+
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
234+
if !ok {
235+
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
236+
Msg("failed to get specific annotation of cosign signature")
237+
}
238238

239-
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
240-
if !ok {
241-
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
242-
"failed to get specific annotation of cosign signature")
239+
layers = append(layers, mTypes.LayerInfo{
240+
LayerDigest: layer.Digest.String(),
241+
LayerContent: layerContent,
242+
SignatureKey: layerSigKey,
243+
})
243244
}
244245

245-
layers = append(layers, mTypes.LayerInfo{
246-
LayerDigest: layer.Digest.String(),
247-
LayerContent: layerContent,
248-
SignatureKey: layerSigKey,
249-
})
250-
}
246+
return nil
247+
})
251248

252-
return layers, nil
249+
return layers, err
253250
}
254251

255252
func getNotationSignatureLayersInfo(
@@ -279,28 +276,27 @@ func getNotationSignatureLayersInfo(
279276

280277
layer := manifestContent.Layers[0].Digest
281278

282-
var lockLatency time.Time
279+
err := imageStore.WithRepoReadLock(repo, func() error {
280+
layerContent, err := imageStore.GetBlobContent(repo, layer)
281+
if err != nil {
282+
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).
283+
Msg("failed to get notation signature blob content")
283284

284-
imageStore.RLock(&lockLatency)
285-
defer imageStore.RUnlock(&lockLatency)
285+
return err
286+
}
286287

287-
layerContent, err := imageStore.GetBlobContent(repo, layer)
288-
if err != nil {
289-
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).Msg(
290-
"failed to get notation signature blob content")
288+
layerSigKey := manifestContent.Layers[0].MediaType
291289

292-
return layers, err
293-
}
294-
295-
layerSigKey := manifestContent.Layers[0].MediaType
290+
layers = append(layers, mTypes.LayerInfo{
291+
LayerDigest: layer.String(),
292+
LayerContent: layerContent,
293+
SignatureKey: layerSigKey,
294+
})
296295

297-
layers = append(layers, mTypes.LayerInfo{
298-
LayerDigest: layer.String(),
299-
LayerContent: layerContent,
300-
SignatureKey: layerSigKey,
296+
return nil
301297
})
302298

303-
return layers, nil
299+
return layers, err
304300
}
305301

306302
// SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag

pkg/storage/common/common.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -876,9 +876,6 @@ type DedupeTaskGenerator struct {
876876
ImgStore storageTypes.ImageStore
877877
// storage dedupe value
878878
Dedupe bool
879-
// store blobs paths grouped by digest
880-
digest godigest.Digest
881-
duplicateBlobs []string
882879
/* store processed digest, used for iterating duplicateBlobs one by one
883880
and generating a task for each unprocessed one*/
884881
lastDigests []godigest.Digest
@@ -917,15 +914,15 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
917914
}
918915

919916
// get all blobs from storage.imageStore and group them by digest
920-
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
917+
digest, duplicateBlobs, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
921918
if err != nil {
922919
gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest")
923920

924921
return nil, err
925922
}
926923

927924
// if no digests left, then mark the task generator as done
928-
if gen.digest == "" {
925+
if digest == "" {
929926
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")
930927

931928
gen.done = true
@@ -934,10 +931,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
934931
}
935932

936933
// mark digest as processed before running its task
937-
gen.lastDigests = append(gen.lastDigests, gen.digest)
934+
gen.lastDigests = append(gen.lastDigests, digest)
938935

939936
// generate rebuild dedupe task for this digest
940-
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
937+
return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, gen.Log), nil
941938
}
942939

943940
func (gen *DedupeTaskGenerator) IsDone() bool {
@@ -950,9 +947,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
950947

951948
func (gen *DedupeTaskGenerator) Reset() {
952949
gen.lastDigests = []godigest.Digest{}
953-
gen.duplicateBlobs = []string{}
954950
gen.repos = []string{}
955-
gen.digest = ""
956951
gen.done = false
957952
}
958953

0 commit comments

Comments
 (0)