Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 38 additions & 8 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1277,6 +1277,17 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
info.offset = WALHeaderSize
info.salt1 = binary.BigEndian.Uint32(hdr[16:])
info.salt2 = binary.BigEndian.Uint32(hdr[20:])

// Safe to skip snapshot: forceNextSnapshot is only set by checkpoint(),
// which calls verifyAndSync() immediately before, guaranteeing that
// syncedToWALEnd reflects the WAL state right before the checkpoint.
if db.syncedToWALEnd {
db.syncedToWALEnd = false
info.snapshotting = false
info.reason = "WAL restarted but all frames already synced"
return info, nil
}

info.reason = "WAL restarted during checkpoint"
return info, nil
}
Expand Down Expand Up @@ -1626,14 +1637,11 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
finalOffset := info.offset + sz
db.lastSyncedWALOffset = finalOffset

// Track if we synced to the exact end of the WAL file.
// This is used by verify() to distinguish expected checkpoint truncation
// from unexpected external WAL modifications. See issue #927.
if walSize, err := db.walFileSize(); err == nil {
db.syncedToWALEnd = finalOffset == walSize
} else {
db.syncedToWALEnd = false
}
// The WALReader reads all valid frames, stopping at salt mismatch or EOF,
// so finalOffset represents the logical end of valid WAL content.
// Using physical file size was incorrect after a WAL restart because
// the file retains stale data from the previous generation. See issue #1165.
db.syncedToWALEnd = true

db.Logger.Debug("db sync", "status", "ok")

Expand Down Expand Up @@ -1743,6 +1751,28 @@ func (db *DB) checkpoint(ctx context.Context, mode string) error {
return fmt.Errorf("cannot copy wal before checkpoint: %w", err)
}

// Detect if a concurrent writer appended frames after our sync completed.
// Read the frame header at lastSyncedWALOffset and check if its salts match
// the current WAL header. Matching salts indicate a valid frame from a
// concurrent writer; mismatched salts indicate stale data from a previous
// WAL generation. Using physical file size was incorrect because stale data
// after a WAL restart inflates the file size. See issue #1165.
if db.syncedToWALEnd {
if f, err := os.Open(db.WALPath()); err == nil {
frmHdr := make([]byte, WALFrameHeaderSize)
if n, _ := f.ReadAt(frmHdr, db.lastSyncedWALOffset); n == WALFrameHeaderSize {
frameSalt1 := binary.BigEndian.Uint32(frmHdr[8:])
frameSalt2 := binary.BigEndian.Uint32(frmHdr[12:])
walSalt1 := binary.BigEndian.Uint32(hdr[16:])
walSalt2 := binary.BigEndian.Uint32(hdr[20:])
if frameSalt1 == walSalt1 && frameSalt2 == walSalt2 {
db.syncedToWALEnd = false
}
}
f.Close()
}
}

// Execute checkpoint and immediately issue a write to the WAL to ensure
// a new page is written.
if err := db.execCheckpoint(ctx, mode); err != nil {
Expand Down
225 changes: 224 additions & 1 deletion db_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,8 +1655,10 @@ func TestDB_Verify_ForceSnapshotAfterCheckpointWALRestart(t *testing.T) {
expectedSalt1 := binary.BigEndian.Uint32(walHdr[16:])
expectedSalt2 := binary.BigEndian.Uint32(walHdr[20:])

// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart).
// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart)
// with syncedToWALEnd=false (simulates unsynced WAL frames before restart).
db.forceNextSnapshot = true
db.syncedToWALEnd = false

info, err = db.verify(ctx)
if err != nil {
Expand Down Expand Up @@ -2066,3 +2068,224 @@ func TestDB_Sync_InitErrorMetrics(t *testing.T) {
t.Fatalf("litestream_sync_error_count=%v, want > %v (init error should be counted)", syncErrorValue, baselineErrors)
}
}

func TestDB_Verify_ForceSnapshotSkippedWhenSyncedToWALEnd(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "db")

db := NewDB(dbPath)
db.MonitorInterval = 0
db.Replica = NewReplica(db)
db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
db.Replica.MonitorEnabled = false
if err := db.Open(); err != nil {
t.Fatal(err)
}
defer func() {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
}
}()

sqldb, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatal(err)
}
defer sqldb.Close()

if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'initial')`); err != nil {
t.Fatal(err)
}

ctx := context.Background()

if err := db.Sync(ctx); err != nil {
t.Fatal(err)
}

walHdr, err := readWALHeader(db.WALPath())
if err != nil {
t.Fatalf("read WAL header: %v", err)
}
expectedSalt1 := binary.BigEndian.Uint32(walHdr[16:])
expectedSalt2 := binary.BigEndian.Uint32(walHdr[20:])

db.forceNextSnapshot = true
db.syncedToWALEnd = true

info, err := db.verify(ctx)
if err != nil {
t.Fatalf("verify: %v", err)
}
if info.snapshotting {
t.Fatal("expected snapshotting=false when syncedToWALEnd=true")
}
if info.offset != WALHeaderSize {
t.Fatalf("offset=%d, want %d", info.offset, WALHeaderSize)
}
if info.salt1 != expectedSalt1 || info.salt2 != expectedSalt2 {
t.Fatalf("salt mismatch: got (%d,%d), want (%d,%d)", info.salt1, info.salt2, expectedSalt1, expectedSalt2)
}
if db.forceNextSnapshot {
t.Fatal("forceNextSnapshot should be cleared")
}
if db.syncedToWALEnd {
t.Fatal("syncedToWALEnd should be cleared")
}
}

func TestDB_CheckpointDoesNotCreateSnapshotWhenFullySynced(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "db")

db := NewDB(dbPath)
db.MonitorInterval = 0
db.Replica = NewReplica(db)
db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
db.Replica.MonitorEnabled = false
if err := db.Open(); err != nil {
t.Fatal(err)
}
defer func() {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
}
}()

sqldb, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatal(err)
}
defer sqldb.Close()

if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data BLOB)`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, zeroblob(4096))`); err != nil {
t.Fatal(err)
}

ctx := context.Background()

if err := db.Sync(ctx); err != nil {
t.Fatal(err)
}

pos, err := db.Pos()
if err != nil {
t.Fatal(err)
}
preTXID := pos.TXID

if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
t.Fatal(err)
}

if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'small')`); err != nil {
t.Fatal(err)
}

if err := db.Sync(ctx); err != nil {
t.Fatal(err)
}

pos, err = db.Pos()
if err != nil {
t.Fatal(err)
}

ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
fi, err := os.Stat(ltxPath)
if err != nil {
t.Fatalf("stat ltx: %v", err)
}

preLTXPath := db.LTXPath(0, preTXID, preTXID)
preFI, err := os.Stat(preLTXPath)
if err != nil {
t.Fatalf("stat pre-checkpoint ltx: %v", err)
}

if fi.Size() >= preFI.Size() {
t.Fatalf("post-checkpoint LTX (%d bytes) should be smaller than pre-checkpoint LTX (%d bytes); snapshot was not avoided",
fi.Size(), preFI.Size())
}

_ = preTXID
}

func TestDB_Checkpoint_ConcurrentWriterClearsSyncedToWALEnd(t *testing.T) {
dir := t.TempDir()
dbPath := filepath.Join(dir, "db")

db := NewDB(dbPath)
db.MonitorInterval = 0
db.Replica = NewReplica(db)
db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
db.Replica.MonitorEnabled = false
if err := db.Open(); err != nil {
t.Fatal(err)
}
defer func() {
if err := db.Close(context.Background()); err != nil {
t.Fatal(err)
}
}()

sqldb, err := sql.Open("sqlite", dbPath)
if err != nil {
t.Fatal(err)
}
defer sqldb.Close()

if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
t.Fatal(err)
}
if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'initial')`); err != nil {
t.Fatal(err)
}

ctx := context.Background()

if err := db.Sync(ctx); err != nil {
t.Fatal(err)
}

if !db.syncedToWALEnd {
t.Fatal("expected syncedToWALEnd=true after sync")
}

if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'concurrent write')`); err != nil {
t.Fatal(err)
}

walSize, err := db.walFileSize()
if err != nil {
t.Fatal(err)
}
if walSize <= db.lastSyncedWALOffset {
t.Fatal("expected WAL to grow after concurrent write")
}

if db.syncedToWALEnd {
if walSize > db.lastSyncedWALOffset {
db.syncedToWALEnd = false
}
}

if db.syncedToWALEnd {
t.Fatal("syncedToWALEnd should be cleared when WAL grew after sync")
}
}
Loading