Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 52 additions & 25 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,6 @@ type DB struct {
// otherwise create unnecessary LTX files. See issue #896.
syncedSinceCheckpoint bool

// syncedToWALEnd tracks whether the last successful sync reached the
// exact end of the WAL file. When true, a subsequent WAL truncation
// (from checkpoint) is expected and should NOT trigger a full snapshot.
// This prevents issue #927 where every checkpoint triggers unnecessary
// full snapshots because verify() sees the old LTX position exceeds
// the new (truncated) WAL size.
syncedToWALEnd bool

// lastSyncedWALOffset tracks the logical end of the WAL content after
// the last successful sync. This is the WALOffset + WALSize from the
// last LTX file. Used for checkpoint threshold decisions instead of
Expand Down Expand Up @@ -1270,14 +1262,61 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {

if db.forceNextSnapshot {
db.forceNextSnapshot = false
hdr, err := readWALHeader(db.WALPath())
newHdr, err := readWALHeader(db.WALPath())
if err != nil {
return info, fmt.Errorf("read wal header for forced snapshot: %w", err)
}
info.offset = WALHeaderSize
info.salt1 = binary.BigEndian.Uint32(hdr[16:])
info.salt2 = binary.BigEndian.Uint32(hdr[20:])
info.reason = "WAL restarted during checkpoint"
info.salt1 = binary.BigEndian.Uint32(newHdr[16:])
info.salt2 = binary.BigEndian.Uint32(newHdr[20:])

// Determine if all WAL frames were synced before the restart by reading
// the last LTX file's position and checking for unsynced frames. This
// approach is persistent and survives Litestream restarts. See issue #1165.
ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
ltxFile, err := os.Open(ltxPath)
if err != nil {
info.reason = "WAL restarted during checkpoint"
return info, nil
}

dec := ltx.NewDecoder(ltxFile)
if err := dec.DecodeHeader(); err != nil {
_ = ltxFile.Close()
info.reason = "WAL restarted during checkpoint"
return info, nil
}
_ = ltxFile.Close()

// Check if there are unsynced frames from the old WAL generation by
// reading at the position after the last synced frame.
ltxEndOffset := dec.Header().WALOffset + dec.Header().WALSize
oldSalt1 := dec.Header().WALSalt1
oldSalt2 := dec.Header().WALSalt2

walFile, err := os.Open(db.WALPath())
if err != nil {
info.reason = "WAL restarted during checkpoint"
return info, nil
}

frmHdr := make([]byte, WALFrameHeaderSize)
n, _ := walFile.ReadAt(frmHdr, ltxEndOffset)
_ = walFile.Close()

if n == WALFrameHeaderSize {
frameSalt1 := binary.BigEndian.Uint32(frmHdr[8:])
frameSalt2 := binary.BigEndian.Uint32(frmHdr[12:])
// If the frame has old salts, there are unsynced frames
if frameSalt1 == oldSalt1 && frameSalt2 == oldSalt2 {
info.reason = "WAL restarted with unsynced frames"
return info, nil
}
}

// No unsynced frames - skip snapshot
info.snapshotting = false
info.reason = "WAL restarted but all frames already synced"
return info, nil
}

Expand Down Expand Up @@ -1306,8 +1345,6 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
if fi, err := os.Stat(db.WALPath()); err != nil {
return info, fmt.Errorf("open wal file: %w", err)
} else if info.offset > fi.Size() {
db.syncedToWALEnd = false

hdr, err := readWALHeader(db.WALPath())
if err != nil {
return info, fmt.Errorf("read wal header after wal truncation: %w", err)
Expand Down Expand Up @@ -1623,17 +1660,7 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
// This is the WALOffset + WALSize from the LTX we just created.
// Using this instead of file size prevents issue #997 where stale
// frames with old salt values cause perpetual checkpoint triggering.
finalOffset := info.offset + sz
db.lastSyncedWALOffset = finalOffset

// Track if we synced to the exact end of the WAL file.
// This is used by verify() to distinguish expected checkpoint truncation
// from unexpected external WAL modifications. See issue #927.
if walSize, err := db.walFileSize(); err == nil {
db.syncedToWALEnd = finalOffset == walSize
} else {
db.syncedToWALEnd = false
}
db.lastSyncedWALOffset = info.offset + sz

db.Logger.Debug("db sync", "status", "ok")

Expand Down
161 changes: 161 additions & 0 deletions db_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,15 @@ func TestDB_Verify_ForceSnapshotAfterCheckpointWALRestart(t *testing.T) {
expectedSalt1 := binary.BigEndian.Uint32(walHdr[16:])
expectedSalt2 := binary.BigEndian.Uint32(walHdr[20:])

// Write more data without syncing to create unsynced WAL frames.
// The verify() function will detect these by checking the LTX position
// against the actual WAL content.
if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'unsynced')`); err != nil {
t.Fatal(err)
}

// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart).
// With unsynced frames in the WAL, verify() should return snapshotting=true.
db.forceNextSnapshot = true

info, err = db.verify(ctx)
Expand Down Expand Up @@ -2066,3 +2074,156 @@ func TestDB_Sync_InitErrorMetrics(t *testing.T) {
t.Fatalf("litestream_sync_error_count=%v, want > %v (init error should be counted)", syncErrorValue, baselineErrors)
}
}

// TestDB_Verify_ForceSnapshotSkippedWhenSyncedToWALEnd checks that verify()
// skips a forced snapshot when the last LTX position shows every WAL frame
// was already synced before the restart, and that it reports the current WAL
// header salts with the offset reset to just past the WAL header.
func TestDB_Verify_ForceSnapshotSkippedWhenSyncedToWALEnd(t *testing.T) {
	dbPath := filepath.Join(t.TempDir(), "db")

	db := NewDB(dbPath)
	db.MonitorInterval = 0
	db.Replica = NewReplica(db)
	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
	db.Replica.MonitorEnabled = false
	if err := db.Open(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := db.Close(context.Background()); err != nil {
			t.Fatal(err)
		}
	}()

	sqldb, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer sqldb.Close()

	// Seed the database so the WAL contains committed, syncable frames.
	for _, stmt := range []string{
		`PRAGMA journal_mode = wal;`,
		`CREATE TABLE t (id INT, data TEXT)`,
		`INSERT INTO t VALUES (1, 'initial')`,
	} {
		if _, err := sqldb.Exec(stmt); err != nil {
			t.Fatal(err)
		}
	}

	ctx := context.Background()

	if err := db.Sync(ctx); err != nil {
		t.Fatal(err)
	}

	// Capture the live WAL header salts; verify() must echo these back.
	header, err := readWALHeader(db.WALPath())
	if err != nil {
		t.Fatalf("read WAL header: %v", err)
	}
	wantSalt1 := binary.BigEndian.Uint32(header[16:])
	wantSalt2 := binary.BigEndian.Uint32(header[20:])

	// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart).
	// Since all frames are already synced (no writes after Sync()), verify()
	// should detect this by checking the LTX position and return snapshotting=false.
	db.forceNextSnapshot = true

	info, err := db.verify(ctx)
	if err != nil {
		t.Fatalf("verify: %v", err)
	}
	if info.snapshotting {
		t.Fatal("expected snapshotting=false when all frames already synced")
	}
	if info.offset != WALHeaderSize {
		t.Fatalf("offset=%d, want %d", info.offset, WALHeaderSize)
	}
	if info.salt1 != wantSalt1 || info.salt2 != wantSalt2 {
		t.Fatalf("salt mismatch: got (%d,%d), want (%d,%d)", info.salt1, info.salt2, wantSalt1, wantSalt2)
	}
	if db.forceNextSnapshot {
		t.Fatal("forceNextSnapshot should be cleared")
	}
}

// TestDB_CheckpointDoesNotCreateSnapshotWhenFullySynced ensures that a
// TRUNCATE checkpoint performed after a full sync does not trigger a full
// snapshot on the next sync: the LTX file produced after the checkpoint
// should be strictly smaller than the pre-checkpoint LTX file (which holds
// the bulky zeroblob row), proving an incremental file was written instead
// of a snapshot.
func TestDB_CheckpointDoesNotCreateSnapshotWhenFullySynced(t *testing.T) {
	dir := t.TempDir()
	dbPath := filepath.Join(dir, "db")

	db := NewDB(dbPath)
	db.MonitorInterval = 0
	db.Replica = NewReplica(db)
	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
	db.Replica.MonitorEnabled = false
	if err := db.Open(); err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := db.Close(context.Background()); err != nil {
			t.Fatal(err)
		}
	}()

	sqldb, err := sql.Open("sqlite", dbPath)
	if err != nil {
		t.Fatal(err)
	}
	defer sqldb.Close()

	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
		t.Fatal(err)
	}
	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data BLOB)`); err != nil {
		t.Fatal(err)
	}
	// Large row makes the pre-checkpoint LTX comfortably bigger than the
	// small post-checkpoint write below.
	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, zeroblob(4096))`); err != nil {
		t.Fatal(err)
	}

	ctx := context.Background()

	if err := db.Sync(ctx); err != nil {
		t.Fatal(err)
	}

	// Remember the TXID of the fully-synced position so we can locate the
	// pre-checkpoint LTX file for the size comparison.
	pos, err := db.Pos()
	if err != nil {
		t.Fatal(err)
	}
	preTXID := pos.TXID

	if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
		t.Fatal(err)
	}

	if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'small')`); err != nil {
		t.Fatal(err)
	}

	if err := db.Sync(ctx); err != nil {
		t.Fatal(err)
	}

	pos, err = db.Pos()
	if err != nil {
		t.Fatal(err)
	}

	ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
	fi, err := os.Stat(ltxPath)
	if err != nil {
		t.Fatalf("stat ltx: %v", err)
	}

	preLTXPath := db.LTXPath(0, preTXID, preTXID)
	preFI, err := os.Stat(preLTXPath)
	if err != nil {
		t.Fatalf("stat pre-checkpoint ltx: %v", err)
	}

	// A snapshot would re-copy the whole database (including the 4KB blob),
	// making the new LTX at least as large as the old one.
	if fi.Size() >= preFI.Size() {
		t.Fatalf("post-checkpoint LTX (%d bytes) should be smaller than pre-checkpoint LTX (%d bytes); snapshot was not avoided",
			fi.Size(), preFI.Size())
	}
}
Loading