Skip to content

Commit 3cc60db

Browse files
committed
Adding DiskCache stats
1 parent b994285 commit 3cc60db

File tree

6 files changed

+355
-98
lines changed

6 files changed

+355
-98
lines changed

go/pkg/client/client.go

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"time"
1616

1717
"errors"
18+
1819
"github.com/bazelbuild/remote-apis-sdks/go/pkg/actas"
1920
"github.com/bazelbuild/remote-apis-sdks/go/pkg/balancer"
2021
"github.com/bazelbuild/remote-apis-sdks/go/pkg/chunker"
@@ -244,6 +245,10 @@ func (c *Client) Close() error {
244245
if c.casConnection != c.connection {
245246
return c.casConnection.Close()
246247
}
248+
if c.diskCache != nil {
249+
// Waits for local disk GC to complete.
250+
c.diskCache.Shutdown()
251+
}
247252
return nil
248253
}
249254

@@ -354,21 +359,12 @@ func (o *TreeSymlinkOpts) Apply(c *Client) {
354359
}
355360

356361
type DiskCacheOpts struct {
357-
Context context.Context
358-
Path string
359-
MaxCapacityGb float64
362+
DiskCache *diskcache.DiskCache
360363
}
361364

362365
// Apply sets the client's disk cache to the DiskCache instance carried by the options.
363366
func (o *DiskCacheOpts) Apply(c *Client) {
364-
if o.Path != "" {
365-
capBytes := uint64(o.MaxCapacityGb * 1024 * 1024 * 1024)
366-
var err error
367-
// TODO(ola): propagate errors from Apply.
368-
if c.diskCache, err = diskcache.New(o.Context, o.Path, capBytes); err != nil {
369-
log.Errorf("Error initializing disk cache on %s: %v", o.Path, err)
370-
}
371-
}
367+
c.diskCache = o.DiskCache
372368
}
373369

374370
// MaxBatchDigests is maximum amount of digests to batch in upload and download operations.

go/pkg/diskcache/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ go_test(
2828
"//go/pkg/testutil",
2929
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
3030
"@com_github_google_go_cmp//cmp:go_default_library",
31-
"@com_github_pborman_uuid//:go_default_library",
31+
"@com_github_google_uuid//:uuid",
3232
"@org_golang_x_sync//errgroup:go_default_library",
3333
],
3434
)

go/pkg/diskcache/diskcache.go

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,27 @@ type DiskCache struct {
9797
mu sync.Mutex // protects the queue.
9898
store sync.Map // map of keys to qitems.
9999
queue *priorityQueue // keys by last accessed time.
100-
sizeBytes int64 // total size.
101100
ctx context.Context
102101
shutdown chan bool
102+
shutdownOnce sync.Once
103103
gcTick uint64
104104
gcReq chan uint64
105-
testGcTicks chan uint64
105+
gcDone chan bool
106+
stats *DiskCacheStats
107+
}
108+
109+
type DiskCacheStats struct {
110+
TotalSizeBytes int64
111+
TotalNumFiles int64
112+
NumFilesStored int64
113+
TotalStoredBytes int64
114+
NumFilesGCed int64
115+
TotalGCedSizeBytes int64
116+
NumCacheHits int64
117+
NumCacheMisses int64
118+
TotalCacheHitSizeBytes int64
119+
InitTime time.Duration
120+
TotalGCTime time.Duration
106121
}
107122

108123
func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
@@ -115,7 +130,11 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
115130
},
116131
gcReq: make(chan uint64, maxConcurrentRequests),
117132
shutdown: make(chan bool),
133+
gcDone: make(chan bool),
134+
stats: &DiskCacheStats{},
118135
}
136+
start := time.Now()
137+
defer func() { atomic.AddInt64((*int64)(&res.stats.InitTime), int64(time.Since(start))) }()
119138
heap.Init(res.queue)
120139
if err := os.MkdirAll(root, os.ModePerm); err != nil {
121140
return nil, err
@@ -158,7 +177,8 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache,
158177
return fmt.Errorf("error getting file size of %s: %v", path, err)
159178
}
160179
res.store.Store(k, it)
161-
atomic.AddInt64(&res.sizeBytes, size)
180+
atomic.AddInt64(&res.stats.TotalSizeBytes, size)
181+
atomic.AddInt64(&res.stats.TotalNumFiles, 1)
162182
res.mu.Lock()
163183
heap.Push(res.queue, it)
164184
res.mu.Unlock()
@@ -180,39 +200,47 @@ func (d *DiskCache) getItemSize(k key) (int64, error) {
180200
fname := d.getPath(k)
181201
info, err := os.Stat(fname)
182202
if err != nil {
183-
return 0, fmt.Errorf("Error getting info for %s: %v", fname, err)
203+
return 0, fmt.Errorf("error getting info for %s: %v", fname, err)
184204
}
185205
return info.Size(), nil
186206
}
187207

188-
// Releases resources and terminates the GC daemon. Should be the last call to the DiskCache.
208+
// Shutdown terminates the GC daemon and waits for it to complete. No further Store* calls to the DiskCache should be made.
189209
func (d *DiskCache) Shutdown() {
190-
d.shutdown <- true
210+
d.shutdownOnce.Do(func() {
211+
d.shutdown <- true
212+
<-d.gcDone
213+
log.Infof("DiskCacheStats: %+v", d.stats)
214+
})
191215
}
192216

193-
func (d *DiskCache) TotalSizeBytes() uint64 {
194-
return uint64(atomic.LoadInt64(&d.sizeBytes))
195-
}
196-
197-
// This function is defined in https://pkg.go.dev/strings#CutSuffix
198-
// It is copy/pasted here as a hack, because I failed to upgrade the *Reclient* repo to the latest Go 1.20.7.
199-
func CutSuffix(s, suffix string) (before string, found bool) {
200-
if !strings.HasSuffix(s, suffix) {
201-
return s, false
217+
func (d *DiskCache) GetStats() *DiskCacheStats {
218+
// Return a snapshot built from atomic loads, so callers never hold a pointer to the live counters.
219+
return &DiskCacheStats{
220+
TotalSizeBytes: atomic.LoadInt64(&d.stats.TotalSizeBytes),
221+
TotalNumFiles: atomic.LoadInt64(&d.stats.TotalNumFiles),
222+
NumFilesStored: atomic.LoadInt64(&d.stats.NumFilesStored),
223+
TotalStoredBytes: atomic.LoadInt64(&d.stats.TotalStoredBytes),
224+
NumFilesGCed: atomic.LoadInt64(&d.stats.NumFilesGCed),
225+
TotalGCedSizeBytes: atomic.LoadInt64(&d.stats.TotalGCedSizeBytes),
226+
NumCacheHits: atomic.LoadInt64(&d.stats.NumCacheHits),
227+
NumCacheMisses: atomic.LoadInt64(&d.stats.NumCacheMisses),
228+
TotalCacheHitSizeBytes: atomic.LoadInt64(&d.stats.TotalCacheHitSizeBytes),
229+
InitTime: time.Duration(atomic.LoadInt64((*int64)(&d.stats.InitTime))),
230+
TotalGCTime: time.Duration(atomic.LoadInt64((*int64)(&d.stats.TotalGCTime))),
202231
}
203-
return s[:len(s)-len(suffix)], true
204232
}
205233

206234
func (d *DiskCache) getKeyFromFileName(fname string) (key, error) {
207235
pair := strings.Split(fname, ".")
208236
if len(pair) != 2 {
209-
return key{}, fmt.Errorf("expected file name in the form [ac_]hash/size, got %s", fname)
237+
return key{}, fmt.Errorf("expected file name in the form hash[_ac].size, got %s", fname)
210238
}
211239
size, err := strconv.ParseInt(pair[1], 10, 64)
212240
if err != nil {
213241
return key{}, fmt.Errorf("invalid size in digest %s: %s", fname, err)
214242
}
215-
hash, isAc := CutSuffix(pair[0], "ac_")
243+
hash, isAc := strings.CutSuffix(pair[0], "_ac")
216244
dg, err := digest.New(hash, size)
217245
if err != nil {
218246
return key{}, fmt.Errorf("invalid digest from file name %s: %v", fname, err)
@@ -248,7 +276,10 @@ func (d *DiskCache) StoreCas(dg digest.Digest, path string) error {
248276
if err := copyFile(path, d.getPath(it.key), dg.Size); err != nil {
249277
return err
250278
}
251-
newSize := uint64(atomic.AddInt64(&d.sizeBytes, dg.Size))
279+
newSize := uint64(atomic.AddInt64(&d.stats.TotalSizeBytes, dg.Size))
280+
atomic.AddInt64(&d.stats.TotalNumFiles, 1)
281+
atomic.AddInt64(&d.stats.NumFilesStored, 1)
282+
atomic.AddInt64(&d.stats.TotalStoredBytes, dg.Size)
252283
if newSize > d.maxCapacityBytes {
253284
select {
254285
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
@@ -281,7 +312,10 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
281312
if err := os.WriteFile(d.getPath(it.key), bytes, 0644); err != nil {
282313
return err
283314
}
284-
newSize := uint64(atomic.AddInt64(&d.sizeBytes, int64(size)))
315+
newSize := uint64(atomic.AddInt64(&d.stats.TotalSizeBytes, int64(size)))
316+
atomic.AddInt64(&d.stats.TotalNumFiles, 1)
317+
atomic.AddInt64(&d.stats.NumFilesStored, 1)
318+
atomic.AddInt64(&d.stats.TotalStoredBytes, int64(size))
285319
if newSize > d.maxCapacityBytes {
286320
select {
287321
case d.gcReq <- atomic.AddUint64(&d.gcTick, 1):
@@ -292,15 +326,17 @@ func (d *DiskCache) StoreActionCache(dg digest.Digest, ar *repb.ActionResult) er
292326
}
293327

294328
func (d *DiskCache) gc() {
329+
defer func() { d.gcDone <- true }()
295330
for {
296331
select {
297332
case <-d.shutdown:
298333
return
299334
case <-d.ctx.Done():
300335
return
301-
case t := <-d.gcReq:
336+
case <-d.gcReq:
337+
start := time.Now()
302338
// Evict old entries until total size is below cap.
303-
for uint64(atomic.LoadInt64(&d.sizeBytes)) > d.maxCapacityBytes {
339+
for uint64(atomic.LoadInt64(&d.stats.TotalSizeBytes)) > d.maxCapacityBytes {
304340
d.mu.Lock()
305341
it := heap.Pop(d.queue).(*qitem)
306342
d.mu.Unlock()
@@ -309,7 +345,10 @@ func (d *DiskCache) gc() {
309345
log.Errorf("error getting item size for %v: %v", it.key, err)
310346
size = 0
311347
}
312-
atomic.AddInt64(&d.sizeBytes, -size)
348+
atomic.AddInt64(&d.stats.TotalSizeBytes, -size)
349+
atomic.AddInt64(&d.stats.TotalNumFiles, -1)
350+
atomic.AddInt64(&d.stats.TotalGCedSizeBytes, size)
351+
atomic.AddInt64(&d.stats.NumFilesGCed, 1)
313352
it.mu.Lock()
314353
// We only delete the files, and not the prefix directories, because the prefixes are not worth worrying about.
315354
if err := os.Remove(d.getPath(it.key)); err != nil {
@@ -318,12 +357,7 @@ func (d *DiskCache) gc() {
318357
d.store.Delete(it.key)
319358
it.mu.Unlock()
320359
}
321-
if d.testGcTicks != nil {
322-
select {
323-
case d.testGcTicks <- t:
324-
default:
325-
}
326-
}
360+
atomic.AddInt64((*int64)(&d.stats.TotalGCTime), int64(time.Since(start)))
327361
}
328362
}
329363
}
@@ -363,6 +397,7 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
363397
k := key{digest: dg, isCas: true}
364398
iUntyped, loaded := d.store.Load(k)
365399
if !loaded {
400+
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
366401
return false
367402
}
368403
it := iUntyped.(*qitem)
@@ -371,52 +406,61 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
371406
it.mu.RUnlock()
372407
if err != nil {
373408
// It is not possible to prevent a race with GC; hence, we return false on copy errors.
409+
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
374410
return false
375411
}
376412

377413
d.mu.Lock()
378414
d.queue.Bump(it)
379415
d.mu.Unlock()
416+
atomic.AddInt64(&d.stats.NumCacheHits, 1)
417+
atomic.AddInt64(&d.stats.TotalCacheHitSizeBytes, dg.Size)
380418
return true
381419
}
382420

383421
func (d *DiskCache) LoadActionCache(dg digest.Digest) (ar *repb.ActionResult, loaded bool) {
384422
k := key{digest: dg, isCas: false}
385423
iUntyped, loaded := d.store.Load(k)
386424
if !loaded {
425+
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
387426
return nil, false
388427
}
389428
it := iUntyped.(*qitem)
390429
it.mu.RLock()
391430
ar = &repb.ActionResult{}
392-
if err := d.loadActionResult(k, ar); err != nil {
431+
size, err := d.loadActionResult(k, ar)
432+
if err != nil {
393433
// It is not possible to prevent a race with GC; hence, we return false on load errors.
394434
it.mu.RUnlock()
435+
atomic.AddInt64(&d.stats.NumCacheMisses, 1)
395436
return nil, false
396437
}
397438
it.mu.RUnlock()
398439

399440
d.mu.Lock()
400441
d.queue.Bump(it)
401442
d.mu.Unlock()
443+
atomic.AddInt64(&d.stats.NumCacheHits, 1)
444+
atomic.AddInt64(&d.stats.TotalCacheHitSizeBytes, int64(size))
402445
return ar, true
403446
}
404447

405-
func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) error {
448+
func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) (int, error) {
406449
bytes, err := os.ReadFile(d.getPath(k))
407450
if err != nil {
408-
return err
451+
return 0, err
409452
}
453+
n := len(bytes)
410454
// Required sanity check: sometimes the read pretends to succeed, but doesn't, if
411455
// the file is being concurrently deleted. Empty ActionResult is advised against in
412456
// the RE-API: https://github.com/bazelbuild/remote-apis/blob/main/build/bazel/remote/execution/v2/remote_execution.proto#L1052
413-
if len(bytes) == 0 {
414-
return fmt.Errorf("read empty ActionResult for %v", k.digest)
457+
if n == 0 {
458+
return n, fmt.Errorf("read empty ActionResult for %v", k.digest)
415459
}
416460
if err := proto.Unmarshal(bytes, ar); err != nil {
417-
return fmt.Errorf("error unmarshalling %v as ActionResult: %v", bytes, err)
461+
return n, fmt.Errorf("error unmarshalling %v as ActionResult: %v", bytes, err)
418462
}
419-
return nil
463+
return n, nil
420464
}
421465

422466
func getLastAccessTime(path string) (time.Time, error) {

0 commit comments

Comments
 (0)