diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 97774058f..2f542d1ca 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -957,8 +957,8 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re } else { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(name, &lockLatency) + defer imgStore.RUnlockRepo(name, &lockLatency) ok, blen, _, err = imgStore.StatBlob(name, digest) } diff --git a/pkg/extensions/sync/destination.go b/pkg/extensions/sync/destination.go index 3384e6270..c7234bedd 100644 --- a/pkg/extensions/sync/destination.go +++ b/pkg/extensions/sync/destination.go @@ -136,9 +136,9 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer } for _, manifest := range indexManifest.Manifests { - tempImageStore.RLock(&lockLatency) + tempImageStore.RLockRepo(repo, &lockLatency) manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest) - tempImageStore.RUnlock(&lockLatency) + tempImageStore.RUnlockRepo(repo, &lockLatency) if err != nil { registry.log.Error().Str("errorType", common.TypeOf(err)). 
diff --git a/pkg/meta/parse.go b/pkg/meta/parse.go index ab671c008..5045caab2 100644 --- a/pkg/meta/parse.go +++ b/pkg/meta/parse.go @@ -109,8 +109,8 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) indexBlob, err := imageStore.GetIndexContent(repo) if err != nil { @@ -223,8 +223,8 @@ func getCosignSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) for _, layer := range manifestContent.Layers { layerContent, err := imageStore.GetBlobContent(repo, layer.Digest) @@ -280,8 +280,8 @@ func getNotationSignatureLayersInfo( var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) layerContent, err := imageStore.GetBlobContent(repo, layer) if err != nil { diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 450090199..e0528ba24 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -825,9 +825,6 @@ type DedupeTaskGenerator struct { ImgStore storageTypes.ImageStore // storage dedupe value Dedupe bool - // store blobs paths grouped by digest - digest godigest.Digest - duplicateBlobs []string /* store processed digest, used for iterating duplicateBlobs one by one and generating a task for each unprocessed one*/ lastDigests []godigest.Digest @@ -866,7 +863,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // get all blobs from storage.imageStore and group them by digest - gen.digest, gen.duplicateBlobs, err = gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests) + digest, 
duplicateBlobs, duplicateRepos, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.repos, gen.lastDigests) if err != nil { gen.Log.Error().Err(err).Str("component", "dedupe").Msg("failed to get next digest") @@ -874,7 +871,7 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // if no digests left, then mark the task generator as done - if gen.digest == "" { + if digest == "" { gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished") gen.done = true @@ -883,10 +880,10 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { } // mark digest as processed before running its task - gen.lastDigests = append(gen.lastDigests, gen.digest) + gen.lastDigests = append(gen.lastDigests, digest) // generate rebuild dedupe task for this digest - return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil + return newDedupeTask(gen.ImgStore, digest, gen.Dedupe, duplicateBlobs, duplicateRepos, gen.Log), nil } func (gen *DedupeTaskGenerator) IsDone() bool { @@ -899,9 +896,7 @@ func (gen *DedupeTaskGenerator) IsReady() bool { func (gen *DedupeTaskGenerator) Reset() { gen.lastDigests = []godigest.Digest{} - gen.duplicateBlobs = []string{} gen.repos = []string{} - gen.digest = "" gen.done = false } @@ -911,19 +906,21 @@ type dedupeTask struct { digest godigest.Digest // blobs paths with the same digest ^ duplicateBlobs []string + duplicateRepos []string dedupe bool log zlog.Logger } func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, log zlog.Logger, + duplicateBlobs, duplicateRepos []string, log zlog.Logger, ) *dedupeTask { - return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log} + return &dedupeTask{imgStore, digest, duplicateBlobs, duplicateRepos, dedupe, log} } func (dt *dedupeTask) DoWork(ctx context.Context) error { // run task - err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: 
contextcheck + err := dt.imgStore.RunDedupeForDigest(ctx, dt.digest, dt.dedupe, dt.duplicateBlobs, //nolint: contextcheck + dt.duplicateRepos) if err != nil { // log it dt.log.Error().Err(err).Str("digest", dt.digest.String()).Str("component", "dedupe"). diff --git a/pkg/storage/common/common_test.go b/pkg/storage/common/common_test.go index c618d7627..c7091b2e0 100644 --- a/pkg/storage/common/common_test.go +++ b/pkg/storage/common/common_test.go @@ -489,9 +489,9 @@ func TestDedupeGeneratorErrors(t *testing.T) { return []string{"repo1", "repo2"}, nil }, GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) ( - godigest.Digest, []string, error, + godigest.Digest, []string, []string, error, ) { - return "sha256:123", []string{}, ErrTestError + return "sha256:123", []string{}, []string{}, ErrTestError }, } diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index 4c7300348..f842ff63c 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -107,8 +107,8 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error { return zerr.ErrRepoNotFound } - gc.imgStore.Lock(&lockLatency) - defer gc.imgStore.Unlock(&lockLatency) + gc.imgStore.LockRepo(repo, &lockLatency) + defer gc.imgStore.UnlockRepo(repo, &lockLatency) /* this index (which represents the index.json of this repo) is the root point from which we search for dangling manifests/blobs diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 64f0f5d6d..fe0173e9c 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -10,7 +10,6 @@ import ( "path" "path/filepath" "strings" - "sync" "time" "unicode/utf8" @@ -42,7 +41,7 @@ const ( type ImageStore struct { rootDir string storeDriver storageTypes.Driver - lock *sync.RWMutex + lock *ImageStoreLock log zlog.Logger metrics monitoring.MetricServer cache cache.Cache @@ -78,7 +77,7 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, 
commit bool, log zlo imgStore := &ImageStore{ rootDir: rootDir, storeDriver: storeDriver, - lock: &sync.RWMutex{}, + lock: NewImageStoreLock(), log: log, metrics: metrics, dedupe: dedupe, @@ -90,16 +89,16 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo return imgStore } -// RLock read-lock. -func (is *ImageStore) RLock(lockStart *time.Time) { +// RLock read-lock for specific repo. +func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) { *lockStart = time.Now() - is.lock.RLock() + is.lock.RLockRepo(repo) } -// RUnlock read-unlock. -func (is *ImageStore) RUnlock(lockStart *time.Time) { - is.lock.RUnlock() +// RUnlock read-unlock for specific repo. +func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) { + is.lock.RUnlockRepo(repo) lockEnd := time.Now() // includes time spent in acquiring and holding a lock @@ -107,16 +106,16 @@ func (is *ImageStore) RUnlock(lockStart *time.Time) { monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram } -// Lock write-lock. -func (is *ImageStore) Lock(lockStart *time.Time) { +// Lock write-lock for specific repo. +func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) { *lockStart = time.Now() - is.lock.Lock() + is.lock.LockRepo(repo) } -// Unlock write-unlock. -func (is *ImageStore) Unlock(lockStart *time.Time) { - is.lock.Unlock() +// Unlock write-unlock for specific repo. 
+func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) { + is.lock.UnlockRepo(repo) lockEnd := time.Now() // includes time spent in acquiring and holding a lock @@ -200,8 +199,8 @@ func (is *ImageStore) initRepo(name string) error { func (is *ImageStore) InitRepo(name string) error { var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(name, &lockLatency) + defer is.UnlockRepo(name, &lockLatency) return is.initRepo(name) } @@ -263,13 +262,12 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) { // GetRepositories returns a list of all the repositories under this store. func (is *ImageStore) GetRepositories() ([]string, error) { - var lockLatency time.Time - + // Ideally this function would lock while walking in order to avoid concurrency issues + // but we can't lock everything as we don't have a valid list of all repositories + // let's assume the result of this function is a best effort and some repos may be + // added or removed by the time it returns dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - stores := make([]string, 0) err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { @@ -293,7 +291,9 @@ func (is *ImageStore) GetRepositories() ([]string, error) { return nil //nolint:nilerr // ignore invalid repos } - stores = append(stores, rel) + if !zcommon.Contains(stores, rel) { + stores = append(stores, rel) + } return nil }) @@ -309,13 +309,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) { // GetNextRepository returns next repository under this store. 
func (is *ImageStore) GetNextRepository(repo string) (string, error) { - var lockLatency time.Time - + // Ideally this function would lock while walking in order to avoid concurrency issues + // but we can't lock everything as we don't have a valid list of all repositories + // let's assume the result of this function is a best effort and some repos may be + // added or removed by the time it returns dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - _, err := is.storeDriver.List(dir) if err != nil { if errors.As(err, &driver.PathNotFoundError{}) { @@ -392,8 +391,8 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) { return nil, zerr.ErrRepoNotFound } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) index, err := common.GetIndex(is, repo, is.log) if err != nil { @@ -414,9 +413,10 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest var err error - is.RLock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer func() { - is.RUnlock(&lockLatency) + is.RUnlockRepo(repo, &lockLatency) if err == nil { monitoring.IncDownloadCounter(is.metrics, repo) @@ -466,9 +466,10 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli var err error - is.Lock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer func() { - is.Unlock(&lockLatency) + is.UnlockRepo(repo, &lockLatency) if err == nil { if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { @@ -596,8 +597,8 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) err := is.deleteImageManifest(repo, reference, detectCollisions) if err != nil { @@ -921,8 +922,8 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, 
dstDig var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { err = is.DedupeBlob(src, dstDigest, repo, dst) @@ -1001,8 +1002,8 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) dst := is.BlobPath(repo, dstDigest) @@ -1155,8 +1156,6 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string { } func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]string, error) { - var lockLatency time.Time - if err := digest.Validate(); err != nil { return nil, err } @@ -1165,9 +1164,6 @@ func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]str return nil, nil //nolint:nilnil } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) - blobsPaths, err := is.cache.GetAllBlobs(digest) if err != nil { return nil, err @@ -1204,11 +1200,11 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6 blobPath := is.BlobPath(repo, digest) if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) { - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) } else { - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) } binfo, err := is.storeDriver.Stat(blobPath) @@ -1340,8 +1336,8 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT return nil, -1, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) binfo, err := is.originalBlobInfo(repo, digest) if 
err != nil { @@ -1417,8 +1413,8 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str return nil, -1, err } - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) binfo, err := is.originalBlobInfo(repo, digest) if err != nil { @@ -1494,8 +1490,8 @@ func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifac ) (ispec.Index, error) { var lockLatency time.Time - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) return common.GetReferrers(is, repo, gdigest, artifactTypes, is.log) } @@ -1568,8 +1564,8 @@ func (is *ImageStore) DeleteBlob(repo string, digest godigest.Digest) error { return err } - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) return is.deleteBlob(repo, digest) } @@ -1764,16 +1760,18 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) { return ret, nil } -func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, -) (godigest.Digest, []string, error) { +func (is *ImageStore) GetNextDigestWithBlobPaths(allRepos []string, lastDigests []godigest.Digest, +) (godigest.Digest, []string, []string, error) { var lockLatency time.Time dir := is.rootDir - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + for _, repo := range allRepos { + is.RLockRepo(repo, &lockLatency) + defer is.RUnlockRepo(repo, &lockLatency) + } - var duplicateBlobs []string + var duplicateBlobs, duplicateRepos []string var digest godigest.Digest @@ -1784,15 +1782,37 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g return driver.ErrSkipDir } + if strings.HasSuffix(fileInfo.Path(), ispec.ImageLayoutFile) || + strings.HasSuffix(fileInfo.Path(), ispec.ImageIndexFile) || + 
strings.HasSuffix(fileInfo.Path(), ".db") { + return nil + } + + // the path is always under root dir because the walk function walks the root dir + rel, _ := filepath.Rel(is.rootDir, fileInfo.Path()) + if fileInfo.IsDir() { - // skip repositories not found in repos - repo := path.Base(fileInfo.Path()) - if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir { - candidateAlgorithm := godigest.Algorithm(repo) + if fileInfo.Path() == is.rootDir || zcommon.Contains(allRepos, rel) { + // this is the root directory or a repo, go deeped into subfolders + return nil + } - if !candidateAlgorithm.Available() { - return driver.ErrSkipDir - } + // attempt to determine is the base folder + lastFolderInPath := path.Base(rel) + if lastFolderInPath == ispec.ImageBlobsDir { + // this is the blobs dir, go deeper into subfolders + return nil + } + + // this is not the root dir, a repo, or a blobs dir + // it is also unclear if we are under a repo, as this could be .trivy + // skip entire directory if the base name does not match a valid hash algorithm + candidateAlgorithm := godigest.Algorithm(lastFolderInPath) + if !candidateAlgorithm.Available() { + return driver.ErrSkipDir + } else { + // this is the folder sha256 or similar + return nil } } @@ -1814,6 +1834,11 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g if blobDigest == digest { duplicateBlobs = append(duplicateBlobs, fileInfo.Path()) + + repo := path.Dir(path.Dir(path.Dir(rel))) + if !zcommon.Contains(duplicateRepos, repo) { + duplicateRepos = append(duplicateRepos, repo) + } } return nil @@ -1823,10 +1848,10 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g var perr driver.PathNotFoundError if errors.As(err, &perr) { - return digest, duplicateBlobs, nil + return digest, duplicateBlobs, duplicateRepos, nil } - return digest, duplicateBlobs, err + return digest, duplicateBlobs, duplicateRepos, err } func (is *ImageStore) 
getOriginalBlobFromDisk(duplicateBlobs []string) (string, error) { @@ -2001,12 +2026,14 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D } func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, + duplicateBlobs []string, duplicateRepos []string, ) error { var lockLatency time.Time - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) + for _, repo := range duplicateRepos { + is.LockRepo(repo, &lockLatency) + defer is.UnlockRepo(repo, &lockLatency) + } if dedupe { return is.dedupeBlobs(ctx, digest, duplicateBlobs) diff --git a/pkg/storage/imagestore/lock.go b/pkg/storage/imagestore/lock.go new file mode 100644 index 000000000..0ead2d58d --- /dev/null +++ b/pkg/storage/imagestore/lock.go @@ -0,0 +1,56 @@ +package imagestore + +import ( + "sync" +) + +type ImageStoreLock struct { + // locks per repository paths + repoLocks sync.Map +} + +func NewImageStoreLock() *ImageStoreLock { + return &ImageStoreLock{ + repoLocks: sync.Map{}, + } +} + +func (sl *ImageStoreLock) RLockRepo(repo string) { + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // lock individual repo + repoLock, _ := val.(*sync.RWMutex) + repoLock.RLock() +} + +func (sl *ImageStoreLock) RUnlockRepo(repo string) { + val, ok := sl.repoLocks.Load(repo) + if !ok { + // somehow the unlock is called for repo that was not locked + return + } + + // read-unlock individual repo + repoLock, _ := val.(*sync.RWMutex) + repoLock.RUnlock() +} + +func (sl *ImageStoreLock) LockRepo(repo string) { + val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{}) + + // write-lock individual repo + repoLock, _ := val.(*sync.RWMutex) + repoLock.Lock() +} + +func (sl *ImageStoreLock) UnlockRepo(repo string) { + val, ok := sl.repoLocks.Load(repo) + if !ok { + // somehow the unlock is called for a repo that was not locked + return + } + + // write-unlock individual repo + repoLock, _ := val.(*sync.RWMutex) + 
repoLock.Unlock() +} diff --git a/pkg/storage/imagestore/lock_test.go b/pkg/storage/imagestore/lock_test.go new file mode 100644 index 000000000..939371a9a --- /dev/null +++ b/pkg/storage/imagestore/lock_test.go @@ -0,0 +1,71 @@ +package imagestore_test + +import ( + _ "crypto/sha256" + "os" + "strconv" + "sync" + "testing" + "time" + + "github.com/rs/zerolog" + . "github.com/smartystreets/goconvey/convey" + + "zotregistry.dev/zot/pkg/extensions/monitoring" + zlog "zotregistry.dev/zot/pkg/log" + "zotregistry.dev/zot/pkg/storage" + "zotregistry.dev/zot/pkg/storage/cache" + "zotregistry.dev/zot/pkg/storage/local" +) + +func TestStorageLocks(t *testing.T) { + dir := t.TempDir() + + log := zlog.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: dir, + Name: "cache", + UseRelPaths: true, + }, log) + + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver) + + Convey("Locks", t, func() { + // in parallel, a mix of read and write locks - mainly for coverage + var wg sync.WaitGroup + for i := 0; i < 1000; i++ { + repo := "repo" + strconv.Itoa(i%10) + + wg.Add(2) + + go func() { + var lockLatency time.Time + + defer wg.Done() + + t.Logf("Repo %s will be write-locked in loop %d", repo, i) + imgStore.LockRepo(repo, &lockLatency) + func() { + t.Logf("Execute while repo %s is write-locked in loop %d", repo, i) + }() + imgStore.UnlockRepo(repo, &lockLatency) + t.Logf("Repo %s is write-unlocked in loop %d", repo, i) + }() + go func() { + var lockLatency time.Time + + defer wg.Done() + t.Logf("Repo %s will be read-locked in loop %d", repo, i) + imgStore.RLockRepo(repo, &lockLatency) + func() { + t.Logf("Execute while repo %s is read-locked in loop %d", repo, i) + }() + imgStore.RUnlockRepo(repo, &lockLatency) + t.Logf("Repo %s is read-unlocked in loop %d", repo, i) + }() + } + + wg.Wait() + }) +} diff --git 
a/pkg/storage/local/driver.go b/pkg/storage/local/driver.go index 177e5d021..29c6246d1 100644 --- a/pkg/storage/local/driver.go +++ b/pkg/storage/local/driver.go @@ -189,7 +189,13 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) { func (driver *Driver) Walk(path string, walkFn storagedriver.WalkFn) error { children, err := driver.List(path) if err != nil { - return err + switch errors.As(err, &storagedriver.PathNotFoundError{}) { + case true: + // repository was removed in between listing and enumeration. Ignore it. + return nil + default: + return err + } } sort.Stable(sort.StringSlice(children)) diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index ce603fef9..c974f9156 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -1280,11 +1280,16 @@ func TestDedupeLinks(t *testing.T) { path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest2), } + duplicateRepos := []string{ + path.Join(dir, "dedupe1"), + } + // remove original blob so that it can not be statted err := os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), godigest.Digest(blobDigest1), true, + duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 85d0fbeb2..f675ccf8e 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -2252,10 +2252,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, 
duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2301,10 +2302,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2350,10 +2352,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2396,10 +2399,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) Convey("Trigger Stat() error in dedupeBlobs()", func() { @@ -2441,10 +2445,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := 
imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, false, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) }) @@ -2492,10 +2497,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2540,10 +2546,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2588,10 +2595,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = 
imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2603,7 +2611,7 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + _, _, _, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) So(err, ShouldNotBeNil) }) @@ -2695,10 +2703,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2717,10 +2726,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) @@ -2735,10 +2745,11 @@ func TestRebuildDedupeMockStoreDriver(t *testing.T) { }, }) - digest, duplicateBlobs, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, []godigest.Digest{}) + digest, duplicateBlobs, duplicateRepos, err := imgStore.GetNextDigestWithBlobPaths([]string{"path/to"}, + []godigest.Digest{}) So(err, ShouldBeNil) - err = imgStore.RunDedupeForDigest(context.TODO(), digest, true, duplicateBlobs) + err = imgStore.RunDedupeForDigest(context.TODO(), 
digest, true, duplicateBlobs, duplicateRepos) So(err, ShouldNotBeNil) }) }) diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 859252cbe..af633a5af 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -134,8 +134,8 @@ func checkImage( ) ([]ispec.Descriptor, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) manifestContent, err := imgStore.GetBlobContent(imageName, manifest.Digest) if err != nil { @@ -149,8 +149,8 @@ func checkImage( func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) { var lockLatency time.Time - imgStore.RLock(&lockLatency) - defer imgStore.RUnlock(&lockLatency) + imgStore.RLockRepo(imageName, &lockLatency) + defer imgStore.RUnlockRepo(imageName, &lockLatency) // check image structure / layout ok, err := imgStore.ValidateRepo(imageName) diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index a94591339..716e12a80 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -12,7 +12,6 @@ import ( "path" "slices" "strings" - "sync" "testing" "time" @@ -889,33 +888,6 @@ func TestStorageAPIs(t *testing.T) { _, _, err = imgStore.PutImageManifest("replace", "1.0", ispec.MediaTypeImageManifest, manifestBuf) So(err, ShouldBeNil) }) - - Convey("Locks", func() { - // in parallel, a mix of read and write locks - mainly for coverage - var wg sync.WaitGroup - for i := 0; i < 1000; i++ { - wg.Add(2) - - go func() { - var lockLatency time.Time - - defer wg.Done() - imgStore.Lock(&lockLatency) - func() {}() - imgStore.Unlock(&lockLatency) - }() - go func() { - var lockLatency time.Time - - defer wg.Done() - imgStore.RLock(&lockLatency) - func() {}() - imgStore.RUnlock(&lockLatency) - }() - } - - wg.Wait() - }) }) }) } diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index 9d5cd4882..d9ff72259 100644 
--- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -22,10 +22,10 @@ type ImageStore interface { //nolint:interfacebloat Name() string DirExists(d string) bool RootDir() string - RLock(*time.Time) - RUnlock(*time.Time) - Lock(*time.Time) - Unlock(*time.Time) + RLockRepo(repo string, lockStart *time.Time) + RUnlockRepo(repo string, lockStart *time.Time) + LockRepo(repo string, lockStart *time.Time) + UnlockRepo(repo string, lockStart *time.Time) InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) @@ -60,8 +60,9 @@ type ImageStore interface { //nolint:interfacebloat GetBlobContent(repo string, digest godigest.Digest) ([]byte, error) GetReferrers(repo string, digest godigest.Digest, artifactTypes []string) (ispec.Index, error) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) - RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, duplicateBlobs []string) error - GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) + RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, + duplicateBlobs, duplicateRepos []string) error + GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, []string, error) GetAllBlobs(repo string) ([]godigest.Digest, error) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) VerifyBlobDigestValue(repo string, digest godigest.Digest) error diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index d94b540d8..b98a299ba 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -51,8 +51,9 @@ type MockedImageStore struct { RunGCPeriodicallyFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeBlobsFn func(interval time.Duration, sch *scheduler.Scheduler) RunDedupeForDigestFn func(ctx context.Context, digest godigest.Digest, 
dedupe bool, - duplicateBlobs []string) error - GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error) + duplicateBlobs []string, duplicateRepos []string) error + GetNextDigestWithBlobPathsFn func(repos []string, lastDigests []godigest.Digest, + ) (godigest.Digest, []string, []string, error) GetAllBlobsFn func(repo string) ([]godigest.Digest, error) CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) PutIndexContentFn func(repo string, index ispec.Index) error @@ -70,16 +71,16 @@ func (is MockedImageStore) StatIndex(repo string) (bool, int64, time.Time, error return true, 0, time.Time{}, nil } -func (is MockedImageStore) Lock(t *time.Time) { +func (is MockedImageStore) LockRepo(repo string, t *time.Time) { } -func (is MockedImageStore) Unlock(t *time.Time) { +func (is MockedImageStore) UnlockRepo(repo string, t *time.Time) { } -func (is MockedImageStore) RUnlock(t *time.Time) { +func (is MockedImageStore) RUnlockRepo(repo string, t *time.Time) { } -func (is MockedImageStore) RLock(t *time.Time) { +func (is MockedImageStore) RLockRepo(repo string, t *time.Time) { } func (is MockedImageStore) Name() string { @@ -391,22 +392,22 @@ func (is MockedImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler } func (is MockedImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, + duplicateBlobs []string, duplicateRepos []string, ) error { if is.RunDedupeForDigestFn != nil { - return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs) + return is.RunDedupeForDigestFn(ctx, digest, dedupe, duplicateBlobs, duplicateRepos) } return nil } func (is MockedImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest, -) (godigest.Digest, []string, error) { +) (godigest.Digest, []string, []string, error) { if is.GetNextDigestWithBlobPathsFn != nil { return 
is.GetNextDigestWithBlobPathsFn(repos, lastDigests) } - return "", []string{}, nil + return "", []string{}, []string{}, nil } func (is MockedImageStore) CleanupRepo(repo string, blobs []godigest.Digest, removeRepo bool) (int, error) { diff --git a/pkg/test/oci-utils/oci_layout.go b/pkg/test/oci-utils/oci_layout.go index f2a1cc54a..9aa81fdff 100644 --- a/pkg/test/oci-utils/oci_layout.go +++ b/pkg/test/oci-utils/oci_layout.go @@ -102,8 +102,8 @@ func (olu BaseOciLayoutUtils) GetImageManifests(repo string) ([]ispec.Descriptor imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) buf, err := imageStore.GetIndexContent(repo) if err != nil { @@ -137,8 +137,8 @@ func (olu BaseOciLayoutUtils) GetImageBlobManifest(repo string, digest godigest. imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) blobBuf, err := imageStore.GetBlobContent(repo, digest) if err != nil { @@ -163,8 +163,8 @@ func (olu BaseOciLayoutUtils) GetImageInfo(repo string, configDigest godigest.Di imageStore := olu.StoreController.GetImageStore(repo) - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) blobBuf, err := imageStore.GetBlobContent(repo, configDigest) if err != nil { @@ -323,8 +323,8 @@ func (olu BaseOciLayoutUtils) GetImageManifestSize(repo string, manifestDigest g var lockLatency time.Time - imageStore.RLock(&lockLatency) - defer imageStore.RUnlock(&lockLatency) + imageStore.RLockRepo(repo, &lockLatency) + defer imageStore.RUnlockRepo(repo, &lockLatency) manifestBlob, err := imageStore.GetBlobContent(repo, manifestDigest) if err != nil {