Skip to content

Commit bfdd9ec

Browse files
authored
table: add RotateBlock options and WithRotateBlockWaitGroup (#901)
1 parent ced8217 commit bfdd9ec

File tree

4 files changed

+58
-18
lines changed

4 files changed

+58
-18
lines changed

db.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -844,7 +844,7 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
844844
// tx == nextNonPersistedTxn, we should not persist the active
845845
// block, but just create a new block.
846846
table.pendingBlocks[table.active] = struct{}{}
847-
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
847+
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation)
848848
}
849849

850850
protoEqual := false
@@ -1006,7 +1006,7 @@ func (db *DB) Close(options ...CloseOption) error {
10061006
// should be faster to write to local disk than upload to object
10071007
// storage. This would avoid a slow WAL replay on startup if we
10081008
// don't manage to persist in time.
1009-
table.writeBlock(table.ActiveBlock(), db.tx.Load(), false, false)
1009+
table.writeBlock(table.ActiveBlock(), db.tx.Load(), false)
10101010
}
10111011
}
10121012
level.Info(db.logger).Log("msg", "closed all tables")

db_test.go

+8-8
Original file line numberDiff line numberDiff line change
@@ -1206,7 +1206,7 @@ func TestDBRecover(t *testing.T) {
12061206
// the second write, and the second was triggered before the third write.
12071207
if blockRotation {
12081208
// A block rotation should trigger the third snapshot.
1209-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
1209+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
12101210
// Wait for the snapshot to complete
12111211
require.Eventually(t, func() bool {
12121212
files, err := os.ReadDir(db.snapshotsDir())
@@ -1404,7 +1404,7 @@ func TestDBRecover(t *testing.T) {
14041404
table, err := db.GetTable(dbAndTableName)
14051405
require.NoError(t, err)
14061406

1407-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
1407+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
14081408

14091409
rec, err := dynparquet.NewTestSamples().ToRecord()
14101410
require.NoError(t, err)
@@ -1414,7 +1414,7 @@ func TestDBRecover(t *testing.T) {
14141414

14151415
// RotateBlock again, this should log a couple of persisted block WAL
14161416
// entries.
1417-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
1417+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
14181418
require.NoError(t, c.Close())
14191419

14201420
c, err = New(
@@ -1474,7 +1474,7 @@ func TestDBRecover(t *testing.T) {
14741474
require.NoError(t, err)
14751475

14761476
// Rotate the block to create a new active block.
1477-
require.NoError(t, table.RotateBlock(ctx, block, false))
1477+
require.NoError(t, table.RotateBlock(ctx, block))
14781478

14791479
// Issue writes.
14801480
const nWrites = 5
@@ -1635,7 +1635,7 @@ func TestDBMinTXPersisted(t *testing.T) {
16351635
writeTx, err := table.InsertRecord(ctx, r)
16361636
require.NoError(t, err)
16371637

1638-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
1638+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
16391639
// Writing the block is asynchronous, so wait for both the new table block
16401640
// txn and the block persistence txn.
16411641
db.Wait(writeTx + 2)
@@ -2657,7 +2657,7 @@ func Test_DB_PersistentDiskCompaction_BlockRotation(t *testing.T) {
26572657
validateRows(1200)
26582658

26592659
// Rotate block
2660-
require.NoError(t, table.RotateBlock(context.Background(), table.ActiveBlock(), false))
2660+
require.NoError(t, table.RotateBlock(context.Background(), table.ActiveBlock()))
26612661

26622662
validateRows(1200)
26632663

@@ -3151,7 +3151,7 @@ func Test_Iceberg(t *testing.T) {
31513151

31523152
validateRows(10)
31533153

3154-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
3154+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
31553155
require.Eventually(t, func() bool {
31563156
info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v1.metadata.json"))
31573157
if err != nil {
@@ -3171,7 +3171,7 @@ func Test_Iceberg(t *testing.T) {
31713171

31723172
validateRows(13)
31733173

3174-
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock(), false))
3174+
require.NoError(t, table.RotateBlock(ctx, table.ActiveBlock()))
31753175
require.Eventually(t, func() bool {
31763176
info, err := bucket.Attributes(ctx, filepath.Join("test", "test", "metadata", "v2.metadata.json"))
31773177
if err != nil {

dst/dst_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ func TestDST(t *testing.T) {
400400
case rotate:
401401
errg.Go(func() error {
402402
table := tp.GetTable()
403-
if err := table.RotateBlock(ctx, table.ActiveBlock(), false); err != nil {
403+
if err := table.RotateBlock(ctx, table.ActiveBlock()); err != nil {
404404
return fmt.Errorf("rotate error: %s", err)
405405
}
406406
return nil

table.go

+47-7
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,16 @@ func (t *Table) dropPendingBlock(block *TableBlock) {
496496
}
497497
}
498498

499-
func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) {
499+
func (t *Table) writeBlock(
500+
block *TableBlock, nextTxn uint64, snapshotDB bool, opts ...RotateBlockOption,
501+
) {
502+
rbo := &rotateBlockOptions{}
503+
for _, o := range opts {
504+
o(rbo)
505+
}
506+
if rbo.wg != nil {
507+
defer rbo.wg.Done()
508+
}
500509
level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
501510
block.pendingWritersWg.Wait()
502511

@@ -506,7 +515,7 @@ func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snaps
506515

507516
// Persist the block
508517
var err error
509-
if !skipPersist && block.index.Size() != 0 {
518+
if !rbo.skipPersist && block.index.Size() != 0 {
510519
err = block.Persist()
511520
}
512521
t.dropPendingBlock(block)
@@ -605,20 +614,51 @@ func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snaps
605614
}
606615
}
607616

608-
func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bool) error {
617+
type rotateBlockOptions struct {
618+
skipPersist bool
619+
wg *sync.WaitGroup
620+
}
621+
622+
type RotateBlockOption func(*rotateBlockOptions)
623+
624+
// WithRotateBlockSkipPersist instructs the block rotation operation to not
625+
// persist the block to object storage.
626+
func WithRotateBlockSkipPersist() RotateBlockOption {
627+
return func(o *rotateBlockOptions) {
628+
o.skipPersist = true
629+
}
630+
}
631+
632+
// WithRotateBlockWaitGroup provides a WaitGroup. The rotate block operation
633+
// will call wg.Done once the block has been persisted. Otherwise, RotateBlock
634+
// asynchronously persists the block.
635+
func WithRotateBlockWaitGroup(wg *sync.WaitGroup) RotateBlockOption {
636+
return func(o *rotateBlockOptions) {
637+
o.wg = wg
638+
}
639+
}
640+
641+
func (t *Table) RotateBlock(_ context.Context, block *TableBlock, opts ...RotateBlockOption) error {
642+
rbo := &rotateBlockOptions{}
643+
for _, o := range opts {
644+
o(rbo)
645+
}
609646
t.mtx.Lock()
610647
defer t.mtx.Unlock()
611648

612649
// Need to check that we haven't already rotated this block.
613650
if t.active != block {
651+
if rbo.wg != nil {
652+
rbo.wg.Done()
653+
}
614654
return nil
615655
}
616656

617657
level.Debug(t.logger).Log(
618658
"msg", "rotating block",
619659
"ulid", block.ulid,
620660
"size", block.Size(),
621-
"skip_persist", skipPersist,
661+
"skip_persist", rbo.skipPersist,
622662
)
623663
defer level.Debug(t.logger).Log("msg", "done rotating block", "ulid", block.ulid)
624664

@@ -637,7 +677,7 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
637677
t.metrics.blockRotated.Inc()
638678
t.metrics.numParts.Set(float64(0))
639679

640-
if !skipPersist {
680+
if !rbo.skipPersist {
641681
// If skipping persist, this block rotation is simply a block discard,
642682
// so no need to add this block to pending blocks. Some callers rely
643683
// on the fact that blocks are not available for reads as soon as
@@ -647,7 +687,7 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
647687
// We don't check t.db.columnStore.manualBlockRotation here because this is
648688
// the entry point for users to trigger a manual block rotation and they
649689
// will specify through skipPersist if they want the block to be persisted.
650-
go t.writeBlock(block, tx, skipPersist, true)
690+
go t.writeBlock(block, tx, true, opts...)
651691

652692
return nil
653693
}
@@ -750,7 +790,7 @@ func (t *Table) appender(ctx context.Context) (*TableBlock, func(), error) {
750790
// We need to rotate the block and the writer won't actually be used.
751791
finish()
752792

753-
err = t.RotateBlock(ctx, block, false)
793+
err = t.RotateBlock(ctx, block)
754794
if err != nil {
755795
return nil, nil, fmt.Errorf("rotate block: %w", err)
756796
}

0 commit comments

Comments
 (0)