Skip to content

Commit 33bdd88

Browse files
Add filepool files with backing content
Filepool files with backing content allows the use of read only datasources as initial content for a filepool backed file. It is implemented with efficient copy on write semantics which will copy any sector from the backing store upon writes.
1 parent 0de5269 commit 33bdd88

10 files changed

+303
-18
lines changed

pkg/builder/file_pool_stats_build_executor.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/buildbarn/bb-storage/pkg/filesystem"
1414
"github.com/buildbarn/bb-storage/pkg/util"
1515

16+
"google.golang.org/grpc/codes"
17+
"google.golang.org/grpc/status"
1618
"google.golang.org/protobuf/types/known/anypb"
1719
)
1820

@@ -77,6 +79,10 @@ func (fp *statsCollectingFilePool) NewFile() (filesystem.FileReadWriter, error)
7779
}, nil
7880
}
7981

82+
func (fp *statsCollectingFilePool) NewFileWithReadLayer(readLayer filepool.ReadLayer, sizeBytes uint64) (filesystem.FileReadWriter, error) {
83+
return nil, status.Error(codes.Internal, "Not implemented")
84+
}
85+
8086
// statsCollectingFileReadWriter is a decorator for
8187
// filesystem.FileReadWriter that measures the number of file operations
8288
// performed.

pkg/filesystem/filepool/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ go_library(
99
"file_pool.go",
1010
"metrics_file_pool.go",
1111
"quota_enforcing_file_pool.go",
12+
"read_layer.go",
1213
],
1314
importpath = "github.com/buildbarn/bb-remote-execution/pkg/filesystem/filepool",
1415
visibility = ["//visibility:public"],
@@ -31,6 +32,7 @@ go_test(
3132
"block_device_backed_file_pool_test.go",
3233
"empty_file_pool_test.go",
3334
"quota_enforcing_file_pool_test.go",
35+
"read_layer_test.go",
3436
],
3537
deps = [
3638
":filepool",

pkg/filesystem/filepool/block_device_backed_file_pool.go

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,35 @@ func NewBlockDeviceBackedFilePool(blockDevice blockdevice.BlockDevice, sectorAll
3333
}
3434
}
3535

36+
type zeroReaderAt struct{}
37+
38+
func (zeroReaderAt) ReadAt(p []byte, off int64) (int, error) {
39+
for i := range p {
40+
p[i] = 0
41+
}
42+
return len(p), nil
43+
}
44+
3645
func (fp *blockDeviceBackedFilePool) NewFile() (filesystem.FileReadWriter, error) {
37-
return &blockDeviceBackedFile{
46+
readLayer := NewReadLayer(&zeroReaderAt{}, 0)
47+
return fp.NewFileWithReadLayer(readLayer, 0)
48+
}
49+
50+
func (fp *blockDeviceBackedFilePool) NewFileWithReadLayer(readLayer ReadLayer, sizeBytes uint64) (filesystem.FileReadWriter, error) {
51+
file := &blockDeviceBackedFile{
3852
fp: fp,
39-
}, nil
53+
rl: readLayer,
54+
}
55+
err := file.Truncate(int64(sizeBytes))
56+
if err != nil {
57+
return nil, err
58+
}
59+
return file, nil
4060
}
4161

4262
type blockDeviceBackedFile struct {
4363
fp *blockDeviceBackedFilePool
64+
rl ReadLayer
4465
sizeBytes uint64
4566
sectors []uint32
4667
}
@@ -172,23 +193,17 @@ func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesy
172193
func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
173194
if sectorIndex >= len(f.sectors) {
174195
// Attempted to read from a hole located at the
175-
// end of the file. Fill up all of the remaining
176-
// space with zero bytes.
177-
for i := 0; i < len(p); i++ {
178-
p[i] = 0
179-
}
180-
return len(p), nil
196+
// end of the file. Delegate to ReadLayer.
197+
offset := f.fp.sectorSizeBytes*sectorIndex + offsetWithinSector
198+
return f.rl.ReadAt(p, int64(offset))
181199
}
182200

183201
sector, sectorsToRead := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
184202
p = f.limitBufferToSectorBoundary(p, sectorsToRead, offsetWithinSector)
185203
if sector == 0 {
186204
// Attempted to read from a sparse region of the file.
187-
// Fill in zero bytes.
188-
for i := 0; i < len(p); i++ {
189-
p[i] = 0
190-
}
191-
return len(p), nil
205+
offset := f.fp.sectorSizeBytes*sectorIndex + offsetWithinSector
206+
return f.rl.ReadAt(p, int64(offset))
192207
}
193208

194209
// Attempted to read from a region of the file that contains
@@ -267,6 +282,9 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
267282
if size < 0 {
268283
return status.Errorf(codes.InvalidArgument, "Negative truncation size: %d", size)
269284
}
285+
if err := f.rl.Truncate(size); err != nil {
286+
return fmt.Errorf("read layer truncate failed: %w", err)
287+
}
270288

271289
sectorIndex := int(size / int64(f.fp.sectorSizeBytes))
272290
offsetWithinSector := int(size % int64(f.fp.sectorSizeBytes))
@@ -299,7 +317,7 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
299317
// writeToNewSectors is used to write data into new sectors. This
300318
// function is called when holes in a sparse file are filled up or when
301319
// data is appended to the end of a file.
302-
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector int) (int, uint32, int, error) {
320+
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, fromSector int, offsetWithinSector int) (int, uint32, int, error) {
303321
// Allocate space to store the data.
304322
sectorsToAllocate := int((uint64(offsetWithinSector) + uint64(len(p)) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
305323
firstSector, sectorsAllocated, err := f.fp.sectorAllocator.AllocateContiguous(sectorsToAllocate)
@@ -314,10 +332,15 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector i
314332
nWritten := len(p)
315333

316334
// Write the first sector separately when we need to introduce
317-
// leading zero padding.
335+
// leading read layer padding.
318336
sector := firstSector
319337
if offsetWithinSector > 0 {
320338
buf := make([]byte, f.fp.sectorSizeBytes)
339+
logicalOffset := fromSector * f.fp.sectorSizeBytes
340+
if _, err := f.rl.ReadAt(buf[:offsetWithinSector], int64(logicalOffset)); err != nil {
341+
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
342+
return 0, 0, 0, err
343+
}
321344
nWritten := copy(buf[offsetWithinSector:], p)
322345
if _, err := f.fp.blockDevice.WriteAt(buf, f.toDeviceOffset(sector, 0)); err != nil {
323346
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
@@ -340,9 +363,14 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, offsetWithinSector i
340363
}
341364

342365
// Write the last sector separately when we need to introduce
343-
// trailing zero padding.
366+
// trailing read layer padding.
344367
if len(p) > 0 {
345368
buf := make([]byte, f.fp.sectorSizeBytes)
369+
logicalOffset := uint32(len(p)) + (sector-firstSector)*uint32(f.fp.sectorSizeBytes)
370+
if _, err := f.rl.ReadAt(buf[len(p):], int64(logicalOffset)); err != nil {
371+
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
372+
return 0, 0, 0, err
373+
}
346374
copy(buf, p)
347375
if _, err := f.fp.blockDevice.WriteAt(buf, f.toDeviceOffset(sector, 0)); err != nil {
348376
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
@@ -375,7 +403,7 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
375403
// Attempted to write past the end-of-file or within a
376404
// hole located at the end of a sparse file. Allocate
377405
// space and grow the file.
378-
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, offsetWithinSector)
406+
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
379407
if err != nil {
380408
return 0, err
381409
}
@@ -389,7 +417,7 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
389417
if sector == 0 {
390418
// Attempted to write to a hole within a sparse file.
391419
// Allocate space and insert sectors into the file.
392-
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, offsetWithinSector)
420+
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
393421
if err != nil {
394422
return 0, err
395423
}

pkg/filesystem/filepool/block_device_backed_file_pool_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package filepool_test
33
import (
44
"io"
55
"math"
6+
"strings"
67
"testing"
78

89
"github.com/buildbarn/bb-remote-execution/internal/mock"
@@ -319,4 +320,105 @@ func TestBlockDeviceBackedFilePool(t *testing.T) {
319320
_, err = f.WriteAt([]byte{0}, -1)
320321
testutil.RequireEqualStatus(t, status.Error(codes.InvalidArgument, "Negative write offset: -1"), err)
321322
})
323+
324+
t.Run("ReadFromHoleReturnsBackingData", func(t *testing.T) {
325+
rl := filepool.NewReadLayer(strings.NewReader("HelloWorld"), 10)
326+
file, err := pool.NewFileWithReadLayer(rl, 10)
327+
require.NoError(t, err)
328+
329+
buf := make([]byte, 10)
330+
n, err := file.ReadAt(buf, 0)
331+
require.Equal(t, err, io.EOF)
332+
require.Equal(t, 10, n)
333+
require.Equal(t, []byte("HelloWorld"), buf)
334+
335+
require.NoError(t, file.Close())
336+
})
337+
338+
t.Run("ReadFromHoleBeyondBackingReturnsZeroes", func(t *testing.T) {
339+
rl := filepool.NewReadLayer(strings.NewReader("abc"), 3)
340+
file, err := pool.NewFileWithReadLayer(rl, 6)
341+
require.NoError(t, err)
342+
343+
buf := make([]byte, 6)
344+
n, err := file.ReadAt(buf, 0)
345+
require.Equal(t, err, io.EOF)
346+
require.Equal(t, 6, n)
347+
require.Equal(t, []byte("abc\x00\x00\x00"), buf)
348+
349+
require.NoError(t, file.Close())
350+
})
351+
352+
t.Run("TruncatePropagatesToBackingLayer", func(t *testing.T) {
353+
rl := filepool.NewReadLayer(strings.NewReader("abcdefghij"), 10)
354+
file, err := pool.NewFileWithReadLayer(rl, 10)
355+
require.NoError(t, err)
356+
357+
require.NoError(t, file.Truncate(4))
358+
359+
buf := make([]byte, 6)
360+
n, err := file.ReadAt(buf, 0)
361+
require.Equal(t, 4, n)
362+
require.Equal(t, err, io.EOF)
363+
require.Equal(t, []byte("abcd\x00\x00"), buf)
364+
365+
require.NoError(t, file.Close())
366+
})
367+
368+
t.Run("TruncateHasNoGhosting", func(t *testing.T) {
369+
rl := filepool.NewReadLayer(strings.NewReader("abcdefghij"), 10)
370+
file, err := pool.NewFileWithReadLayer(rl, 10)
371+
require.NoError(t, err)
372+
// shrunk to 4 bytes
373+
require.NoError(t, file.Truncate(4))
374+
buf := make([]byte, 6)
375+
n, err := file.ReadAt(buf, 0)
376+
require.Equal(t, 4, n)
377+
require.Equal(t, err, io.EOF)
378+
require.Equal(t, []byte("abcd\x00\x00"), buf)
379+
// grow to 10 bytes
380+
require.NoError(t, file.Truncate(10))
381+
n, err = file.ReadAt(buf, 0)
382+
require.Equal(t, 6, n)
383+
require.NoError(t, err)
384+
require.Equal(t, []byte("abcd\x00\x00"), buf)
385+
386+
require.NoError(t, file.Close())
387+
})
388+
389+
t.Run("WriteOverridesBackingLayer", func(t *testing.T) {
390+
// Use 4 byte sectors for clarity.
391+
pool := filepool.NewBlockDeviceBackedFilePool(blockDevice, sectorAllocator, 4)
392+
// Each letter covers a full sector.
393+
rl := filepool.NewReadLayer(strings.NewReader("AAAABBBBCCCC"), 12)
394+
file, err := pool.NewFileWithReadLayer(rl, 12)
395+
require.NoError(t, err)
396+
397+
// Write ZZZ at offset 2, this spans the first two sectors which
398+
// requires us to bring them into our sparse file. We will allocate
399+
// sector 10 and 11 of our block device for this (address 36 and 40).
400+
sectorAllocator.EXPECT().AllocateContiguous(2).Return(uint32(10), 2, nil)
401+
blockDevice.EXPECT().WriteAt([]byte("AAZZ"), int64(36)).Return(4, nil)
402+
blockDevice.EXPECT().WriteAt([]byte("ZBBB"), int64(40)).Return(4, nil)
403+
n, err := file.WriteAt([]byte("ZZZ"), 2)
404+
require.NoError(t, err)
405+
require.Equal(t, 3, n)
406+
407+
// This data would then be expected to be read back to us.
408+
blockDevice.EXPECT().ReadAt(gomock.Len(8), int64(36)).DoAndReturn(
409+
func(p []byte, offset int64) (int, error) {
410+
copy(p, []byte("AAZZZBBB"))
411+
return 8, nil
412+
},
413+
)
414+
415+
buf := make([]byte, 12)
416+
n, err = file.ReadAt(buf, 0)
417+
require.Equal(t, err, io.EOF)
418+
require.Equal(t, n, 12)
419+
require.Equal(t, []byte("AAZZZBBBCCCC"), buf)
420+
421+
sectorAllocator.EXPECT().FreeList([]uint32{10, 11})
422+
require.NoError(t, file.Close())
423+
})
322424
}

pkg/filesystem/filepool/empty_file_pool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,10 @@ func (fp emptyFilePool) NewFile() (filesystem.FileReadWriter, error) {
1313
return nil, status.Error(codes.ResourceExhausted, "Cannot create file in empty file pool")
1414
}
1515

16+
func (fp emptyFilePool) NewFileWithReadLayer(ReadLayer, uint64) (filesystem.FileReadWriter, error) {
17+
return nil, status.Error(codes.ResourceExhausted, "Cannot create file in empty file pool")
18+
}
19+
1620
// EmptyFilePool is a FilePool that does not permit the creation of new
1721
// files. It is used as the default FilePool for the root of the
1822
// worker's FUSE file system to disallow the creation of files not bound

pkg/filesystem/filepool/file_pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,12 @@ import (
1010
// File handles returned by NewFile() are not thread-safe. Additional
1111
// locking needs to be done at higher levels to permit safe concurrent
1212
// access.
13+
//
14+
// Files returned by NewFileWithReadLayer allows the creation of
15+
// temporary files which are backed by a different data source. This
16+
// datasource is presumed to be immutable and allows efficient creation
17+
// of writeable files with content.
1318
type FilePool interface {
1419
NewFile() (filesystem.FileReadWriter, error)
20+
NewFileWithReadLayer(ReadLayer, uint64) (filesystem.FileReadWriter, error)
1521
}

pkg/filesystem/filepool/metrics_file_pool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ func (fp *metricsFilePool) NewFile() (filesystem.FileReadWriter, error) {
5454
}, nil
5555
}
5656

57+
func (fp *metricsFilePool) NewFileWithReadLayer(readLayer ReadLayer, sizeBytes uint64) (filesystem.FileReadWriter, error) {
58+
return fp.base.NewFileWithReadLayer(readLayer, sizeBytes)
59+
}
60+
5761
type metricsFile struct {
5862
filesystem.FileReadWriter
5963
}

pkg/filesystem/filepool/quota_enforcing_file_pool.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (fp *quotaEnforcingFilePool) NewFile() (filesystem.FileReadWriter, error) {
6868
}, nil
6969
}
7070

71+
func (fp *quotaEnforcingFilePool) NewFileWithReadLayer(readLayer ReadLayer, sizeBytes uint64) (filesystem.FileReadWriter, error) {
72+
return nil, status.Error(codes.Internal, "Not implemented")
73+
}
74+
7175
type quotaEnforcingFile struct {
7276
filesystem.FileReadWriter
7377

0 commit comments

Comments
 (0)