From 54465e93df1eb07cc644c930b2caf348f134b59e Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Thu, 8 Aug 2024 19:06:28 +0000 Subject: [PATCH 1/4] feat(storage): enable parallel writes by using per-repo locking Signed-off-by: Andrei Aaron --- pkg/api/routes.go | 4 +- pkg/extensions/sync/destination.go | 4 +- pkg/meta/parse.go | 12 ++-- pkg/storage/gc/gc.go | 4 +- pkg/storage/imagestore/imagestore.go | 91 ++++++++++++++++++-------- pkg/storage/imagestore/lock.go | 98 ++++++++++++++++++++++++++++ pkg/storage/scrub.go | 8 +-- pkg/storage/types/types.go | 4 ++ pkg/test/mocks/image_store_mock.go | 12 ++++ 9 files changed, 192 insertions(+), 45 deletions(-) create mode 100644 pkg/storage/imagestore/lock.go diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 97774058f..2f542d1ca 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -957,8 +957,8 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re } else { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(name, &lockLatency) + defer imgStore.RUnlockRepo(name, &lockLatency) ok, blen, _, err = imgStore.StatBlob(name, digest) } diff --git a/pkg/extensions/sync/destination.go b/pkg/extensions/sync/destination.go index 3384e6270..c7234bedd 100644 --- a/pkg/extensions/sync/destination.go +++ b/pkg/extensions/sync/destination.go @@ -136,9 +136,9 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer } for _, manifest := range indexManifest.Manifests { - tempImageStore.RLock(&lockLatency) + tempImageStore.RLockRepo(repo, &lockLatency) manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest) - tempImageStore.RUnlock(&lockLatency) + tempImageStore.RUnlockRepo(repo, &lockLatency) if err != nil { registry.log.Error().Str("errorType", common.TypeOf(err)). 
diff --git a/pkg/meta/parse.go b/pkg/meta/parse.go index ab671c008..5045caab2 100644 --- a/pkg/meta/parse.go +++ b/pkg/meta/parse.go @@ -109,8 +109,8 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) indexBlob, err := imageStore.GetIndexContent(repo) if err != nil { @@ -223,8 +223,8 @@ func getCosignSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) for _, layer := range manifestContent.Layers { layerContent, err := imageStore.GetBlobContent(repo, layer.Digest) @@ -280,8 +280,8 @@ func getNotationSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) layerContent, err := imageStore.GetBlobContent(repo, layer) if err != nil { diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index 4c7300348..f842ff63c 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -107,8 +107,8 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error { return zerr.ErrRepoNotFound } - gc.imgStore.Lock(&lockLatency) - defer gc.imgStore.Unlock(&lockLatency) + gc.imgStore.LockRepo(repo, &lockLatency) + defer gc.imgStore.UnlockRepo(repo, &lockLatency) /* this index (which represents the index.json of this repo) is the root point from which we search for dangling manifests/blobs diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 64f0f5d6d..a3aa75a03 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -10,7 +10,6 @@ import ( "path" "path/filepath" "strings" - "sync" "time" "unicode/utf8" @@ -42,7 +41,7 @@ const ( type ImageStore struct { rootDir string storeDriver storageTypes.Driver - lock *sync.RWMutex + lock *ImageStoreLock log zlog.Logger metrics monitoring.MetricServer cache cache.Cache @@ -78,7 +77,7 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo imgStore := &ImageStore{ rootDir: rootDir, storeDriver: storeDriver, - lock: &sync.RWMutex{}, + lock: NewImageStoreLock(), log: log, metrics: metrics, dedupe: dedupe, @@ -124,6 +123,40 @@ func (is *ImageStore) Unlock(lockStart *time.Time) { monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram } +// RLock read-lock for specific repo +func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) { + *lockStart = time.Now() + + is.lock.RLockRepo(repo) +} + +// RUnlock read-unlock for specific repo. +func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) { + is.lock.RUnlockRepo(repo) + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram +} + +// Lock write-lock for specific repo.. +func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) { + *lockStart = time.Now() + + is.lock.LockRepo(repo) +} + +// Unlock write-unlock for specific repo.. 
+func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) { + is.lock.UnlockRepo(repo) + + lockEnd := time.Now() + // includes time spent in acquiring and holding a lock + latency := lockEnd.Sub(*lockStart) + monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram +} + func (is *ImageStore) initRepo(name string) error { repoDir := path.Join(is.rootDir, name) @@ -200,8 +233,8 @@ func (is *ImageStore) initRepo(name string) error { func (is *ImageStore) InitRepo(name string) error { var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(name, &lockLatency) + defer is.UnlockRepo(name, &lockLatency) return is.initRepo(name) } @@ -392,8 +425,8 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) { return nil, zerr.ErrRepoNotFound } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) index, err := common.GetIndex(is, repo, is.log) if err != nil { @@ -414,9 +447,9 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest var err error - is.RLock(&lockLatency) + is.RLockRepo(repo, &lockLatency) defer func() { - is.RUnlock(&lockLatency) + is.RUnlockRepo(repo, &lockLatency) if err == nil { monitoring.IncDownloadCounter(is.metrics, repo) @@ -466,9 +499,9 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli var err error - is.Lock(&lockLatency) + is.LockRepo(repo, &lockLatency) defer func() { - is.Unlock(&lockLatency) + is.UnlockRepo(repo, &lockLatency) if err == nil { if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { @@ -596,8 +629,8 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) err := is.deleteImageManifest(repo, reference, detectCollisions) if err != nil { @@ -921,8 +954,8 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { err = is.DedupeBlob(src, dstDigest, repo, dst) @@ -1001,8 +1034,8 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) dst := is.BlobPath(repo, dstDigest) @@ -1204,11 +1237,11 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6 blobPath := is.BlobPath(repo, digest) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) } else { - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) } binfo, err := is.storeDriver.Stat(blobPath) @@ -1340,8 +1373,8 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT return nil, -1, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) binfo, err := 
is.originalBlobInfo(repo, digest) if err != nil { @@ -1417,8 +1450,8 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str return nil, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) binfo, err := is.originalBlobInfo(repo, digest) if err != nil { @@ -1494,8 +1527,8 @@ func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifac ) (ispec.Index, error) { var lockLatency time.Time - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) return common.GetReferrers(is, repo, gdigest, artifactTypes, is.log) } @@ -1568,8 +1601,8 @@ func (is *ImageStore) DeleteBlob(repo string, digest godigest.Digest) error { return err } - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) return is.deleteBlob(repo, digest) } diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go new file mode 100644 index 000000000..16a419449 --- /dev/null +++ b/pkg/storage/imagestore/lock.go @@ -0,0 +1,98 @@ +package imagestore + +import ( + "sync" +) + +type ImageStoreLock struct { + // locks per repository paths + repoLocks sync.Map + // lock for the entire storage, needed in case all repos need to be processed + // including blocking creating new repos + globalLock *sync.RWMutex +} + +func NewImageStoreLock() *ImageStoreLock { + return &ImageStoreLock{ + repoLocks: sync.Map{}, + globalLock: &sync.RWMutex{}, + } +} + +func (sl *ImageStoreLock) RLock() { + // block reads and writes to the entire storage, including new repos + sl.globalLock.RLock() +} + +func (sl *ImageStoreLock) RUnlock() { + // unlock to the storage in general + sl.globalLock.RUnlock() +} + +func (sl *ImageStoreLock) Lock() { + // block reads and writes to the entire storage, including new repos + sl.globalLock.Lock() +} + +func (sl *ImageStoreLock) Unlock() { + // unlock to the storage in general + sl.globalLock.Unlock() +} + +func (sl *ImageStoreLock) RLockRepo(repo string) { + // besides the individual repo increment the read counter for the + // global lock, this will make sure the storage cannot be + // write-locked at global level while individual repos are accessed + sl.globalLock.RLock() + + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // lock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.RLock() +} + +func (sl *ImageStoreLock) RUnlockRepo(repo string) { + val, ok := sl.repoLocks.Load(repo) + if !ok { + // somehow the unlock is called for repo that was never locked + return + } + + // read-unlock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.RUnlock() + + // decrement the global read counter after the one for the individual repo is decremented + sl.globalLock.RUnlock() +} + +func (sl *ImageStoreLock) LockRepo(repo string) { + // besides the individual repo increment the read counter for the + // global lock, this will make sure the storage cannot be + // write-locked at global level while individual repos are accessed + // we are not using the write lock here, as that would make all repos + // wait for one another + sl.globalLock.RLock() + + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // write-lock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.Lock() +} + +func (sl *ImageStoreLock) UnlockRepo(repo string) { + val, ok := sl.repoLocks.Load(repo) + if !ok 
{ + // somehow the unlock is called for a repo that was never locked + return + } + + // write-unlock individual repo + repoLock := val.(*sync.RWMutex) + repoLock.Unlock() + + // decrement the global read counter after the individual repo was unlocked + sl.globalLock.RUnlock() +} diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 859252cbe..af633a5af 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -134,8 +134,8 @@ func checkImage( ) ([]ispec.Descriptor, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) manifestContent, err := imgStore.GetBlobContent(imageName, manifest.Digest) if err != nil { @@ -149,8 +149,8 @@ func checkImage( func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) // check image structure / layout ok, err := imgStore.ValidateRepo(imageName) diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 9d5cd4882..0794d6bab 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -26,6 +26,10 @@ type ImageStore interface { //nolint:interfacebloat RUnlock(*time.Time) Lock(*time.Time) Unlock(*time.Time) + RLockRepo(repo string, lockStart *time.Time) + RUnlockRepo(repo string, lockStart *time.Time) + LockRepo(repo string, lockStart *time.Time) + UnlockRepo(repo string, lockStart *time.Time) InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index d94b540d8..c017e2c1a 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -82,6 +82,18 @@ func (is MockedImageStore) RUnlock(t *time.Time) { func (is MockedImageStore) RLock(t *time.Time) { } +func (is MockedImageStore) LockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) UnlockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) RUnlockRepo(repo string, t *time.Time) { +} + +func (is MockedImageStore) RLockRepo(repo string, t *time.Time) { +} + func (is MockedImageStore) Name() string { if is.NameFn != nil { return is.NameFn() From 501c79b72dbcd6495e80731a1f56019459e6d49b Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Sat, 17 Aug 2024 10:00:52 +0000 Subject: [PATCH 2/4] refactor: more work on locks Signed-off-by: Andrei Aaron --- pkg/storage/common/common.go | 21 +++++----- pkg/storage/common/common_test.go | 4 +- pkg/storage/imagestore/imagestore.go | 27 ++++++++----- pkg/storage/imagestore/lock.go | 8 ++-- pkg/storage/local/local_test.go | 7 +++- pkg/storage/s3/s3_test.go | 57 +++++++++++++++++----------- pkg/storage/types/types.go | 5 ++- pkg/test/mocks/image_store_mock.go | 13 ++++--- 8 files changed, 83 insertions(+), 59 deletions(-) diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 450090199..e0528ba24 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -825,9 +825,6 @@ type DedupeTaskGenerator struct { ImgStore storageTypes.ImageStore // storage dedupe value Dedupe bool - // store blobs paths grouped by digest - digest godigest.Digest - duplicateBlobs []string /* store processed digest, used for iterating duplicateBlobs one by one 
and generating a task for each unprocessed one*/ lastDigests []godigest.Digest @@ -866,7 +863,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // get all blobs from storage.imageStore and group them by digest - gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests) + digest, duplicateBlobs, duplicateRepos, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests) if err != nil { gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest") @@ -874,7 +871,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // if no digests left, then mark the task generator as done - if gen.digest == "" { + if digest == "" { gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished") gen.done = true @@ -883,10 +880,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // mark digest as processed before running its task - gen.lastDigests = append(gen.lastDigests, gen.digest) + gen.lastDigests = append(gen.lastDigests, digest) // generate rebuild dedupe task for this digest - return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil + return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, duplicateRepos, gen.Log), nil } func (gen *DedupeTaskGenerator) IsDone() bool { @@ -899,9 +896,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool { func (gen *DedupeTaskGenerator) Reset() { gen.lastDigests = []godigest.Digest{} - gen.duplicateBlobs = []string{} gen.repos = []string{} - gen.digest = "" gen.done = false } @@ -911,19 +906,21 @@ type dedupeTask struct { digest godigest.Digest // blobs paths with the same digest ^ duplicateBlobs []string + duplicateRepos []string dedupe bool log zlog.Logger } func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, log zlog.Logger, + duplicateBlobs, duplicateRepos []string, log zlog.Logger, ) *dedupeTask { - return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log} + return &dedupeTask{imgStore, digest, duplicateBlobs, duplicateRepos, dedupe, log} } func (dt *dedupeTask) DoWork(ctx context.Context) error { // run task - err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: contextcheck + err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs, //nolint: contextcheck + dt.duplicateRepos) if err != nil { // log it dt.log.Error().Err(err).Str("digest", dt.digest.String()).Str("component", "dedupe"). 
diff --git a/pkg/storage/common/common_test.go b/pkg/storage/common/common_test.go index c618d7627..c7091b2e0 100644 --- a/pkg/storage/common/common_test.go +++ b/pkg/storage/common/common_test.go @@ -489,9 +489,9 @@ func TestDedupeGeneratorErrors(t *testing.T) { return []string{"repo1", "repo2"}, nil }, GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) ( - godigest.Digest, []string, error, + godigest.Digest, []string, []string, error, ) { - return "sha256:123", []string{}, ErrTestError + return "sha256:123", []string{}, []string{}, ErrTestError }, } diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index a3aa75a03..e7ef1d50b 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -123,7 +123,7 @@ func (is *ImageStore) Unlock(lockStart *time.Time) { monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram } -// RLock read-lock for specific repo +// RLock read-lock for specific repo. func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) { *lockStart = time.Now() @@ -140,14 +140,14 @@ func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) { monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram } -// Lock write-lock for specific repo.. +// Lock write-lock for specific repo. func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) { *lockStart = time.Now() is.lock.LockRepo(repo) } -// Unlock write-unlock for specific repo.. +// Unlock write-unlock for specific repo. func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) { is.lock.UnlockRepo(repo) @@ -448,6 +448,7 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest var err error is.RLockRepo(repo, &lockLatency) + defer func() { is.RUnlockRepo(repo, &lockLatency) @@ -500,6 +501,7 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli var err error is.LockRepo(repo, &lockLatency) + defer func() { is.UnlockRepo(repo, &lockLatency) @@ -1798,7 +1800,7 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) { } func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, -) (godigest.Digest, []string, error) { +) (godigest.Digest, []string, []string, error) { var lockLatency time.Time dir := is.rootDir @@ -1806,10 +1808,12 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g is.RLock(&lockLatency) defer is.RUnlock(&lockLatency) - var duplicateBlobs []string + var duplicateBlobs, duplicateRepos []string var digest godigest.Digest + var repo string + err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { // skip blobs under .sync and .uploads if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) || @@ -1819,7 +1823,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g if fileInfo.IsDir() { // skip repositories not found in repos - repo := path.Base(fileInfo.Path()) + repo = path.Base(fileInfo.Path()) if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir { candidateAlgorithm := godigest.Algorithm(repo) @@ -1829,6 +1833,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g } } + repo = path.Dir(path.Dir(fileInfo.Path())) digestHash := path.Base(fileInfo.Path()) digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path()))) @@ 
-1847,6 +1852,10 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g if blobDigest == digest { duplicateBlobs = append(duplicateBlobs, fileInfo.Path()) + + if !zcommon.Contains(duplicateRepos, repo) { + duplicateRepos = append(duplicateRepos, repo) + } } return nil @@ -1856,10 +1865,10 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g var perr driver.PathNotFoundError if errors.As(err, &perr) { - return digest, duplicateBlobs, nil + return digest, duplicateBlobs, duplicateRepos, nil } - return digest, duplicateBlobs, err + return digest, duplicateBlobs, duplicateRepos, err } func (is *ImageStore) getOriginalBlobFromDisk(duplicateBlobs []string) (string, error) { @@ -2034,7 +2043,7 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D } func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, + duplicateBlobs []string, duplicateRepos []string, ) error { var lockLatency time.Time diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go index 16a419449..578c04f2e 100644 --- a/pkg/storage/imagestore/lock.go +++ b/pkg/storage/imagestore/lock.go @@ -48,7 +48,7 @@ func (sl *ImageStoreLock) RLockRepo(repo string) { val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) // lock individual repo - repoLock := val.(*sync.RWMutex) + repoLock, _ := val.(*sync.RWMutex) repoLock.RLock() } @@ -60,7 +60,7 @@ func (sl *ImageStoreLock) RUnlockRepo(repo string) { } // read-unlock individual repo - repoLock := val.(*sync.RWMutex) + repoLock, _ := val.(*sync.RWMutex) repoLock.RUnlock() // decrement the global read counter after the one for the individual repo is decremented @@ -78,7 +78,7 @@ func (sl *ImageStoreLock) LockRepo(repo string) { val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) // write-lock individual repo - repoLock := val.(*sync.RWMutex) + repoLock, _ := val.(*sync.RWMutex) repoLock.Lock() } @@ -90,7 +90,7 @@ func (sl *ImageStoreLock) UnlockRepo(repo string) { } // write-unlock individual repo - repoLock := val.(*sync.RWMutex) + repoLock, _ := val.(*sync.RWMutex) repoLock.Unlock() // decrement the global read counter after the individual repo was unlocked diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index ce603fef9..c974f9156 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -1280,11 +1280,16 @@ func TestDedupeLinks(t *testing.T) { path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest2), } + duplicateRepos := []string{ + path.Join(dir, "dedupe1"), + } + // remove original blob so that it can not be statted err := os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true, + duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 85d0fbeb2..f675ccf8e 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -2252,10 +2252,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) 
So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2301,10 +2302,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2350,10 +2352,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2396,10 +2399,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) Convey("Trigger Stat() error in dedupeBlobs()", func() { @@ -2441,10 +2445,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) }) @@ -2492,10 +2497,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2540,10 +2546,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, 
duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2588,10 +2595,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2603,7 +2611,7 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + _, _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) So(err, ShouldNotBeNil) }) @@ -2695,10 +2703,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2717,10 +2726,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2735,10 +2745,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) }) diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 0794d6bab..8596cd282 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -64,8 +64,9 @@ type ImageStore interface { //nolint:interfacebloat GetBlobContent(repo string, digest godigest.Digest) ([]byte, error) GetReferrers(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) - RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, duplicateBlobs []string) error - GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) + RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, + duplicateBlobs, duplicateRepos []string) error + GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, []string, error) GetAllBlobs(repo string) ([]godigest.Digest, 
error) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) VerifyBlobDigestValue(repo string, digest godigest.Digest) error diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index c017e2c1a..1884553dd 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -51,8 +51,9 @@ type MockedImageStore struct { RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeBlobsFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeForDigestFn func(ctx context.Context, digest godigest.Digest, dedupe bool, - duplicateBlobs []string) error - GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) + duplicateBlobs []string, duplicateRepos []string) error + GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest, + ) (godigest.Digest, []string, []string, error) GetAllBlobsFn func(repo string) ([]godigest.Digest, error) CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) PutIndexContentFn func(repo string, index ispec.Index) error @@ -403,22 +404,22 @@ func (is MockedImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler } func (is MockedImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, + duplicateBlobs []string, duplicateRepos []string, ) error { if is.RunDedupeForDigestFn != nil { - return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs) + return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs, duplicateRepos) } return nil } func (is MockedImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, -) (godigest.Digest, []string, error) { +) (godigest.Digest, []string, []string, error) { if is.GetNextDigestWithBlobPathsFn != nil { return is.GetNextDigestWithBlobPathsFn(repos, lastDigests) } - return "", []string{}, nil + return "", []string{}, []string{}, nil } func (is MockedImageStore) CleanupRepo(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) { From 2ae3f0e26f26dc0a3fa781d90e91206f10584222 Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Sat, 17 Aug 2024 13:53:54 +0000 Subject: [PATCH 3/4] refactor: remove global locking feature from the imagestore Signed-off-by: Andrei Aaron --- pkg/storage/imagestore/imagestore.go | 115 ++++++++++++--------------- pkg/storage/imagestore/lock.go | 105 ++++++++++++------------ pkg/storage/local/driver.go | 8 +- pkg/storage/storage_test.go | 28 ------- pkg/storage/types/types.go | 4 - pkg/test/mocks/image_store_mock.go | 12 --- pkg/test/oci-utils/oci_layout.go | 16 ++-- 7 files changed, 117 insertions(+), 171 deletions(-) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index e7ef1d50b..fe0173e9c 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -89,40 +89,6 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo return imgStore } -// RLock read-lock. -func (is *ImageStore) RLock(lockStart *time.Time) { - *lockStart = time.Now() - - is.lock.RLock() -} - -// RUnlock read-unlock. 
-func (is *ImageStore) RUnlock(lockStart *time.Time) { - is.lock.RUnlock() - - lockEnd := time.Now() - // includes time spent in acquiring and holding a lock - latency := lockEnd.Sub(*lockStart) - monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram -} - -// Lock write-lock. -func (is *ImageStore) Lock(lockStart *time.Time) { - *lockStart = time.Now() - - is.lock.Lock() -} - -// Unlock write-unlock. -func (is *ImageStore) Unlock(lockStart *time.Time) { - is.lock.Unlock() - - lockEnd := time.Now() - // includes time spent in acquiring and holding a lock - latency := lockEnd.Sub(*lockStart) - monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram -} - // RLock read-lock for specific repo. func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) { *lockStart = time.Now() @@ -296,13 +262,12 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) { // GetRepositories returns a list of all the repositories under this store. func (is *ImageStore) GetRepositories() ([]string, error) { - var lockLatency time.Time - + // Ideally this function would lock while walking in order to avoid concurrency issues + // but we can't lock everything as we don't have a valid list of all repositories + // let's assume the result of this function is a best effort and some repos may be + // added or removed by the time it returns dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - stores := make([]string, 0) err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { @@ -326,7 +291,9 @@ func (is *ImageStore) GetRepositories() ([]string, error) { return nil //nolint:nilerr // ignore invalid repos } - stores = append(stores, rel) + if !zcommon.Contains(stores, rel) { + stores = append(stores, rel) + } return nil }) @@ -342,13 +309,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) { // GetNextRepository returns next repository under this store. 
func (is *ImageStore) GetNextRepository(repo string) (string, error) { - var lockLatency time.Time - + // Ideally this function would lock while walking in order to avoid concurrency issues + // but we can't lock everything as we don't have a valid list of all repositories + // let's assume the result of this function is a best effort and some repos may be + // added or removed by the time it returns dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - _, err := is.storeDriver.List(dir) if err != nil { if errors.As(err, &driver.PathNotFoundError{}) { @@ -1190,8 +1156,6 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string { } func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]string, error) { - var lockLatency time.Time - if err := digest.Validate(); err != nil { return nil, err } @@ -1200,9 +1164,6 @@ func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]str return nil, nil //nolint:nilnil } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - blobsPaths, err := is.cache.GetAllBlobs(digest) if err != nil { return nil, err } @@ -1799,21 +1760,21 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) { return ret, nil } -func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, +func (is *ImageStore) GetNextDigestWithBlobPaths(allRepos []string, lastDigests []godigest.Digest, ) (godigest.Digest, []string, []string, error) { var lockLatency time.Time dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + for _, repo := range allRepos { + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) + } var duplicateBlobs, duplicateRepos []string var digest godigest.Digest - err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { // skip blobs under .sync and .uploads if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) || @@ -1821,19 +1782,40 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g return driver.ErrSkipDir } + if strings.HasSuffix(fileInfo.Path(), ispec.ImageLayoutFile) || + strings.HasSuffix(fileInfo.Path(), ispec.ImageIndexFile) || + strings.HasSuffix(fileInfo.Path(), ".db") { + return nil + } + + // the path is always under root dir because the walk function walks the root dir + rel, _ := filepath.Rel(is.rootDir, fileInfo.Path()) + if fileInfo.IsDir() { - // skip repositories not found in repos - repo = path.Base(fileInfo.Path()) - if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir { - candidateAlgorithm := godigest.Algorithm(repo) + if fileInfo.Path() == is.rootDir || zcommon.Contains(allRepos, rel) { + // this is the root directory or a repo, go deeper into subfolders + return nil + } - if !candidateAlgorithm.Available() { - return driver.ErrSkipDir - } + + // attempt to determine if this is the blobs folder + lastFolderInPath := path.Base(rel) + if lastFolderInPath == ispec.ImageBlobsDir { + // this is the blobs dir, go deeper into subfolders + return nil + } + + // this is not the root dir, a repo, or a blobs dir + // it is also unclear if we are under a repo, as this could be .trivy + // skip entire directory if the base name does not match a valid hash algorithm + candidateAlgorithm := godigest.Algorithm(lastFolderInPath) + if !candidateAlgorithm.Available() { + return driver.ErrSkipDir + } else { + // this is the folder sha256 or similar + return nil + } } - repo =
path.Dir(path.Dir(fileInfo.Path())) digestHash := path.Base(fileInfo.Path()) digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path()))) @@ -1853,6 +1835,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g if blobDigest == digest { duplicateBlobs = append(duplicateBlobs, fileInfo.Path()) + repo := path.Dir(path.Dir(path.Dir(rel))) if !zcommon.Contains(duplicateRepos, repo) { duplicateRepos = append(duplicateRepos, repo) } @@ -2047,8 +2030,10 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di ) error { var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + for _, repo := range duplicateRepos { + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) + } if dedupe { return is.dedupeBlobs(ctx, digest, duplicateBlobs) diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go index 578c04f2e..43546952e 100644 --- a/pkg/storage/imagestore/lock.go +++ b/pkg/storage/imagestore/lock.go @@ -6,93 +6,92 @@ import ( type ImageStoreLock struct { // locks per repository paths - repoLocks sync.Map - // lock for the entire storage, needed in case all repos need to be processed - // including blocking creating new repos - globalLock *sync.RWMutex + repoLocks map[string]*sync.RWMutex + // lock for managing the content of the repo lock map + internalLock *sync.Mutex } func NewImageStoreLock() *ImageStoreLock { return &ImageStoreLock{ - repoLocks: sync.Map{}, - globalLock: &sync.RWMutex{}, + repoLocks: map[string]*sync.RWMutex{}, + internalLock: &sync.Mutex{}, } } -func (sl *ImageStoreLock) RLock() { - // block reads and writes to the entire storage, including new repos - sl.globalLock.RLock() -} - -func (sl *ImageStoreLock) RUnlock() { - // unlock to the storage in general - sl.globalLock.RUnlock() -} - -func (sl *ImageStoreLock) Lock() { - // block reads and writes to the entire storage, including new repos - sl.globalLock.Lock() -} - -func (sl *ImageStoreLock) Unlock() { - // unlock to the storage in general - sl.globalLock.Unlock() -} - func (sl *ImageStoreLock) RLockRepo(repo string) { - // besides the individual repo increment the read counter for the - // global lock, this will make sure the storage cannot be - // write-locked at global level while individual repos are accessed - sl.globalLock.RLock() - - val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + repoLock, _ := sl.loadLock(repo) // lock individual repo - repoLock, _ := val.(*sync.RWMutex) repoLock.RLock() } func (sl *ImageStoreLock) RUnlockRepo(repo string) { - val, ok := sl.repoLocks.Load(repo) + repoLock, ok := sl.loadLock(repo) if !ok { - // somehow the unlock is called for repo that was never locked + // somehow the unlock is called for a repo that was not locked return } // read-unlock individual repo - repoLock, _ := val.(*sync.RWMutex) repoLock.RUnlock() - - // decrement the global read counter after the one for the individual repo is decremented - sl.globalLock.RUnlock() } func (sl *ImageStoreLock) LockRepo(repo string) { - // besides the individual repo increment the read counter for the - // global lock, this will make sure the storage cannot be - // write-locked at global level while individual repos are accessed - // we are not using the write lock here, as that would make all repos - // wait for one another - sl.globalLock.RLock() - - val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + repoLock, _ := sl.loadLock(repo) // write-lock individual repo - repoLock, _ :=
val.(*sync.RWMutex) repoLock.Lock() } func (sl *ImageStoreLock) UnlockRepo(repo string) { - val, ok := sl.repoLocks.Load(repo) + repoLock, ok := sl.loadLock(repo) if !ok { - // somehow the unlock is called for a repo that was never locked + // somehow the unlock is called for a repo that was not locked return } // write-unlock individual repo - repoLock, _ := val.(*sync.RWMutex) repoLock.Unlock() - // decrement the global read counter after the individual repo was unlocked - sl.globalLock.RUnlock() + // attempt to clean up the map of unused locks + sl.discardLockIfPossible(repo) +} + +func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) { + sl.internalLock.Lock() + defer sl.internalLock.Unlock() + + repoLock, ok := sl.repoLocks[repo] + if !ok { + sl.repoLocks[repo] = &sync.RWMutex{} + repoLock = sl.repoLocks[repo] + } + + return repoLock, ok +} + +func (sl *ImageStoreLock) discardLockIfPossible(repo string) { + sl.internalLock.Lock() + defer sl.internalLock.Unlock() + + repoLock, ok := sl.repoLocks[repo] + if !ok { + // the lock is not set, no need to do anything else + return + } + + // check if the lock is in use + // this is a non-blocking operation if someone else is already blocking the lock + // the internalLock prevents the case where someone else attempts + // to load/block the lock after this function started executing + ok = repoLock.TryLock() + if !ok { + // if someone else is using this lock, it is still needed, keep it as is + return + } + // final unlock + defer repoLock.Unlock() + + // nobody else is using this lock, remove it from the map + delete(sl.repoLocks, repo) } diff --git a/pkg/storage/local/driver.go b/pkg/storage/local/driver.go index 177e5d021..29c6246d1 100644 --- a/pkg/storage/local/driver.go +++ b/pkg/storage/local/driver.go @@ -189,7 +189,13 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) { func (driver *Driver) Walk(path string, walkFn storagedriver.WalkFn) error { children, err := driver.List(path) if err != nil { - return err + switch errors.As(err, &storagedriver.PathNotFoundError{}) { + case true: + // repository was removed in between listing and enumeration. Ignore it. 
+ return nil + default: + return err + } } sort.Stable(sort.StringSlice(children)) diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index a94591339..716e12a80 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -12,7 +12,6 @@ import ( "path" "slices" "strings" - "sync" "testing" "time" @@ -889,33 +888,6 @@ func TestStorageAPIs(t *testing.T) { _, _, err = imgStore.PutImageManifest("replace", "1.0", ispec.MediaTypeImageManifest, manifestBuf) So(err, ShouldBeNil) }) - - Convey("Locks", func() { - // in parallel, a mix of read and write locks - mainly for coverage - var wg sync.WaitGroup - for i := 0; i < 1000; i++ { - wg.Add(2) - - go func() { - var lockLatency time.Time - - defer wg.Done() - imgStore.Lock(&lockLatency) - func() {}() - imgStore.Unlock(&lockLatency) - }() - go func() { - var lockLatency time.Time - - defer wg.Done() - imgStore.RLock(&lockLatency) - func() {}() - imgStore.RUnlock(&lockLatency) - }() - } - - wg.Wait() - }) }) }) } diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 8596cd282..d9ff72259 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -22,10 +22,6 @@ type ImageStore interface { //nolint:interfacebloat Name() string DirExists(d string) bool RootDir() string - RLock(*time.Time) - RUnlock(*time.Time) - Lock(*time.Time) - Unlock(*time.Time) RLockRepo(repo string, lockStart *time.Time) RUnlockRepo(repo string, lockStart *time.Time) LockRepo(repo string, lockStart *time.Time) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index 1884553dd..b98a299ba 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -71,18 +71,6 @@ func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error return true, 0, time.Time{}, nil } -func (is MockedImageStore) Lock(t *time.Time) { -} - -func (is MockedImageStore) Unlock(t *time.Time) { -} - -func (is MockedImageStore) RUnlock(t *time.Time) { -} - -func (is MockedImageStore) RLock(t *time.Time) { -} - func (is MockedImageStore) LockRepo(repo string, t *time.Time) { } diff --git a/pkg/test/oci-utils/oci_layout.go b/pkg/test/oci-utils/oci_layout.go index f2a1cc54a..9aa81fdff 100644 --- a/pkg/test/oci-utils/oci_layout.go +++ b/pkg/test/oci-utils/oci_layout.go @@ -102,8 +102,8 @@ func (olu BaseOciLayoutUtils) GetImageManifests(repo string) ([]ispec.Descriptor imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) buf, err := imageStore.GetIndexContent(repo) if err != nil { @@ -137,8 +137,8 @@ func (olu BaseOciLayoutUtils) GetImageBlobManifest(repo string, digest godigest. 
imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) blobBuf, err := imageStore.GetBlobContent(repo, digest) if err != nil { @@ -163,8 +163,8 @@ func (olu BaseOciLayoutUtils) GetImageInfo(repo string, configDigest godigest.Di imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) blobBuf, err := imageStore.GetBlobContent(repo, configDigest) if err != nil { @@ -323,8 +323,8 @@ func (olu BaseOciLayoutUtils) GetImageManifestSize(repo string, manifestDigest g var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) manifestBlob, err := imageStore.GetBlobContent(repo, manifestDigest) if err != nil { From a775dcabd8b9aa321bafa0c4929500d3b12f3809 Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Sat, 17 Aug 2024 19:43:29 +0000 Subject: [PATCH 4/4] refactor: switch back to using a syncmap for locking Remove the logic to discard unused locks as it produced deadlocks Signed-off-by: Andrei Aaron --- pkg/storage/imagestore/lock.go | 63 +++++-------------------- pkg/storage/imagestore/lock_test.go | 71 +++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 52 deletions(-) create mode 100644 pkg/storage/imagestore/lock_test.go diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go index 43546952e..0ead2d58d 100644 --- a/pkg/storage/imagestore/lock.go +++ b/pkg/storage/imagestore/lock.go @@ -6,92 +6,51 @@ import ( type ImageStoreLock struct { // locks per repository paths - repoLocks map[string]*sync.RWMutex - // lock for managing the content of the repo lock map - internalLock *sync.Mutex + repoLocks sync.Map } func NewImageStoreLock() *ImageStoreLock { return &ImageStoreLock{ - repoLocks: map[string]*sync.RWMutex{}, - internalLock: &sync.Mutex{}, + repoLocks: sync.Map{}, } } func (sl *ImageStoreLock) RLockRepo(repo string) { - repoLock, _ := sl.loadLock(repo) + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) // lock individual repo + repoLock, _ := val.(*sync.RWMutex) repoLock.RLock() } func (sl *ImageStoreLock) RUnlockRepo(repo string) { - repoLock, ok := sl.loadLock(repo) + val, ok := sl.repoLocks.Load(repo) if !ok { - // somehow the unlock is called for a repo that was not locked + // somehow the unlock is called for repo that was not locked return } // read-unlock individual repo + repoLock, _ := val.(*sync.RWMutex) repoLock.RUnlock() } func (sl *ImageStoreLock) LockRepo(repo string) { - repoLock, _ := sl.loadLock(repo) + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) // write-lock individual repo + repoLock, _ := val.(*sync.RWMutex) repoLock.Lock() } func (sl *ImageStoreLock) UnlockRepo(repo string) { - repoLock, ok := sl.loadLock(repo) + val, ok := sl.repoLocks.Load(repo) if !ok { // somehow the unlock is called for a repo that was not locked return } // write-unlock individual repo + repoLock, _ := val.(*sync.RWMutex) repoLock.Unlock() - - // attempt to clean up the map of unused locks - sl.discardLockIfPossible(repo) -} - -func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) { - sl.internalLock.Lock() - defer sl.internalLock.Unlock() - - repoLock, ok := 
sl.repoLocks[repo] - if !ok { - sl.repoLocks[repo] = &sync.RWMutex{} - repoLock = sl.repoLocks[repo] - } - - return repoLock, ok -} - -func (sl *ImageStoreLock) discardLockIfPossible(repo string) { - sl.internalLock.Lock() - defer sl.internalLock.Unlock() - - repoLock, ok := sl.repoLocks[repo] - if !ok { - // the lock is not set, no need to do anything else - return - } - - // check if the lock is in use - // this is a non-blocking operation if someone else is already blocking the lock - // the internalLock prevents the case where someone else attempts - // to load/block the lock after this function started executing - ok = repoLock.TryLock() - if !ok { - // if someone else is using this lock, it is still needed, keep it as is - return - } - // final unlock - defer repoLock.Unlock() - - // nobody else is using this lock, remove it from the map - delete(sl.repoLocks, repo) } diff --git a/pkg/storage/imagestore/lock_test.go b/pkg/storage/imagestore/lock_test.go new file mode 100644 index 000000000..939371a9a --- /dev/null +++ b/pkg/storage/imagestore/lock_test.go @@ -0,0 +1,71 @@ +package imagestore_test + +import ( + _ "crypto/sha256" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + . "github.com/smartystreets/goconvey/convey" + + "zotregistry.dev/zot/pkg/extensions/monitoring" + zlog "zotregistry.dev/zot/pkg/log" + "zotregistry.dev/zot/pkg/storage" + "zotregistry.dev/zot/pkg/storage/cache" + "zotregistry.dev/zot/pkg/storage/local" +) + +func TestStorageLocks(t *testing.T) { + dir := t.TempDir() + + log := zlog.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: dir, + Name: "cache", + UseRelPaths: true, + }, log) + + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) + + Convey("Locks", t, func() { + // in parallel, a mix of read and write locks - mainly for coverage + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + repo := "repo" + strconv.Itoa(i%10) + + wg.Add(2) + + go func() { + var lockLatency time.Time + + defer wg.Done() + + t.Logf("Repo %s will be write-locked in loop %d", repo, i) + imgStore.LockRepo(repo, &lockLatency) + func() { + t.Logf("Execute while repo %s is write-locked in loop %d", repo, i) + }() + imgStore.UnlockRepo(repo, &lockLatency) + t.Logf("Repo %s is write-unlocked in loop %d", repo, i) + }() + go func() { + var lockLatency time.Time + + defer wg.Done() + t.Logf("Repo %s will be read-locked in loop %d", repo, i) + imgStore.RLockRepo(repo, &lockLatency) + func() { + t.Logf("Execute while repo %s is read-locked in loop %d", repo, i) + }() + imgStore.RUnlockRepo(repo, &lockLatency) + t.Logf("Repo %s is read-unlocked in loop %d", repo, i) + }() + } + + wg.Wait() + }) +}
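
Taken together, the four patches replace zot's store-wide sync.RWMutex with per-repository read/write locks, ending with the sync.Map-based ImageStoreLock in pkg/storage/imagestore/lock.go. For illustration only — the following minimal, self-contained sketch is not part of the patch series, and the RepoLock type and main driver are hypothetical stand-ins for the real ImageStoreLock — this is the behavior the series is after: writers on different repositories can overlap, while writers on the same repository still serialize.

package main

import (
	"fmt"
	"sync"
	"time"
)

// RepoLock mirrors the final ImageStoreLock pattern: one RWMutex per
// repository path, kept in a sync.Map that only ever grows.
type RepoLock struct {
	repoLocks sync.Map
}

// get returns the mutex for a repo, creating it on first use.
// LoadOrStore guarantees concurrent callers share a single mutex per repo.
func (l *RepoLock) get(repo string) *sync.RWMutex {
	val, _ := l.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
	mtx, _ := val.(*sync.RWMutex)

	return mtx
}

func (l *RepoLock) LockRepo(repo string)    { l.get(repo).Lock() }
func (l *RepoLock) UnlockRepo(repo string)  { l.get(repo).Unlock() }
func (l *RepoLock) RLockRepo(repo string)   { l.get(repo).RLock() }
func (l *RepoLock) RUnlockRepo(repo string) { l.get(repo).RUnlock() }

func main() {
	var locks RepoLock

	var wg sync.WaitGroup

	hold := func(repo string) {
		defer wg.Done()
		locks.LockRepo(repo)
		defer locks.UnlockRepo(repo)
		time.Sleep(50 * time.Millisecond) // simulate a write under the lock
	}

	// Writers on different repos: the two 50ms critical sections overlap,
	// which the old store-wide write lock would have serialized.
	start := time.Now()

	wg.Add(2)

	go hold("repo-a")
	go hold("repo-b")
	wg.Wait()
	fmt.Printf("different repos took %v (parallel, ~50ms)\n", time.Since(start))

	// Writers on the same repo still serialize (~100ms total).
	start = time.Now()

	wg.Add(2)

	go hold("repo-a")
	go hold("repo-a")
	wg.Wait()
	fmt.Printf("same repo took %v (serialized, ~100ms)\n", time.Since(start))
}

Note that, as in the final patch, entries are never deleted from the map: patch 3's attempt to discard unused locks via TryLock was reverted in patch 4 because, per its commit message, it produced deadlocks, so the series accepts one long-lived mutex per repository path in exchange for a simpler, correct locking scheme.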