Skip to content

Commit 96d358d

Browse files
Add filepool files with backing content
Filepool files with backing content allow the use of read-only data sources as initial content for a filepool-backed file. This is implemented with efficient copy-on-write semantics, which copy a sector from the backing store when it is written to.
1 parent 0de5269 commit 96d358d

24 files changed

+897
-76
lines changed

cmd/bb_noop_worker/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ go_library(
99
deps = [
1010
"//pkg/blobstore",
1111
"//pkg/builder",
12-
"//pkg/filesystem",
1312
"//pkg/filesystem/filepool",
1413
"//pkg/proto/configuration/bb_noop_worker",
1514
"//pkg/proto/remoteworker",

cmd/bb_worker/BUILD.bazel

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ go_library(
1616
"//pkg/cas",
1717
"//pkg/cleaner",
1818
"//pkg/clock",
19-
"//pkg/filesystem",
2019
"//pkg/filesystem/filepool",
2120
"//pkg/filesystem/virtual",
2221
"//pkg/filesystem/virtual/configuration",

internal/mock/BUILD.bazel

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,8 @@ gomock(
184184
name = "filesystem_filepool",
185185
out = "filesystem_filepool.go",
186186
interfaces = [
187-
"FilePool"
187+
"FilePool",
188+
"ReaderAt",
188189
],
189190
library = "//pkg/filesystem/filepool",
190191
mockgen_model_library = "@org_uber_go_mock//mockgen/model",

pkg/builder/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ go_library(
3636
"//pkg/cas",
3737
"//pkg/cleaner",
3838
"//pkg/clock",
39-
"//pkg/filesystem",
4039
"//pkg/filesystem/access",
4140
"//pkg/filesystem/filepool",
4241
"//pkg/filesystem/virtual",
@@ -105,7 +104,6 @@ go_test(
105104
"//internal/mock",
106105
"//pkg/cleaner",
107106
"//pkg/clock",
108-
"//pkg/filesystem",
109107
"//pkg/filesystem/access",
110108
"//pkg/filesystem/filepool",
111109
"//pkg/proto/cas",

pkg/builder/file_pool_stats_build_executor.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,8 @@ type statsCollectingFilePool struct {
5757
totalFiles uint64
5858
}
5959

60-
func (fp *statsCollectingFilePool) NewFile() (filesystem.FileReadWriter, error) {
61-
f, err := fp.base.NewFile()
60+
func (fp *statsCollectingFilePool) NewFile(sparseReaderAt filepool.SparseReaderAt, size uint64) (filesystem.FileReadWriter, error) {
61+
f, err := fp.base.NewFile(sparseReaderAt, size)
6262
if err != nil {
6363
return nil, err
6464
}

pkg/builder/file_pool_stats_build_executor_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ func TestFilePoolStatsBuildExecutorExample(t *testing.T) {
4444
digest.MustNewFunction("hello", remoteexecution.DigestFunction_MD5),
4545
request,
4646
gomock.Any()).DoAndReturn(func(ctx context.Context, filePool filepool.FilePool, monitor access.UnreadDirectoryMonitor, digestFunction digest.Function, request *remoteworker.DesiredState_Executing, executionStateUpdates chan<- *remoteworker.CurrentState_Executing) *remoteexecution.ExecuteResponse {
47-
f, err := filePool.NewFile()
47+
f, err := filePool.NewFile(nil, 0)
4848
require.NoError(t, err)
4949
require.NoError(t, f.Truncate(5))
5050
require.NoError(t, f.Close())
5151

52-
f, err = filePool.NewFile()
52+
f, err = filePool.NewFile(nil, 0)
5353
require.NoError(t, err)
5454
n, err := f.WriteAt([]byte("Hello"), 100)
5555
require.Equal(t, 5, n)
@@ -75,10 +75,10 @@ func TestFilePoolStatsBuildExecutorExample(t *testing.T) {
7575
file1 := mock.NewMockFileReadWriter(ctrl)
7676
file2 := mock.NewMockFileReadWriter(ctrl)
7777

78-
filePool.EXPECT().NewFile().Return(file1, nil)
78+
filePool.EXPECT().NewFile(nil, uint64(0)).Return(file1, nil)
7979
file1.EXPECT().Truncate(int64(5)).Return(nil)
8080
file1.EXPECT().Close().Return(nil)
81-
filePool.EXPECT().NewFile().Return(file2, nil)
81+
filePool.EXPECT().NewFile(nil, uint64(0)).Return(file2, nil)
8282
file2.EXPECT().WriteAt([]byte("Hello"), int64(100)).Return(5, nil)
8383
file2.EXPECT().ReadAt(gomock.Any(), int64(98)).DoAndReturn(func(p []byte, offset int64) (int, error) {
8484
copy(p, []byte("\x00\x00Hello\x00\x00\x00"))

pkg/filesystem/BUILD.bazel

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@ go_library(
1010
importpath = "github.com/buildbarn/bb-remote-execution/pkg/filesystem",
1111
visibility = ["//visibility:public"],
1212
deps = [
13-
"//pkg/proto/configuration/filesystem",
14-
"@com_github_buildbarn_bb_storage//pkg/blockdevice",
1513
"@com_github_buildbarn_bb_storage//pkg/filesystem",
1614
"@com_github_buildbarn_bb_storage//pkg/filesystem/path",
1715
"@com_github_buildbarn_bb_storage//pkg/util",
18-
"@com_github_prometheus_client_golang//prometheus",
1916
"@org_golang_google_grpc//codes",
2017
"@org_golang_google_grpc//status",
2118
],

pkg/filesystem/filepool/BUILD.bazel

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ go_library(
99
"file_pool.go",
1010
"metrics_file_pool.go",
1111
"quota_enforcing_file_pool.go",
12+
"simple_sparse_reader_at.go",
13+
"sparse_reader_at.go",
14+
"truncatable_sparse_reader_at.go",
1215
],
1316
importpath = "github.com/buildbarn/bb-remote-execution/pkg/filesystem/filepool",
1417
visibility = ["//visibility:public"],
@@ -17,7 +20,6 @@ go_library(
1720
"//pkg/proto/configuration/filesystem",
1821
"@com_github_buildbarn_bb_storage//pkg/blockdevice",
1922
"@com_github_buildbarn_bb_storage//pkg/filesystem",
20-
"@com_github_buildbarn_bb_storage//pkg/filesystem/path",
2123
"@com_github_buildbarn_bb_storage//pkg/util",
2224
"@com_github_prometheus_client_golang//prometheus",
2325
"@org_golang_google_grpc//codes",
@@ -31,13 +33,13 @@ go_test(
3133
"block_device_backed_file_pool_test.go",
3234
"empty_file_pool_test.go",
3335
"quota_enforcing_file_pool_test.go",
36+
"simple_sparse_reader_at_test.go",
37+
"truncatable_sparse_reader_at_test.go",
3438
],
3539
deps = [
3640
":filepool",
3741
"//internal/mock",
38-
"//pkg/filesystem",
3942
"@com_github_buildbarn_bb_storage//pkg/filesystem",
40-
"@com_github_buildbarn_bb_storage//pkg/filesystem/path",
4143
"@com_github_buildbarn_bb_storage//pkg/testutil",
4244
"@com_github_stretchr_testify//require",
4345
"@org_golang_google_grpc//codes",

pkg/filesystem/filepool/block_device_backed_file_pool.go

Lines changed: 125 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package filepool
33
import (
44
"fmt"
55
"io"
6+
"strings"
67

78
re_filesystem "github.com/buildbarn/bb-remote-execution/pkg/filesystem"
89
"github.com/buildbarn/bb-storage/pkg/blockdevice"
@@ -33,16 +34,31 @@ func NewBlockDeviceBackedFilePool(blockDevice blockdevice.BlockDevice, sectorAll
3334
}
3435
}
3536

36-
func (fp *blockDeviceBackedFilePool) NewFile() (filesystem.FileReadWriter, error) {
37-
return &blockDeviceBackedFile{
38-
fp: fp,
39-
}, nil
37+
func (fp *blockDeviceBackedFilePool) NewFile(sparseReaderAt SparseReaderAt, initialSize uint64) (filesystem.FileReadWriter, error) {
38+
var err error
39+
if sparseReaderAt == nil {
40+
if initialSize != 0 {
41+
return nil, status.Errorf(codes.InvalidArgument, "initial size must be zero when sparseReaderAt is nil")
42+
}
43+
if sparseReaderAt, err = NewSimpleSparseReaderAt(strings.NewReader(""), nil, 0); err != nil {
44+
return nil, status.Errorf(codes.Internal, "failed to create empty SparseReaderAt: %v", err)
45+
}
46+
}
47+
fr := &blockDeviceBackedFile{
48+
fp: fp,
49+
underlying: NewTruncatableSparseReaderAt(sparseReaderAt, int64(initialSize)),
50+
}
51+
if err = fr.Truncate(int64(initialSize)); err != nil {
52+
return nil, status.Errorf(codes.Internal, "failed to truncate file to initial size: %v", err)
53+
}
54+
return fr, nil
4055
}
4156

4257
type blockDeviceBackedFile struct {
43-
fp *blockDeviceBackedFilePool
44-
sizeBytes uint64
45-
sectors []uint32
58+
fp *blockDeviceBackedFilePool
59+
underlying TruncatableSparseReaderAt
60+
sizeBytes uint64
61+
sectors []uint32
4662
}
4763

4864
func (f *blockDeviceBackedFile) Close() error {
@@ -115,7 +131,7 @@ func (f *blockDeviceBackedFile) limitBufferToSectorBoundary(p []byte, sectorCoun
115131
return p
116132
}
117133

118-
func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesystem.RegionType) (int64, error) {
134+
func (f *blockDeviceBackedFile) getNextRegionOffsetForOverlay(off int64, regionType filesystem.RegionType) (int64, error) {
119135
// Short circuit calls that are out of bounds.
120136
if off < 0 {
121137
return 0, status.Errorf(codes.InvalidArgument, "Negative seek offset: %d", off)
@@ -165,30 +181,95 @@ func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesy
165181
}
166182
}
167183

184+
func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesystem.RegionType) (int64, error) {
185+
// Short circuit calls that are out of bounds.
186+
if off < 0 {
187+
return 0, status.Errorf(codes.InvalidArgument, "Negative seek offset: %d", off)
188+
}
189+
if uint64(off) >= f.sizeBytes {
190+
return 0, io.EOF
191+
}
192+
193+
// Data is represented by the existence of a written sector in
194+
// either the overlay or the underlying file. Holes are represented
195+
// by the absence of a written sector in the overlay _and_ a hole in
196+
// the underlying file.
197+
//
198+
// For data this is the lowest valued offset of the two candidates.
199+
// For holes it's the first position that both sources agree
200+
// is a hole.
201+
switch regionType {
202+
case filesystem.Data:
203+
data1, err := f.underlying.GetNextRegionOffset(off, filesystem.Data)
204+
if err == io.EOF {
205+
// No more data in the underlying file. Return the result
206+
// from the overlay.
207+
return f.getNextRegionOffsetForOverlay(off, filesystem.Data)
208+
}
209+
if err != nil {
210+
return data1, status.Errorf(codes.Internal, "unexpected error while searching for data in underlying file: %v", err)
211+
}
212+
data2, err := f.getNextRegionOffsetForOverlay(off, filesystem.Data)
213+
if err == io.EOF {
214+
// No more data in the overlay, return the data from the
215+
// underlying file.
216+
return data1, nil
217+
}
218+
if err != nil {
219+
return data2, status.Errorf(codes.Internal, "unexpected error while searching for data in underlying file: %v", err)
220+
}
221+
if data1 < data2 {
222+
return data1, nil
223+
}
224+
return data2, nil
225+
case filesystem.Hole:
226+
for {
227+
// Since we have already ruled out that we are past the EOF
228+
// boundary, no calls to GetNextRegionOffset should be
229+
// capable of returning io.EOF.
230+
hole1, err := f.underlying.GetNextRegionOffset(off, filesystem.Hole)
231+
if err != nil {
232+
return hole1, status.Errorf(codes.Internal, "unexpected error while searching for hole in underlying file: %v", err)
233+
}
234+
hole2, err := f.getNextRegionOffsetForOverlay(off, filesystem.Hole)
235+
if err != nil {
236+
return hole2, status.Errorf(codes.Internal, "unexpected error while searching for hole in overlay file: %v", err)
237+
}
238+
if hole1 == hole2 {
239+
// Both sources agree that it's a hole.
240+
return hole1, nil
241+
}
242+
if hole1 == int64(f.sizeBytes) || hole2 == int64(f.sizeBytes) {
243+
// The only possible hole is the implicit hole at the
244+
// end of the file.
245+
return int64(f.sizeBytes), nil
246+
}
247+
// Continue searching at the next possible offset.
248+
off = max(hole1, hole2)
249+
}
250+
default:
251+
panic("Unknown region type")
252+
}
253+
}
254+
168255
// readFromSectors performs a single read against the block device. It
169256
// attempts to read as much data into the output buffer as is possible
170257
// in a single read operation. If the file is fragmented, multiple reads
171258
// are necessary, requiring this function to be called repeatedly.
172259
func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
173260
if sectorIndex >= len(f.sectors) {
174261
// Attempted to read from a hole located at the
175-
// end of the file. Fill up all of the remaining
176-
// space with zero bytes.
177-
for i := 0; i < len(p); i++ {
178-
p[i] = 0
179-
}
180-
return len(p), nil
262+
// end of the file. Delegate to the underlying file.
263+
offset := f.fp.sectorSizeBytes*sectorIndex + offsetWithinSector
264+
return f.underlying.ReadAt(p, int64(offset))
181265
}
182266

183267
sector, sectorsToRead := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
184268
p = f.limitBufferToSectorBoundary(p, sectorsToRead, offsetWithinSector)
185269
if sector == 0 {
186270
// Attempted to read from a sparse region of the file.
187-
// Fill in zero bytes.
188-
for i := 0; i < len(p); i++ {
189-
p[i] = 0
190-
}
191-
return len(p), nil
271+
offset := f.fp.sectorSizeBytes*sectorIndex + offsetWithinSector
272+
return f.underlying.ReadAt(p, int64(offset))
192273
}
193274

194275
// Attempted to read from a region of the file that contains
@@ -267,6 +348,9 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
267348
if size < 0 {
268349
return status.Errorf(codes.InvalidArgument, "Negative truncation size: %d", size)
269350
}
351+
if err := f.underlying.Truncate(size); err != nil {
352+
return status.Errorf(codes.Internal, "truncating the underlying file failed: %v", err)
353+
}
270354

271355
sectorIndex := int(size / int64(f.fp.sectorSizeBytes))
272356
offsetWithinSector := int(size % int64(f.fp.sectorSizeBytes))
@@ -299,7 +383,7 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
299383
// writeToNewSectors is used to write data into new sectors. This
300384
// function is called when holes in a sparse file are filled up or when
301385
// data is appended to the end of a file.
302-
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector int) (int, uint32, int, error) {
386+
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, fromSector, offsetWithinSector int) (int, uint32, int, error) {
303387
// Allocate space to store the data.
304388
sectorsToAllocate := int((uint64(offsetWithinSector) + uint64(len(p)) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
305389
firstSector, sectorsAllocated, err := f.fp.sectorAllocator.AllocateContiguous(sectorsToAllocate)
@@ -314,10 +398,15 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector i
314398
nWritten := len(p)
315399

316400
// Write the first sector separately when we need to introduce
317-
// leading zero padding.
401+
// leading padding read from the underlying file.
318402
sector := firstSector
319403
if offsetWithinSector > 0 {
320404
buf := make([]byte, f.fp.sectorSizeBytes)
405+
logicalOffset := fromSector * f.fp.sectorSizeBytes
406+
if _, err := f.underlying.ReadAt(buf[:offsetWithinSector], int64(logicalOffset)); err != nil {
407+
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
408+
return 0, 0, 0, err
409+
}
321410
nWritten := copy(buf[offsetWithinSector:], p)
322411
if _, err := f.fp.blockDevice.WriteAt(buf, f.toDeviceOffset(sector, 0)); err != nil {
323412
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
@@ -340,9 +429,14 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector i
340429
}
341430

342431
// Write the last sector separately when we need to introduce
343-
// trailing zero padding.
432+
// trailing padding read from the underlying file.
344433
if len(p) > 0 {
345434
buf := make([]byte, f.fp.sectorSizeBytes)
435+
logicalOffset := uint32(len(p)) + (sector-firstSector)*uint32(f.fp.sectorSizeBytes)
436+
if _, err := f.underlying.ReadAt(buf[len(p):], int64(logicalOffset)); err != nil {
437+
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
438+
return 0, 0, 0, err
439+
}
346440
copy(buf, p)
347441
if _, err := f.fp.blockDevice.WriteAt(buf, f.toDeviceOffset(sector, 0)); err != nil {
348442
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
@@ -375,7 +469,7 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
375469
// Attempted to write past the end-of-file or within a
376470
// hole located at the end of a sparse file. Allocate
377471
// space and grow the file.
378-
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, offsetWithinSector)
472+
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
379473
if err != nil {
380474
return 0, err
381475
}
@@ -389,7 +483,7 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
389483
if sector == 0 {
390484
// Attempted to write to a hole within a sparse file.
391485
// Allocate space and insert sectors into the file.
392-
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, offsetWithinSector)
486+
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
393487
if err != nil {
394488
return 0, err
395489
}
@@ -409,6 +503,13 @@ func (f *blockDeviceBackedFile) WriteAt(p []byte, off int64) (int, error) {
409503
if len(p) == 0 {
410504
return 0, nil
411505
}
506+
// Truncate the file to a larger size if needed to accommodate the
507+
// write.
508+
if f.sizeBytes < uint64(off)+uint64(len(p)) {
509+
if err := f.Truncate(off + int64(len(p))); err != nil {
510+
return 0, err
511+
}
512+
}
412513

413514
// As the file may be stored on disk non-contiguously or may be
414515
// a sparse file with holes, the write may need to be decomposed

0 commit comments

Comments
 (0)