Skip to content

Commit b01e309

Browse files
committed
Cherrypicking Marc-Antoine's comments
1 parent 2677570 commit b01e309

File tree

6 files changed

+86
-65
lines changed

6 files changed

+86
-65
lines changed

go/pkg/diskcache/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ go_library(
1515
"@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
1616
"@com_github_golang_glog//:go_default_library",
1717
"@org_golang_google_protobuf//proto:go_default_library",
18+
"@org_golang_x_sync//errgroup:go_default_library",
1819
],
1920
)
2021

go/pkg/diskcache/diskcache.go

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ import (
1616
"time"
1717

1818
"github.com/bazelbuild/remote-apis-sdks/go/pkg/digest"
19+
"golang.org/x/sync/errgroup"
20+
"google.golang.org/protobuf/proto"
21+
1922
repb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
2023
log "github.com/golang/glog"
21-
"google.golang.org/protobuf/proto"
2224
)
2325

2426
type key struct {
@@ -103,7 +105,7 @@ type DiskCache struct {
103105
testGcTicks chan uint64
104106
}
105107

106-
func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
108+
func New(ctx context.Context, root string, maxCapacityBytes uint64) (*DiskCache, error) {
107109
res := &DiskCache{
108110
root: root,
109111
maxCapacityBytes: maxCapacityBytes,
@@ -115,44 +117,45 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
115117
shutdown: make(chan bool),
116118
}
117119
heap.Init(res.queue)
118-
_ = os.MkdirAll(root, os.ModePerm)
120+
if err := os.MkdirAll(root, os.ModePerm); err != nil {
121+
return nil, err
122+
}
119123
// We use Git's directory/file naming structure as inspiration:
120124
// https://git-scm.com/book/en/v2/Git-Internals-Git-Objects#:~:text=The%20subdirectory%20is%20named%20with%20the%20first%202%20characters%20of%20the%20SHA%2D1%2C%20and%20the%20filename%20is%20the%20remaining%2038%20characters.
121-
var wg sync.WaitGroup
122-
wg.Add(256)
125+
eg, eCtx := errgroup.WithContext(ctx)
123126
for i := 0; i < 256; i++ {
124127
prefixDir := filepath.Join(root, fmt.Sprintf("%02x", i))
125-
go func() {
126-
defer wg.Done()
127-
_ = os.MkdirAll(prefixDir, os.ModePerm)
128-
_ = filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
128+
eg.Go(func() error {
129+
if eCtx.Err() != nil {
130+
return eCtx.Err()
131+
}
132+
if err := os.MkdirAll(prefixDir, os.ModePerm); err != nil {
133+
return err
134+
}
135+
return filepath.WalkDir(prefixDir, func(path string, d fs.DirEntry, err error) error {
129136
// We log and continue on all errors, because cache read errors are not critical.
130137
if err != nil {
131-
log.Errorf("Error reading cache directory: %v", err)
132-
return nil
138+
return fmt.Errorf("error reading cache directory: %v", err)
133139
}
134140
if d.IsDir() {
135141
return nil
136142
}
137143
subdir := filepath.Base(filepath.Dir(path))
138144
k, err := res.getKeyFromFileName(subdir + d.Name())
139145
if err != nil {
140-
log.Errorf("Error parsing cached file name %s: %v", path, err)
141-
return nil
146+
return fmt.Errorf("error parsing cached file name %s: %v", path, err)
142147
}
143-
atime, err := GetLastAccessTime(path)
148+
atime, err := getLastAccessTime(path)
144149
if err != nil {
145-
log.Errorf("Error getting last accessed time of %s: %v", path, err)
146-
return nil
150+
return fmt.Errorf("error getting last accessed time of %s: %v", path, err)
147151
}
148152
it := &qitem{
149153
key: k,
150154
lat: atime,
151155
}
152156
size, err := res.getItemSize(k)
153157
if err != nil {
154-
log.Errorf("Error getting file size of %s: %v", path, err)
155-
return nil
158+
return fmt.Errorf("error getting file size of %s: %v", path, err)
156159
}
157160
res.store.Store(k, it)
158161
atomic.AddInt64(&res.sizeBytes, size)
@@ -161,11 +164,13 @@ func New(ctx context.Context, root string, maxCapacityBytes uint64) *DiskCache {
161164
res.mu.Unlock()
162165
return nil
163166
})
164-
}()
167+
})
168+
}
169+
if err := eg.Wait(); err != nil {
170+
return nil, err
165171
}
166-
wg.Wait()
167172
go res.gc()
168-
return res
173+
return res, nil
169174
}
170175

171176
func (d *DiskCache) getItemSize(k key) (int64, error) {
@@ -342,18 +347,13 @@ func copyFile(src, dst string, size int64) error {
342347
return err
343348
}
344349
defer out.Close()
345-
_, err = io.Copy(out, in)
350+
n, err := io.Copy(out, in)
346351
if err != nil {
347352
return err
348353
}
349-
// Required sanity check: sometimes the copy pretends to succeed, but doesn't, if
350-
// the file is being concurrently deleted.
351-
dstInfo, err := os.Stat(dst)
352-
if err != nil {
353-
return err
354-
}
355-
if dstInfo.Size() != size {
356-
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, dstInfo.Size())
354+
// Required sanity check: if the file is being concurrently deleted, we may not always copy everything.
355+
if n != size {
356+
return fmt.Errorf("copy of %s to %s failed: src/dst size mismatch: wanted %d, got %d", src, dst, size, n)
357357
}
358358
return nil
359359
}
@@ -367,12 +367,12 @@ func (d *DiskCache) LoadCas(dg digest.Digest, path string) bool {
367367
}
368368
it := iUntyped.(*qitem)
369369
it.mu.RLock()
370-
if err := copyFile(d.getPath(k), path, dg.Size); err != nil {
370+
err := copyFile(d.getPath(k), path, dg.Size)
371+
it.mu.RUnlock()
372+
if err != nil {
371373
// It is not possible to prevent a race with GC; hence, we return false on copy errors.
372-
it.mu.RUnlock()
373374
return false
374375
}
375-
it.mu.RUnlock()
376376

377377
d.mu.Lock()
378378
d.queue.Bump(it)
@@ -418,3 +418,11 @@ func (d *DiskCache) loadActionResult(k key, ar *repb.ActionResult) error {
418418
}
419419
return nil
420420
}
421+
422+
func getLastAccessTime(path string) (time.Time, error) {
423+
info, err := os.Stat(path)
424+
if err != nil {
425+
return time.Time{}, err
426+
}
427+
return FileInfoToAccessTime(info), nil
428+
}

go/pkg/diskcache/diskcache_test.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ func TestStoreLoadCasPerm(t *testing.T) {
4545
for _, tc := range tests {
4646
t.Run(tc.name, func(t *testing.T) {
4747
root := t.TempDir()
48-
d := New(context.Background(), filepath.Join(root, "cache"), 20)
48+
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
49+
if err != nil {
50+
t.Errorf("New: %v", err)
51+
}
4952
defer d.Shutdown()
5053
fname, _ := testutil.CreateFile(t, tc.executable, "12345")
5154
srcInfo, err := os.Stat(fname)
@@ -83,7 +86,10 @@ func TestStoreLoadCasPerm(t *testing.T) {
8386

8487
func TestLoadCasNotFound(t *testing.T) {
8588
root := t.TempDir()
86-
d := New(context.Background(), filepath.Join(root, "cache"), 20)
89+
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
90+
if err != nil {
91+
t.Errorf("New: %v", err)
92+
}
8793
defer d.Shutdown()
8894
newName := filepath.Join(root, "new")
8995
dg := digest.NewFromBlob([]byte("bla"))
@@ -94,7 +100,10 @@ func TestLoadCasNotFound(t *testing.T) {
94100

95101
func TestStoreLoadActionCache(t *testing.T) {
96102
root := t.TempDir()
97-
d := New(context.Background(), filepath.Join(root, "cache"), 100)
103+
d, err := New(context.Background(), filepath.Join(root, "cache"), 100)
104+
if err != nil {
105+
t.Errorf("New: %v", err)
106+
}
98107
defer d.Shutdown()
99108
ar := &repb.ActionResult{
100109
OutputFiles: []*repb.OutputFile{
@@ -116,7 +125,10 @@ func TestStoreLoadActionCache(t *testing.T) {
116125

117126
func TestGcOldestCas(t *testing.T) {
118127
root := t.TempDir()
119-
d := New(context.Background(), filepath.Join(root, "cache"), 20)
128+
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
129+
if err != nil {
130+
t.Errorf("New: %v", err)
131+
}
120132
defer d.Shutdown()
121133
d.testGcTicks = make(chan uint64, 1)
122134
for i := 0; i < 5; i++ {
@@ -154,7 +166,10 @@ func TestGcOldestActionCache(t *testing.T) {
154166
}
155167
size := len(bytes)
156168
root := t.TempDir()
157-
d := New(context.Background(), filepath.Join(root, "cache"), uint64(size)*4)
169+
d, err := New(context.Background(), filepath.Join(root, "cache"), uint64(size)*4)
170+
if err != nil {
171+
t.Errorf("New: %v", err)
172+
}
158173
defer d.Shutdown()
159174
d.testGcTicks = make(chan uint64, 1)
160175
for i := 0; i < 5; i++ {
@@ -192,11 +207,11 @@ func TestGcOldestActionCache(t *testing.T) {
192207
func isSystemLastAccessTimeAccurate(t *testing.T) bool {
193208
t.Helper()
194209
fname, _ := testutil.CreateFile(t, false, "foo")
195-
lat, _ := GetLastAccessTime(fname)
210+
lat, _ := getLastAccessTime(fname)
196211
if _, err := os.ReadFile(fname); err != nil {
197212
t.Fatalf("%v", err)
198213
}
199-
newLat, _ := GetLastAccessTime(fname)
214+
newLat, _ := getLastAccessTime(fname)
200215
return lat.Before(newLat)
201216
}
202217

@@ -209,7 +224,10 @@ func TestInitFromExistingCas(t *testing.T) {
209224
return
210225
}
211226
root := t.TempDir()
212-
d := New(context.Background(), filepath.Join(root, "cache"), 20)
227+
d, err := New(context.Background(), filepath.Join(root, "cache"), 20)
228+
if err != nil {
229+
t.Errorf("New: %v", err)
230+
}
213231
for i := 0; i < 4; i++ {
214232
fname, _ := testutil.CreateFile(t, false, fmt.Sprintf("aaa %d", i))
215233
dg, err := digest.NewFromFile(fname)
@@ -228,7 +246,10 @@ func TestInitFromExistingCas(t *testing.T) {
228246
d.Shutdown()
229247

230248
// Re-initialize from existing files.
231-
d = New(context.Background(), filepath.Join(root, "cache"), 20)
249+
d, err = New(context.Background(), filepath.Join(root, "cache"), 20)
250+
if err != nil {
251+
t.Errorf("New: %v", err)
252+
}
232253
defer d.Shutdown()
233254
d.testGcTicks = make(chan uint64, 1)
234255

@@ -238,7 +259,7 @@ func TestInitFromExistingCas(t *testing.T) {
238259
t.Errorf("expected %s to be cached", dg)
239260
}
240261
fname, _ := testutil.CreateFile(t, false, "aaa 4")
241-
dg, err := digest.NewFromFile(fname)
262+
dg, err = digest.NewFromFile(fname)
242263
if err != nil {
243264
t.Fatalf("digest.NewFromFile failed: %v", err)
244265
}
@@ -267,7 +288,10 @@ func TestThreadSafetyCas(t *testing.T) {
267288
nFiles := 10
268289
attempts := 5000
269290
// All blobs are size 5 exactly. We will have half the byte capacity we need.
270-
d := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
291+
d, err := New(context.Background(), filepath.Join(root, "cache"), uint64(nFiles*5)/2)
292+
if err != nil {
293+
t.Errorf("New: %v", err)
294+
}
271295
d.testGcTicks = make(chan uint64, attempts)
272296
defer d.Shutdown()
273297
var files []string

go/pkg/diskcache/sys_darwin.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,11 @@
33
package diskcache
44

55
import (
6-
"os"
6+
"io/fs"
77
"syscall"
88
"time"
99
)
1010

11-
func GetLastAccessTime(path string) (time.Time, error) {
12-
info, err := os.Stat(path)
13-
if err != nil {
14-
return time.Time{}, err
15-
}
16-
return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix()), nil
11+
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
12+
return time.Unix(info.Sys().(*syscall.Stat_t).Atimespec.Unix())
1713
}

go/pkg/diskcache/sys_linux.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,11 @@
22
package diskcache
33

44
import (
5-
"os"
5+
"io/fs"
66
"syscall"
77
"time"
88
)
99

10-
func GetLastAccessTime(path string) (time.Time, error) {
11-
info, err := os.Stat(path)
12-
if err != nil {
13-
return time.Time{}, err
14-
}
15-
return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix()), nil
10+
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
11+
return time.Unix(info.Sys().(*syscall.Stat_t).Atim.Unix())
1612
}

go/pkg/diskcache/sys_windows.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,13 @@
22
package diskcache
33

44
import (
5-
"os"
5+
"io/fs"
66
"syscall"
77
"time"
88
)
99

1010
// This will return correct values only if `fsutil behavior set disablelastaccess 0` is set.
1111
// Tracking of last access time is disabled by default on Windows.
12-
func GetLastAccessTime(path string) (time.Time, error) {
13-
info, err := os.Stat(path)
14-
if err != nil {
15-
return time.Time{}, err
16-
}
17-
return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds()), nil
12+
func FileInfoToAccessTime(info fs.FileInfo) time.Time {
13+
return time.Unix(0, info.Sys().(*syscall.Win32FileAttributeData).LastAccessTime.Nanoseconds())
1814
}

0 commit comments

Comments
 (0)