Skip to content

feat(storage): enable parallel writes by using per-repo and per-digest locking #2968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/nightly.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
name: 'Nightly jobs'
on:
schedule:
- cron: '30 1 * * *'
# schedule:
# - cron: '30 1 * * *'
workflow_dispatch:
pull_request:
branches:
- main

permissions: read-all

Expand Down
10 changes: 5 additions & 5 deletions pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,12 +956,12 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
if userCanMount {
ok, blen, err = imgStore.CheckBlob(name, digest)
} else {
var lockLatency time.Time
err = imgStore.WithRepoReadLock(name, func() error {
var err error
ok, blen, _, err = imgStore.StatBlob(name, digest)

imgStore.RLock(&lockLatency)
defer imgStore.RUnlock(&lockLatency)

ok, blen, _, err = imgStore.StatBlob(name, digest)
return err
})
}

if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/extensions/sync/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"os"
"path"
"strings"
"time"

"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
Expand Down Expand Up @@ -99,8 +98,6 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
registry.log.Info().Str("syncTempDir", path.Join(tempImageStore.RootDir(), repo)).Str("reference", reference).
Msg("pushing synced local image to local registry")

var lockLatency time.Time

manifestBlob, manifestDigest, mediaType, err := tempImageStore.GetImageManifest(repo, reference)
if err != nil {
registry.log.Error().Str("errorType", common.TypeOf(err)).
Expand Down Expand Up @@ -136,10 +133,14 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
}

for _, manifest := range indexManifest.Manifests {
tempImageStore.RLock(&lockLatency)
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
tempImageStore.RUnlock(&lockLatency)
var manifestBuf []byte

err := tempImageStore.WithRepoReadLock(repo, func() error {
var err error
manifestBuf, err = tempImageStore.GetBlobContent(repo, manifest.Digest)

return err
})
if err != nil {
registry.log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Str("dir", path.Join(tempImageStore.RootDir(), repo)).Str("digest", manifest.Digest.String()).
Expand Down
160 changes: 78 additions & 82 deletions pkg/meta/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"errors"
"time"

godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -108,66 +107,65 @@ func getReposToBeDeleted(allStorageRepos []string, allMetaDBRepos []string) []st
func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreController, log log.Logger) error {
imageStore := storeController.GetImageStore(repo)

var lockLatency time.Time
err := imageStore.WithRepoReadLock(repo, func() error {
indexBlob, err := imageStore.GetIndexContent(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
return err
}

indexBlob, err := imageStore.GetIndexContent(repo)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to read index.json for repo")
var indexContent ispec.Index

return err
}
err = json.Unmarshal(indexBlob, &indexContent)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")

var indexContent ispec.Index
return err
}

err = json.Unmarshal(indexBlob, &indexContent)
if err != nil {
log.Error().Err(err).Str("repository", repo).Msg("failed to unmarshal index.json for repo")
err = metaDB.ResetRepoReferences(repo)
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")

return err
}
return err
}

err = metaDB.ResetRepoReferences(repo)
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
log.Error().Err(err).Str("repository", repo).Msg("failed to reset tag field in RepoMetadata for repo")
for _, manifest := range indexContent.Manifests {
tag := manifest.Annotations[ispec.AnnotationRefName]

return err
}
if zcommon.IsReferrersTag(tag) {
continue
}

for _, manifest := range indexContent.Manifests {
tag := manifest.Annotations[ispec.AnnotationRefName]
manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
Msg("failed to get blob for image")

if zcommon.IsReferrersTag(tag) {
continue
}
return err
}

manifestBlob, _, _, err := imageStore.GetImageManifest(repo, manifest.Digest.String())
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("digest", manifest.Digest.String()).
Msg("failed to get blob for image")
reference := tag

return err
}
if tag == "" {
reference = manifest.Digest.String()
}

reference := tag
err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
imageStore, metaDB, log)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
Msg("failed to set metadata for image")

if tag == "" {
reference = manifest.Digest.String()
return err
}
}

err = SetImageMetaFromInput(context.Background(), repo, reference, manifest.MediaType, manifest.Digest, manifestBlob,
imageStore, metaDB, log)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("tag", tag).
Msg("failed to set metadata for image")

return err
}
}
return nil
})

return nil
return err
}

func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
Expand Down Expand Up @@ -222,34 +220,33 @@ func getCosignSignatureLayersInfo(
return layers, err
}

var lockLatency time.Time

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
err := imageStore.WithRepoReadLock(repo, func() error {
for _, layer := range manifestContent.Layers {
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
Msg("failed to get cosign signature layer content")

for _, layer := range manifestContent.Layers {
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
"failed to get cosign signature layer content")
return err
}

return layers, err
}
layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
if !ok {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).
Msg("failed to get specific annotation of cosign signature")
}

layerSigKey, ok := layer.Annotations[zcommon.CosignSigKey]
if !ok {
log.Error().Err(err).Str("repository", repo).Str("reference", tag).Str("layerDigest", layer.Digest.String()).Msg(
"failed to get specific annotation of cosign signature")
layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.Digest.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})
}

layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.Digest.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})
}
return nil
})

return layers, nil
return layers, err
}

func getNotationSignatureLayersInfo(
Expand Down Expand Up @@ -279,28 +276,27 @@ func getNotationSignatureLayersInfo(

layer := manifestContent.Layers[0].Digest

var lockLatency time.Time
err := imageStore.WithRepoReadLock(repo, func() error {
layerContent, err := imageStore.GetBlobContent(repo, layer)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).
Msg("failed to get notation signature blob content")

imageStore.RLock(&lockLatency)
defer imageStore.RUnlock(&lockLatency)
return err
}

layerContent, err := imageStore.GetBlobContent(repo, layer)
if err != nil {
log.Error().Err(err).Str("repository", repo).Str("reference", manifestDigest).Str("layerDigest", layer.String()).Msg(
"failed to get notation signature blob content")
layerSigKey := manifestContent.Layers[0].MediaType

return layers, err
}

layerSigKey := manifestContent.Layers[0].MediaType
layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
})

layers = append(layers, mTypes.LayerInfo{
LayerDigest: layer.String(),
LayerContent: layerContent,
SignatureKey: layerSigKey,
return nil
})

return layers, nil
return layers, err
}

// SetMetadataFromInput tries to set manifest metadata and update repo metadata by adding the current tag
Expand Down
13 changes: 9 additions & 4 deletions pkg/meta/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,8 +750,7 @@ func (rc *RedisDB) SetRepoReference(ctx context.Context, repo string,
return err
}

locks := []string{rc.getImageLockKey(imageMeta.Digest.String()), rc.getRepoLockKey(repo)}
err = rc.withRSLocks(ctx, locks, func() error {
err = rc.withRSLocks(ctx, []string{rc.getImageLockKey(imageMeta.Digest.String())}, func() error {
err := rc.Client.HSet(ctx, rc.ImageMetaKey, imageMeta.Digest.String(), imageMetaBlob).Err()
if err != nil {
rc.Log.Error().Err(err).Str("hset", rc.ImageMetaKey).Str("digest", imageMeta.Digest.String()).
Expand All @@ -760,6 +759,13 @@ func (rc *RedisDB) SetRepoReference(ctx context.Context, repo string,
return fmt.Errorf("failed to set image meta record for digest %s: %w", imageMeta.Digest.String(), err)
}

return nil
})
if err != nil {
return err
}

err = rc.withRSLocks(ctx, []string{rc.getRepoLockKey(repo)}, func() error {
protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
if err != nil && !errors.Is(err, zerr.ErrRepoMetaNotFound) {
return err
Expand Down Expand Up @@ -1805,8 +1811,7 @@ if there are no tags pointing to the digest, otherwise it's noop.
func (rc *RedisDB) RemoveRepoReference(repo, reference string, manifestDigest godigest.Digest) error {
ctx := context.Background()

locks := []string{rc.getImageLockKey(manifestDigest.String()), rc.getRepoLockKey(repo)}
err := rc.withRSLocks(ctx, locks, func() error {
err := rc.withRSLocks(ctx, []string{rc.getRepoLockKey(repo)}, func() error {
protoRepoMeta, err := rc.getProtoRepoMeta(ctx, repo)
if err != nil {
if errors.Is(err, zerr.ErrRepoMetaNotFound) {
Expand Down
13 changes: 4 additions & 9 deletions pkg/storage/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -871,9 +871,6 @@ type DedupeTaskGenerator struct {
ImgStore storageTypes.ImageStore
// storage dedupe value
Dedupe bool
// store blobs paths grouped by digest
digest godigest.Digest
duplicateBlobs []string
/* store processed digest, used for iterating duplicateBlobs one by one
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
Expand Down Expand Up @@ -912,15 +909,15 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}

// get all blobs from storage.imageStore and group them by digest
gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
digest, duplicateBlobs, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests)
if err != nil {
gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest")

return nil, err
}

// if no digests left, then mark the task generator as done
if gen.digest == "" {
if digest == "" {
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")

gen.done = true
Expand All @@ -929,10 +926,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
}

// mark digest as processed before running its task
gen.lastDigests = append(gen.lastDigests, gen.digest)
gen.lastDigests = append(gen.lastDigests, digest)

// generate rebuild dedupe task for this digest
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, gen.Log), nil
}

func (gen *DedupeTaskGenerator) IsDone() bool {
Expand All @@ -945,9 +942,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool {

func (gen *DedupeTaskGenerator) Reset() {
gen.lastDigests = []godigest.Digest{}
gen.duplicateBlobs = []string{}
gen.repos = []string{}
gen.digest = ""
gen.done = false
}

Expand Down
Loading
Loading