Skip to content

Commit ef7d1fe

Browse files
authored
dst: bug and test fixes (#874)
* dst: wait for spawned goroutines to finish on restart command Otherwise, some goroutines from the previous store (like block persistence) could race with the new store. This is an artifact of restarting the DB in-process. The approach used is to simply scan the running goroutine stacks, ignoring any goroutines that existed at the start of the test. * wal: sleep for defaultTickTime if no progress is made Previously, runtime.Gosched was called. However, this could be an issue in deterministic simulation tests since if other goroutines are sleeping, the yielding goroutine will schedule itself before advancing faketime. Sleeping is a quick fix for avoiding getting into this infinite loop, although we should probably come up with a long-term fix for these types of cases. * lsm: fix nil pointer in merge * dst: close most recent ColumnStore instead of first on defer We were closing the incorrect column store at the end of the test (in some cases double closing). * db: correctly discard table block contained in snapshot if block was persisted Previously, the code attempted to do this by resetting the active block index. However, the ULID was left untouched. This would cause a previously persisted block to lose data when the new empty block's rotation happened. This commit updates the full block when a NewTableBlock entry is found and the table exists. * table: enhance rotation log messages These log messages were useful when debugging DST failures.
1 parent c459a51 commit ef7d1fe

File tree

5 files changed

+50
-36
lines changed

5 files changed

+50
-36
lines changed

db.go

+13-25
Original file line numberDiff line numberDiff line change
@@ -746,26 +746,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
746746
switch e := record.Entry.EntryType.(type) {
747747
case *walpb.Entry_TableBlockPersisted_:
748748
persistedTables[e.TableBlockPersisted.TableName] = e.TableBlockPersisted.NextTx
749-
if e.TableBlockPersisted.NextTx > snapshotTx {
750-
// The loaded snapshot has data in a table that has been
751-
// persisted. Delete all data in this table, since it has
752-
// already been persisted.
753-
db.mtx.Lock()
754-
if table, ok := db.tables[e.TableBlockPersisted.TableName]; ok {
755-
table.ActiveBlock().index, err = index.NewLSM(
756-
filepath.Join(table.db.indexDir(), table.name, table.ActiveBlock().ulid.String()), // Any index files are found at <db.indexDir>/<table.name>/<block.id>
757-
table.schema,
758-
table.IndexConfig(),
759-
db.HighWatermark,
760-
index.LSMWithMetrics(table.metrics.indexMetrics),
761-
index.LSMWithLogger(table.logger),
762-
)
763-
if err != nil {
764-
return fmt.Errorf("create new lsm index: %w", err)
765-
}
766-
}
767-
db.mtx.Unlock()
768-
}
749+
// The loaded snapshot might have persisted data, this is handled in
750+
// the replay loop below.
769751
return nil
770752
default:
771753
return nil
@@ -802,7 +784,8 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
802784
return err
803785
}
804786

805-
if nextNonPersistedTxn, ok := persistedTables[entry.TableName]; ok && tx <= nextNonPersistedTxn {
787+
nextNonPersistedTxn, wasPersisted := persistedTables[entry.TableName]
788+
if wasPersisted && tx < nextNonPersistedTxn {
806789
// This block has already been successfully persisted, so we can
807790
// skip it. Note that if this new table block is the active
808791
// block after persistence tx == nextNonPersistedTxn.
@@ -849,15 +832,20 @@ func (db *DB) recover(ctx context.Context, wal WAL) error {
849832
return fmt.Errorf("get table: %w", err)
850833
}
851834

852-
// If we get to this point it means a block was finished but did
853-
// not get persisted.
854835
level.Info(db.logger).Log(
855836
"msg", "writing unfinished block in recovery",
856837
"table", tableName,
857838
"tx", tx,
858839
)
859-
table.pendingBlocks[table.active] = struct{}{}
860-
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
840+
if snapshotTx == 0 || tx != nextNonPersistedTxn {
841+
// If we get to this point it means a block was finished but did
842+
// not get persisted. If a snapshot was loaded, then the table
843+
// already exists but the active block is outdated. If
844+
// tx == nextNonPersistedTxn, we should not persist the active
845+
// block, but just create a new block.
846+
table.pendingBlocks[table.active] = struct{}{}
847+
go table.writeBlock(table.active, tx, db.columnStore.manualBlockRotation, false)
848+
}
861849

862850
protoEqual := false
863851
switch schema.(type) {

dst/dst_test.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/polarsignals/wal/types"
2828
"github.com/stretchr/testify/require"
2929
"github.com/thanos-io/objstore"
30+
"go.uber.org/goleak"
3031
"golang.org/x/sync/errgroup"
3132

3233
"github.com/polarsignals/frostdb"
@@ -337,7 +338,6 @@ func TestDST(t *testing.T) {
337338
}, walTicker,
338339
)
339340
require.NoError(t, err)
340-
defer c.Close()
341341

342342
ctx := context.Background()
343343
var db atomic.Pointer[frostdb.DB]
@@ -371,6 +371,8 @@ func TestDST(t *testing.T) {
371371
errg := &errgroup.Group{}
372372
errg.SetLimit(32)
373373
commandDistribution := make(map[command]int)
374+
375+
ignoreGoroutinesAtStartOfTest := goleak.IgnoreCurrent()
374376
for i := 0; i < numCommands; i++ {
375377
cmd := genCommand()
376378
commandDistribution[cmd]++
@@ -417,6 +419,23 @@ func TestDST(t *testing.T) {
417419
time.Sleep(1 * time.Millisecond)
418420
// Graceful shutdown.
419421
require.NoError(t, c.Close())
422+
_ = errg.Wait()
423+
424+
// Unfortunately frostdb doesn't have goroutine lifecycle management
425+
// and adding it could lead to subtle issues (e.g. on Close with
426+
// many DBs). Instead, this test simply verifies all goroutines
427+
// spawned up until this restart eventually exit after n retries.
428+
const maxRetries = 10
429+
for i := 0; i < maxRetries; i++ {
430+
if err := goleak.Find(ignoreGoroutinesAtStartOfTest); err == nil {
431+
break
432+
} else if i == maxRetries-1 {
433+
t.Fatalf("leaked goroutines found on Close: %v", err)
434+
} else {
435+
time.Sleep(1 * time.Millisecond)
436+
}
437+
}
438+
420439
storeID++
421440
c, err = newStore(
422441
storageDir,
@@ -462,6 +481,10 @@ func TestDST(t *testing.T) {
462481
t.Log("snapshot files:", listFiles("snapshots"))
463482
t.Log("WAL files:", listFiles("wal"))
464483

484+
// Defer a close here. This is not done at the start of the test because
485+
// the test run itself may close the store.
486+
defer c.Close()
487+
465488
timestampSum := &int64checksum{}
466489
readTimestamps := make(map[int64]int)
467490
expectedTimestamps := make(map[int64]struct{})

index/lsm.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -551,7 +551,7 @@ func (l *LSM) merge(level SentinelType) error {
551551
// Find the first part that is <= the watermark and reset the compact list to that part.
552552
wm := l.watermark()
553553
compact.Iterate(func(node *Node) bool {
554-
if node.part != nil && node.sentinel != L0 {
554+
if node.part == nil && node.sentinel != L0 {
555555
return false
556556
}
557557
if node.part.TX() <= wm {

table.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -497,12 +497,12 @@ func (t *Table) dropPendingBlock(block *TableBlock) {
497497
}
498498

499499
func (t *Table) writeBlock(block *TableBlock, nextTxn uint64, skipPersist, snapshotDB bool) {
500-
level.Debug(t.logger).Log("msg", "syncing block")
500+
level.Debug(t.logger).Log("msg", "syncing block", "ulid", block.ulid, "size", block.index.Size())
501501
block.pendingWritersWg.Wait()
502502

503503
// from now on, the block will no longer be modified, we can persist it to disk
504504

505-
level.Debug(t.logger).Log("msg", "done syncing block")
505+
level.Debug(t.logger).Log("msg", "done syncing block", "ulid", block.ulid, "size", block.index.Size())
506506

507507
// Persist the block
508508
var err error
@@ -614,10 +614,13 @@ func (t *Table) RotateBlock(_ context.Context, block *TableBlock, skipPersist bo
614614
return nil
615615
}
616616

617-
level.Debug(t.logger).Log("msg", "rotating block", "blockSize", block.Size(), "skipPersist", skipPersist)
618-
defer func() {
619-
level.Debug(t.logger).Log("msg", "done rotating block")
620-
}()
617+
level.Debug(t.logger).Log(
618+
"msg", "rotating block",
619+
"ulid", block.ulid,
620+
"size", block.Size(),
621+
"skip_persist", skipPersist,
622+
)
623+
defer level.Debug(t.logger).Log("msg", "done rotating block", "ulid", block.ulid)
621624

622625
tx, _, commit := t.db.begin()
623626
defer commit()

wal/wal.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"math"
99
"os"
10-
"runtime"
1110
"sync"
1211
"time"
1312

@@ -261,8 +260,9 @@ func Open(
261260
}
262261

263262
func (w *FileWAL) run(ctx context.Context) {
263+
const defaultTickTime = 50 * time.Millisecond
264264
if w.ticker == nil {
265-
w.ticker = realTicker{Ticker: time.NewTicker(50 * time.Millisecond)}
265+
w.ticker = realTicker{Ticker: time.NewTicker(defaultTickTime)}
266266
}
267267
defer w.ticker.Stop()
268268
// lastQueueSize is only used on shutdown to reduce debug logging verbosity.
@@ -291,7 +291,7 @@ func (w *FileWAL) run(ctx context.Context) {
291291

292292
if n == lastQueueSize {
293293
// No progress made.
294-
runtime.Gosched()
294+
time.Sleep(defaultTickTime)
295295
continue
296296
}
297297

0 commit comments

Comments
 (0)