
Commit f23ccb3

refactor: remove global locking feature from the imagestore
Signed-off-by: Andrei Aaron <[email protected]>
1 parent 501c79b commit f23ccb3

File tree

7 files changed: 114 additions & 170 deletions


pkg/storage/imagestore/imagestore.go

Lines changed: 47 additions & 64 deletions
@@ -89,40 +89,6 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo
 	return imgStore
 }
 
-// RLock read-lock.
-func (is *ImageStore) RLock(lockStart *time.Time) {
-	*lockStart = time.Now()
-
-	is.lock.RLock()
-}
-
-// RUnlock read-unlock.
-func (is *ImageStore) RUnlock(lockStart *time.Time) {
-	is.lock.RUnlock()
-
-	lockEnd := time.Now()
-	// includes time spent in acquiring and holding a lock
-	latency := lockEnd.Sub(*lockStart)
-	monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
-}
-
-// Lock write-lock.
-func (is *ImageStore) Lock(lockStart *time.Time) {
-	*lockStart = time.Now()
-
-	is.lock.Lock()
-}
-
-// Unlock write-unlock.
-func (is *ImageStore) Unlock(lockStart *time.Time) {
-	is.lock.Unlock()
-
-	lockEnd := time.Now()
-	// includes time spent in acquiring and holding a lock
-	latency := lockEnd.Sub(*lockStart)
-	monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
-}
-
 // RLock read-lock for specific repo.
 func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
 	*lockStart = time.Now()
@@ -296,13 +262,12 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) {
 
 // GetRepositories returns a list of all the repositories under this store.
 func (is *ImageStore) GetRepositories() ([]string, error) {
-	var lockLatency time.Time
-
+	// Ideally this function would lock while walking in order to avoid concurrency issues,
+	// but we can't lock everything as we don't have a valid list of all repositories.
+	// Let's assume the result of this function is a best effort; some repos may be
+	// added or removed by the time it returns.
 	dir := is.rootDir
 
-	is.RLock(&lockLatency)
-	defer is.RUnlock(&lockLatency)
-
 	stores := make([]string, 0)
 
 	err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
@@ -342,13 +307,12 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
 
 // GetNextRepository returns next repository under this store.
 func (is *ImageStore) GetNextRepository(repo string) (string, error) {
-	var lockLatency time.Time
-
+	// Ideally this function would lock while walking in order to avoid concurrency issues,
+	// but we can't lock everything as we don't have a valid list of all repositories.
+	// Let's assume the result of this function is a best effort; some repos may be
+	// added or removed by the time it returns.
 	dir := is.rootDir
 
-	is.RLock(&lockLatency)
-	defer is.RUnlock(&lockLatency)
-
 	_, err := is.storeDriver.List(dir)
 	if err != nil {
 		if errors.As(err, &driver.PathNotFoundError{}) {
@@ -1190,8 +1154,6 @@ func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string {
 }
 
 func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]string, error) {
-	var lockLatency time.Time
-
 	if err := digest.Validate(); err != nil {
 		return nil, err
 	}
@@ -1200,9 +1162,6 @@ func (is *ImageStore) GetAllDedupeReposCandidates(digest godigest.Digest) ([]str
 		return nil, nil //nolint:nilnil
 	}
 
-	is.RLock(&lockLatency)
-	defer is.RUnlock(&lockLatency)
-
 	blobsPaths, err := is.cache.GetAllBlobs(digest)
 	if err != nil {
 		return nil, err
@@ -1799,41 +1758,62 @@ func (is *ImageStore) GetAllBlobs(repo string) ([]godigest.Digest, error) {
 	return ret, nil
 }
 
-func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest,
+func (is *ImageStore) GetNextDigestWithBlobPaths(allRepos []string, lastDigests []godigest.Digest,
 ) (godigest.Digest, []string, []string, error) {
 	var lockLatency time.Time
 
 	dir := is.rootDir
 
-	is.RLock(&lockLatency)
-	defer is.RUnlock(&lockLatency)
+	for _, repo := range allRepos {
+		is.RLockRepo(repo, &lockLatency)
+		defer is.RUnlockRepo(repo, &lockLatency)
+	}
 
 	var duplicateBlobs, duplicateRepos []string
 
 	var digest godigest.Digest
 
-	var repo string
-
 	err := is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
 		// skip blobs under .sync and .uploads
 		if strings.HasSuffix(fileInfo.Path(), syncConstants.SyncBlobUploadDir) ||
 			strings.HasSuffix(fileInfo.Path(), storageConstants.BlobUploadDir) {
 			return driver.ErrSkipDir
 		}
 
+		if strings.HasSuffix(fileInfo.Path(), ispec.ImageLayoutFile) ||
+			strings.HasSuffix(fileInfo.Path(), ispec.ImageIndexFile) ||
+			strings.HasSuffix(fileInfo.Path(), ".db") {
+			return nil
+		}
+
+		// the path is always under the root dir because the walk function walks the root dir
+		rel, _ := filepath.Rel(is.rootDir, fileInfo.Path())
+
 		if fileInfo.IsDir() {
-			// skip repositories not found in repos
-			repo = path.Base(fileInfo.Path())
-			if !zcommon.Contains(repos, repo) && repo != ispec.ImageBlobsDir {
-				candidateAlgorithm := godigest.Algorithm(repo)
+			if fileInfo.Path() == is.rootDir || zcommon.Contains(allRepos, rel) {
+				// this is the root directory or a repo, go deeper into subfolders
+				return nil
+			}
 
-				if !candidateAlgorithm.Available() {
-					return driver.ErrSkipDir
-				}
+			// attempt to determine the kind of the base folder
+			lastFolderInPath := path.Base(rel)
+			if lastFolderInPath == ispec.ImageBlobsDir {
+				// this is the blobs dir, go deeper into subfolders
+				return nil
+			}
+
+			// this is not the root dir, a repo, or a blobs dir
+			// it is also unclear if we are under a repo, as this could be .trivy
+			// skip the entire directory if the base name does not match a valid hash algorithm
+			candidateAlgorithm := godigest.Algorithm(lastFolderInPath)
+			if !candidateAlgorithm.Available() {
+				return driver.ErrSkipDir
+			} else {
+				// this is a folder such as sha256 or similar
+				return nil
 			}
 		}
 
-		repo = path.Dir(path.Dir(fileInfo.Path()))
 		digestHash := path.Base(fileInfo.Path())
 		digestAlgorithm := godigest.Algorithm(path.Base(path.Dir(fileInfo.Path())))
 
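Note on the directory checks above: a folder name is treated as a digest-algorithm directory only when go-digest reports the algorithm as available. A minimal standalone sketch of that check (the folder names below are made up for illustration):

package main

import (
	"fmt"

	godigest "github.com/opencontainers/go-digest"
)

func main() {
	// classify folder names the way the walk callback above does:
	// registered digest algorithms pass, anything else is skipped
	for _, name := range []string{"sha256", ".trivy", "blobs"} {
		fmt.Printf("%s -> %v\n", name, godigest.Algorithm(name).Available())
	}
	// expected output: sha256 -> true, .trivy -> false, blobs -> false
}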
@@ -1853,6 +1833,7 @@ func (is *ImageStore) GetNextDigestWithBlobPaths(repos []string, lastDigests []g
 		if blobDigest == digest {
 			duplicateBlobs = append(duplicateBlobs, fileInfo.Path())
 
+			repo := path.Dir(path.Dir(path.Dir(rel)))
 			if !zcommon.Contains(duplicateRepos, repo) {
 				duplicateRepos = append(duplicateRepos, repo)
 			}
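A blob path relative to the store root has the shape <repo>/blobs/<algorithm>/<hash>, so applying path.Dir three times recovers the repository name, which is what the added line computes. A quick sketch with a hypothetical path:

package main

import (
	"fmt"
	"path"
)

func main() {
	// hypothetical relative blob path: <repo>/blobs/<algorithm>/<hash>
	rel := "project/app/blobs/sha256/deadbeef"

	// strip the hash, the algorithm dir, and the blobs dir to get the repo
	repo := path.Dir(path.Dir(path.Dir(rel)))
	fmt.Println(repo) // project/app
}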
@@ -2047,8 +2028,10 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di
 ) error {
 	var lockLatency time.Time
 
-	is.Lock(&lockLatency)
-	defer is.Unlock(&lockLatency)
+	for _, repo := range duplicateRepos {
+		is.LockRepo(repo, &lockLatency)
+		defer is.UnlockRepo(repo, &lockLatency)
+	}
 
 	if dedupe {
 		return is.dedupeBlobs(ctx, digest, duplicateBlobs)
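Both hunks above take one lock per repository in a loop and defer the unlocks, so every acquired lock is held until the surrounding function returns. A compact sketch of the same pattern, using a hypothetical locker type rather than the actual ImageStore API:

package main

import (
	"fmt"
	"sync"
)

// hypothetical per-repo locker, standing in for the image store
type locker struct {
	mu    sync.Mutex
	locks map[string]*sync.RWMutex
}

func (l *locker) get(repo string) *sync.RWMutex {
	l.mu.Lock()
	defer l.mu.Unlock()

	if l.locks[repo] == nil {
		l.locks[repo] = &sync.RWMutex{}
	}

	return l.locks[repo]
}

// processAll read-locks every repo up front; the deferred unlocks
// run only when processAll returns, as in GetNextDigestWithBlobPaths
func processAll(l *locker, repos []string) {
	for _, repo := range repos {
		lock := l.get(repo)
		lock.RLock()
		defer lock.RUnlock()
	}

	fmt.Println("all repos read-locked; safe to walk the storage")
}

func main() {
	l := &locker{locks: map[string]*sync.RWMutex{}}
	processAll(l, []string{"alpine", "ubuntu"})
}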

pkg/storage/imagestore/lock.go

Lines changed: 52 additions & 53 deletions
@@ -6,93 +6,92 @@ import (
 
 type ImageStoreLock struct {
 	// locks per repository paths
-	repoLocks sync.Map
-	// lock for the entire storage, needed in case all repos need to be processed
-	// including blocking creating new repos
-	globalLock *sync.RWMutex
+	repoLocks map[string]*sync.RWMutex
+	// lock for managing the content of the repo lock map
+	internalLock *sync.Mutex
 }
 
 func NewImageStoreLock() *ImageStoreLock {
 	return &ImageStoreLock{
-		repoLocks:  sync.Map{},
-		globalLock: &sync.RWMutex{},
+		repoLocks:    map[string]*sync.RWMutex{},
+		internalLock: &sync.Mutex{},
 	}
 }
 
-func (sl *ImageStoreLock) RLock() {
-	// block reads and writes to the entire storage, including new repos
-	sl.globalLock.RLock()
-}
-
-func (sl *ImageStoreLock) RUnlock() {
-	// unlock to the storage in general
-	sl.globalLock.RUnlock()
-}
-
-func (sl *ImageStoreLock) Lock() {
-	// block reads and writes to the entire storage, including new repos
-	sl.globalLock.Lock()
-}
-
-func (sl *ImageStoreLock) Unlock() {
-	// unlock to the storage in general
-	sl.globalLock.Unlock()
-}
-
 func (sl *ImageStoreLock) RLockRepo(repo string) {
-	// besides the individual repo increment the read counter for the
-	// global lock, this will make sure the storage cannot be
-	// write-locked at global level while individual repos are accessed
-	sl.globalLock.RLock()
-
-	val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
+	repoLock, _ := sl.loadLock(repo)
 
 	// lock individual repo
-	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.RLock()
 }
 
 func (sl *ImageStoreLock) RUnlockRepo(repo string) {
-	val, ok := sl.repoLocks.Load(repo)
+	repoLock, ok := sl.loadLock(repo)
 	if !ok {
-		// somehow the unlock is called for repo that was never locked
+		// somehow the unlock is called for a repo that was not locked
 		return
 	}
 
 	// read-unlock individual repo
-	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.RUnlock()
-
-	// decrement the global read counter after the one for the individual repo is decremented
-	sl.globalLock.RUnlock()
 }
 
 func (sl *ImageStoreLock) LockRepo(repo string) {
-	// besides the individual repo increment the read counter for the
-	// global lock, this will make sure the storage cannot be
-	// write-locked at global level while individual repos are accessed
-	// we are not using the write lock here, as that would make all repos
-	// wait for one another
-	sl.globalLock.RLock()
-
-	val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
+	repoLock, _ := sl.loadLock(repo)
 
 	// write-lock individual repo
-	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.Lock()
 }
 
 func (sl *ImageStoreLock) UnlockRepo(repo string) {
-	val, ok := sl.repoLocks.Load(repo)
+	repoLock, ok := sl.loadLock(repo)
 	if !ok {
-		// somehow the unlock is called for a repo that was never locked
+		// somehow the unlock is called for a repo that was not locked
 		return
 	}
 
 	// write-unlock individual repo
-	repoLock, _ := val.(*sync.RWMutex)
 	repoLock.Unlock()
 
-	// decrement the global read counter after the individual repo was unlocked
-	sl.globalLock.RUnlock()
+	// attempt to clean up the map of unused locks
+	sl.discardLockIfPossible(repo)
+}
+
+func (sl *ImageStoreLock) loadLock(repo string) (*sync.RWMutex, bool) {
+	sl.internalLock.Lock()
+	defer sl.internalLock.Unlock()
+
+	repoLock, ok := sl.repoLocks[repo]
+	if !ok {
+		sl.repoLocks[repo] = &sync.RWMutex{}
+		repoLock = sl.repoLocks[repo]
+	}
+
+	return repoLock, ok
+}
+
+func (sl *ImageStoreLock) discardLockIfPossible(repo string) {
+	sl.internalLock.Lock()
+	defer sl.internalLock.Unlock()
+
+	repoLock, ok := sl.repoLocks[repo]
+	if !ok {
+		// the lock is not set, no need to do anything else
+		return
+	}
+
+	// check if the lock is in use
+	// this is a non-blocking call even if someone else is holding the lock
+	// the internalLock prevents the case where someone else attempts
+	// to load/acquire the lock after this function started executing
+	ok = repoLock.TryLock()
+	if !ok {
+		// if someone else is using this lock, it is still needed, keep it as is
+		return
+	}
+	// final unlock
+	defer repoLock.Unlock()
+
+	// nobody else is using this lock, remove it from the map
+	delete(sl.repoLocks, repo)
 }
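The cleanup in discardLockIfPossible relies on sync.RWMutex.TryLock (Go 1.18+), which succeeds only when no reader or writer holds the mutex; a successful TryLock taken under internalLock therefore proves the map entry can be deleted safely. A standalone sketch of just that primitive:

package main

import (
	"fmt"
	"sync"
)

func main() {
	l := &sync.RWMutex{}

	// while any reader holds the lock, TryLock fails and the
	// map entry must be kept
	l.RLock()
	fmt.Println(l.TryLock()) // false
	l.RUnlock()

	// once the lock is idle, TryLock succeeds and the entry
	// can be removed from the map
	fmt.Println(l.TryLock()) // true
	l.Unlock()
}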

pkg/storage/local/driver.go

Lines changed: 7 additions & 1 deletion
@@ -189,7 +189,13 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) {
 func (driver *Driver) Walk(path string, walkFn storagedriver.WalkFn) error {
 	children, err := driver.List(path)
 	if err != nil {
-		return err
+		switch errors.As(err, &storagedriver.PathNotFoundError{}) {
+		case true:
+			// repository was removed in between listing and enumeration. Ignore it.
+			return nil
+		default:
+			return err
+		}
 	}
 
 	sort.Stable(sort.StringSlice(children))
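The new error handling tolerates a repository deleted between the initial listing and the per-entry enumeration. A minimal reproduction of the pattern, with a simplified stand-in for storagedriver.PathNotFoundError:

package main

import (
	"errors"
	"fmt"
)

// hypothetical stand-in for storagedriver.PathNotFoundError
type PathNotFoundError struct {
	Path string
}

func (e PathNotFoundError) Error() string {
	return fmt.Sprintf("path not found: %s", e.Path)
}

// list simulates a directory disappearing between enumeration passes
func list(path string) ([]string, error) {
	return nil, PathNotFoundError{Path: path}
}

func walk(path string) error {
	children, err := list(path)
	if err != nil {
		// tolerate a repository deleted mid-walk; propagate anything else
		if errors.As(err, &PathNotFoundError{}) {
			return nil
		}

		return err
	}

	_ = children

	return nil
}

func main() {
	fmt.Println(walk("repo/deleted")) // <nil>
}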

pkg/storage/storage_test.go

Lines changed: 0 additions & 28 deletions
@@ -12,7 +12,6 @@ import (
 	"path"
 	"slices"
 	"strings"
-	"sync"
 	"testing"
 	"time"
 
@@ -889,33 +888,6 @@ func TestStorageAPIs(t *testing.T) {
 				_, _, err = imgStore.PutImageManifest("replace", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
 				So(err, ShouldBeNil)
 			})
-
-			Convey("Locks", func() {
-				// in parallel, a mix of read and write locks - mainly for coverage
-				var wg sync.WaitGroup
-				for i := 0; i < 1000; i++ {
-					wg.Add(2)
-
-					go func() {
-						var lockLatency time.Time
-
-						defer wg.Done()
-						imgStore.Lock(&lockLatency)
-						func() {}()
-						imgStore.Unlock(&lockLatency)
-					}()
-					go func() {
-						var lockLatency time.Time
-
-						defer wg.Done()
-						imgStore.RLock(&lockLatency)
-						func() {}()
-						imgStore.RUnlock(&lockLatency)
-					}()
-				}
-
-				wg.Wait()
-			})
 		})
 	})
 }
