Skip to content

Commit 8aa74db

Browse files
committed
feat(storage): enable parallel writes by using per-repo locking
Signed-off-by: Andrei Aaron <[email protected]>
1 parent 2dea22f commit 8aa74db

File tree

9 files changed

+192
-45
lines changed

9 files changed

+192
-45
lines changed

pkg/api/routes.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -957,8 +957,8 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
957957
} else {
958958
var lockLatency time.Time
959959

960-
imgStore.RLock(&lockLatency)
961-
defer imgStore.RUnlock(&lockLatency)
960+
imgStore.RLockRepo(name, &lockLatency)
961+
defer imgStore.RUnlockRepo(name, &lockLatency)
962962

963963
ok, blen, _, err = imgStore.StatBlob(name, digest)
964964
}

pkg/extensions/sync/destination.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,9 +136,9 @@ func (registry *DestinationRegistry) CommitImage(imageReference types.ImageRefer
136136
}
137137

138138
for _, manifest := range indexManifest.Manifests {
139-
tempImageStore.RLock(&lockLatency)
139+
tempImageStore.RLockRepo(repo, &lockLatency)
140140
manifestBuf, err := tempImageStore.GetBlobContent(repo, manifest.Digest)
141-
tempImageStore.RUnlock(&lockLatency)
141+
tempImageStore.RUnlockRepo(repo, &lockLatency)
142142

143143
if err != nil {
144144
registry.log.Error().Str("errorType", common.TypeOf(err)).

pkg/meta/parse.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo
109109

110110
var lockLatency time.Time
111111

112-
imageStore.RLock(&lockLatency)
113-
defer imageStore.RUnlock(&lockLatency)
112+
imageStore.RLockRepo(repo, &lockLatency)
113+
defer imageStore.RUnlockRepo(repo, &lockLatency)
114114

115115
indexBlob, err := imageStore.GetIndexContent(repo)
116116
if err != nil {
@@ -223,8 +223,8 @@ func getCosignSignatureLayersInfo(
223223

224224
var lockLatency time.Time
225225

226-
imageStore.RLock(&lockLatency)
227-
defer imageStore.RUnlock(&lockLatency)
226+
imageStore.RLockRepo(repo, &lockLatency)
227+
defer imageStore.RUnlockRepo(repo, &lockLatency)
228228

229229
for _, layer := range manifestContent.Layers {
230230
layerContent, err := imageStore.GetBlobContent(repo, layer.Digest)
@@ -280,8 +280,8 @@ func getNotationSignatureLayersInfo(
280280

281281
var lockLatency time.Time
282282

283-
imageStore.RLock(&lockLatency)
284-
defer imageStore.RUnlock(&lockLatency)
283+
imageStore.RLockRepo(repo, &lockLatency)
284+
defer imageStore.RUnlockRepo(repo, &lockLatency)
285285

286286
layerContent, err := imageStore.GetBlobContent(repo, layer)
287287
if err != nil {

pkg/storage/gc/gc.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ func (gc GarbageCollect) cleanRepo(ctx context.Context, repo string) error {
106106
return zerr.ErrRepoNotFound
107107
}
108108

109-
gc.imgStore.Lock(&lockLatency)
110-
defer gc.imgStore.Unlock(&lockLatency)
109+
gc.imgStore.LockRepo(repo, &lockLatency)
110+
defer gc.imgStore.UnlockRepo(repo, &lockLatency)
111111

112112
/* this index (which represents the index.json of this repo) is the root point from which we
113113
search for dangling manifests/blobs

pkg/storage/imagestore/imagestore.go

Lines changed: 62 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"path"
1111
"path/filepath"
1212
"strings"
13-
"sync"
1413
"time"
1514
"unicode/utf8"
1615

@@ -42,7 +41,7 @@ const (
4241
type ImageStore struct {
4342
rootDir string
4443
storeDriver storageTypes.Driver
45-
lock *sync.RWMutex
44+
lock *ImageStoreLock
4645
log zlog.Logger
4746
metrics monitoring.MetricServer
4847
cache cache.Cache
@@ -78,7 +77,7 @@ func NewImageStore(rootDir string, cacheDir string, dedupe, commit bool, log zlo
7877
imgStore := &ImageStore{
7978
rootDir: rootDir,
8079
storeDriver: storeDriver,
81-
lock: &sync.RWMutex{},
80+
lock: NewImageStoreLock(),
8281
log: log,
8382
metrics: metrics,
8483
dedupe: dedupe,
@@ -124,6 +123,40 @@ func (is *ImageStore) Unlock(lockStart *time.Time) {
124123
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
125124
}
126125

126+
// RLock read-lock for specific repo
127+
func (is *ImageStore) RLockRepo(repo string, lockStart *time.Time) {
128+
*lockStart = time.Now()
129+
130+
is.lock.RLockRepo(repo)
131+
}
132+
133+
// RUnlock read-unlock for specific repo.
134+
func (is *ImageStore) RUnlockRepo(repo string, lockStart *time.Time) {
135+
is.lock.RUnlockRepo(repo)
136+
137+
lockEnd := time.Now()
138+
// includes time spent in acquiring and holding a lock
139+
latency := lockEnd.Sub(*lockStart)
140+
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RLOCK) // histogram
141+
}
142+
143+
// Lock write-lock for specific repo..
144+
func (is *ImageStore) LockRepo(repo string, lockStart *time.Time) {
145+
*lockStart = time.Now()
146+
147+
is.lock.LockRepo(repo)
148+
}
149+
150+
// Unlock write-unlock for specific repo..
151+
func (is *ImageStore) UnlockRepo(repo string, lockStart *time.Time) {
152+
is.lock.UnlockRepo(repo)
153+
154+
lockEnd := time.Now()
155+
// includes time spent in acquiring and holding a lock
156+
latency := lockEnd.Sub(*lockStart)
157+
monitoring.ObserveStorageLockLatency(is.metrics, latency, is.RootDir(), storageConstants.RWLOCK) // histogram
158+
}
159+
127160
func (is *ImageStore) initRepo(name string) error {
128161
repoDir := path.Join(is.rootDir, name)
129162

@@ -200,8 +233,8 @@ func (is *ImageStore) initRepo(name string) error {
200233
func (is *ImageStore) InitRepo(name string) error {
201234
var lockLatency time.Time
202235

203-
is.Lock(&lockLatency)
204-
defer is.Unlock(&lockLatency)
236+
is.LockRepo(name, &lockLatency)
237+
defer is.UnlockRepo(name, &lockLatency)
205238

206239
return is.initRepo(name)
207240
}
@@ -392,8 +425,8 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
392425
return nil, zerr.ErrRepoNotFound
393426
}
394427

395-
is.RLock(&lockLatency)
396-
defer is.RUnlock(&lockLatency)
428+
is.RLockRepo(repo, &lockLatency)
429+
defer is.RUnlockRepo(repo, &lockLatency)
397430

398431
index, err := common.GetIndex(is, repo, is.log)
399432
if err != nil {
@@ -414,9 +447,9 @@ func (is *ImageStore) GetImageManifest(repo, reference string) ([]byte, godigest
414447

415448
var err error
416449

417-
is.RLock(&lockLatency)
450+
is.RLockRepo(repo, &lockLatency)
418451
defer func() {
419-
is.RUnlock(&lockLatency)
452+
is.RUnlockRepo(repo, &lockLatency)
420453

421454
if err == nil {
422455
monitoring.IncDownloadCounter(is.metrics, repo)
@@ -466,9 +499,9 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli
466499

467500
var err error
468501

469-
is.Lock(&lockLatency)
502+
is.LockRepo(repo, &lockLatency)
470503
defer func() {
471-
is.Unlock(&lockLatency)
504+
is.UnlockRepo(repo, &lockLatency)
472505

473506
if err == nil {
474507
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
@@ -596,8 +629,8 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio
596629

597630
var lockLatency time.Time
598631

599-
is.Lock(&lockLatency)
600-
defer is.Unlock(&lockLatency)
632+
is.LockRepo(repo, &lockLatency)
633+
defer is.UnlockRepo(repo, &lockLatency)
601634

602635
err := is.deleteImageManifest(repo, reference, detectCollisions)
603636
if err != nil {
@@ -885,8 +918,8 @@ func (is *ImageStore) FinishBlobUpload(repo, uuid string, body io.Reader, dstDig
885918

886919
var lockLatency time.Time
887920

888-
is.Lock(&lockLatency)
889-
defer is.Unlock(&lockLatency)
921+
is.LockRepo(repo, &lockLatency)
922+
defer is.UnlockRepo(repo, &lockLatency)
890923

891924
if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
892925
err = is.DedupeBlob(src, dstDigest, repo, dst)
@@ -965,8 +998,8 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, dstDigest godi
965998

966999
var lockLatency time.Time
9671000

968-
is.Lock(&lockLatency)
969-
defer is.Unlock(&lockLatency)
1001+
is.LockRepo(repo, &lockLatency)
1002+
defer is.UnlockRepo(repo, &lockLatency)
9701003

9711004
dst := is.BlobPath(repo, dstDigest)
9721005

@@ -1168,11 +1201,11 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6
11681201
blobPath := is.BlobPath(repo, digest)
11691202

11701203
if is.dedupe && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
1171-
is.Lock(&lockLatency)
1172-
defer is.Unlock(&lockLatency)
1204+
is.LockRepo(repo, &lockLatency)
1205+
defer is.UnlockRepo(repo, &lockLatency)
11731206
} else {
1174-
is.RLock(&lockLatency)
1175-
defer is.RUnlock(&lockLatency)
1207+
is.RLockRepo(repo, &lockLatency)
1208+
defer is.RUnlockRepo(repo, &lockLatency)
11761209
}
11771210

11781211
binfo, err := is.storeDriver.Stat(blobPath)
@@ -1304,8 +1337,8 @@ func (is *ImageStore) GetBlobPartial(repo string, digest godigest.Digest, mediaT
13041337
return nil, -1, -1, err
13051338
}
13061339

1307-
is.RLock(&lockLatency)
1308-
defer is.RUnlock(&lockLatency)
1340+
is.RLockRepo(repo, &lockLatency)
1341+
defer is.RUnlockRepo(repo, &lockLatency)
13091342

13101343
binfo, err := is.originalBlobInfo(repo, digest)
13111344
if err != nil {
@@ -1381,8 +1414,8 @@ func (is *ImageStore) GetBlob(repo string, digest godigest.Digest, mediaType str
13811414
return nil, -1, err
13821415
}
13831416

1384-
is.RLock(&lockLatency)
1385-
defer is.RUnlock(&lockLatency)
1417+
is.LockRepo(repo, &lockLatency)
1418+
defer is.UnlockRepo(repo, &lockLatency)
13861419

13871420
binfo, err := is.originalBlobInfo(repo, digest)
13881421
if err != nil {
@@ -1458,8 +1491,8 @@ func (is *ImageStore) GetReferrers(repo string, gdigest godigest.Digest, artifac
14581491
) (ispec.Index, error) {
14591492
var lockLatency time.Time
14601493

1461-
is.RLock(&lockLatency)
1462-
defer is.RUnlock(&lockLatency)
1494+
is.RLockRepo(repo, &lockLatency)
1495+
defer is.RUnlockRepo(repo, &lockLatency)
14631496

14641497
return common.GetReferrers(is, repo, gdigest, artifactTypes, is.log)
14651498
}
@@ -1532,8 +1565,8 @@ func (is *ImageStore) DeleteBlob(repo string, digest godigest.Digest) error {
15321565
return err
15331566
}
15341567

1535-
is.Lock(&lockLatency)
1536-
defer is.Unlock(&lockLatency)
1568+
is.LockRepo(repo, &lockLatency)
1569+
defer is.UnlockRepo(repo, &lockLatency)
15371570

15381571
return is.deleteBlob(repo, digest)
15391572
}

pkg/storage/imagestore/lock.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package imagestore
2+
3+
import (
4+
"sync"
5+
)
6+
7+
type ImageStoreLock struct {
8+
// locks per repository paths
9+
repoLocks sync.Map
10+
// lock for the entire storage, needed in case all repos need to be processed
11+
// including blocking creating new repos
12+
globalLock *sync.RWMutex
13+
}
14+
15+
func NewImageStoreLock() *ImageStoreLock {
16+
return &ImageStoreLock{
17+
repoLocks: sync.Map{},
18+
globalLock: &sync.RWMutex{},
19+
}
20+
}
21+
22+
func (sl *ImageStoreLock) RLock() {
23+
// block reads and writes to the entire storage, including new repos
24+
sl.globalLock.RLock()
25+
}
26+
27+
func (sl *ImageStoreLock) RUnlock() {
28+
// unlock to the storage in general
29+
sl.globalLock.RUnlock()
30+
}
31+
32+
func (sl *ImageStoreLock) Lock() {
33+
// block reads and writes to the entire storage, including new repos
34+
sl.globalLock.Lock()
35+
}
36+
37+
func (sl *ImageStoreLock) Unlock() {
38+
// unlock to the storage in general
39+
sl.globalLock.Unlock()
40+
}
41+
42+
func (sl *ImageStoreLock) RLockRepo(repo string) {
43+
// besides the individual repo increment the read counter for the
44+
// global lock, this will make sure the storage cannot be
45+
// write-locked at global level while individual repos are accessed
46+
sl.globalLock.RLock()
47+
48+
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
49+
50+
// lock individual repo
51+
repoLock := val.(*sync.RWMutex)
52+
repoLock.RLock()
53+
}
54+
55+
func (sl *ImageStoreLock) RUnlockRepo(repo string) {
56+
val, ok := sl.repoLocks.Load(repo)
57+
if !ok {
58+
// somehow the unlock is called for repo that was never locked
59+
return
60+
}
61+
62+
// read-unlock individual repo
63+
repoLock := val.(*sync.RWMutex)
64+
repoLock.RUnlock()
65+
66+
// decrement the global read counter after the one for the individual repo is decremented
67+
sl.globalLock.RLock()
68+
}
69+
70+
func (sl *ImageStoreLock) LockRepo(repo string) {
71+
// besides the individual repo increment the read counter for the
72+
// global lock, this will make sure the storage cannot be
73+
// write-locked at global level while individual repos are accessed
74+
// we are not using the write lock here, as that would make all repos
75+
// wait for one another
76+
sl.globalLock.RLock()
77+
78+
val, _ := sl.repoLocks.LoadOrStore(repo, &sync.RWMutex{})
79+
80+
// write-lock individual repo
81+
repoLock := val.(*sync.RWMutex)
82+
repoLock.Lock()
83+
}
84+
85+
func (sl *ImageStoreLock) UnlockRepo(repo string) {
86+
val, ok := sl.repoLocks.Load(repo)
87+
if !ok {
88+
// somehow the unlock is called for a repo that was never locked
89+
return
90+
}
91+
92+
// write-unlock individual repo
93+
repoLock := val.(*sync.RWMutex)
94+
repoLock.Unlock()
95+
96+
// decrement the global read counter after the individual repo was unlocked
97+
sl.globalLock.RLock()
98+
}

pkg/storage/scrub.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ func checkImage(
134134
) ([]ispec.Descriptor, error) {
135135
var lockLatency time.Time
136136

137-
imgStore.RLock(&lockLatency)
138-
defer imgStore.RUnlock(&lockLatency)
137+
imgStore.RLockRepo(imageName, &lockLatency)
138+
defer imgStore.RUnlockRepo(imageName, &lockLatency)
139139

140140
manifestContent, err := imgStore.GetBlobContent(imageName, manifest.Digest)
141141
if err != nil {
@@ -149,8 +149,8 @@ func checkImage(
149149
func getIndex(imageName string, imgStore storageTypes.ImageStore) ([]byte, error) {
150150
var lockLatency time.Time
151151

152-
imgStore.RLock(&lockLatency)
153-
defer imgStore.RUnlock(&lockLatency)
152+
imgStore.RLockRepo(imageName, &lockLatency)
153+
defer imgStore.RUnlockRepo(imageName, &lockLatency)
154154

155155
// check image structure / layout
156156
ok, err := imgStore.ValidateRepo(imageName)

0 commit comments

Comments
 (0)