diff --git a/db.go b/db.go
index 968ffe63f..bddd01c81 100644
--- a/db.go
+++ b/db.go
@@ -79,14 +79,6 @@ type DB struct {
 	// otherwise create unnecessary LTX files. See issue #896.
 	syncedSinceCheckpoint bool
 
-	// syncedToWALEnd tracks whether the last successful sync reached the
-	// exact end of the WAL file. When true, a subsequent WAL truncation
-	// (from checkpoint) is expected and should NOT trigger a full snapshot.
-	// This prevents issue #927 where every checkpoint triggers unnecessary
-	// full snapshots because verify() sees the old LTX position exceeds
-	// the new (truncated) WAL size.
-	syncedToWALEnd bool
-
 	// lastSyncedWALOffset tracks the logical end of the WAL content after
 	// the last successful sync. This is the WALOffset + WALSize from the
 	// last LTX file. Used for checkpoint threshold decisions instead of
@@ -1270,14 +1262,76 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
 
 	if db.forceNextSnapshot {
 		db.forceNextSnapshot = false
-		hdr, err := readWALHeader(db.WALPath())
+		newHdr, err := readWALHeader(db.WALPath())
 		if err != nil {
 			return info, fmt.Errorf("read wal header for forced snapshot: %w", err)
 		}
 		info.offset = WALHeaderSize
-		info.salt1 = binary.BigEndian.Uint32(hdr[16:])
-		info.salt2 = binary.BigEndian.Uint32(hdr[20:])
-		info.reason = "WAL restarted during checkpoint"
+		info.salt1 = binary.BigEndian.Uint32(newHdr[16:])
+		info.salt2 = binary.BigEndian.Uint32(newHdr[20:])
+
+		// Determine if all WAL frames were synced before the restart by reading
+		// the last LTX file's position and checking for unsynced frames. This
+		// approach is persistent and survives Litestream restarts. See issue #1165.
+		//
+		// Every fallback path below returns without touching info.snapshotting;
+		// only the final path clears it.
+		ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
+		ltxFile, err := os.Open(ltxPath)
+		if err != nil {
+			info.reason = "WAL restarted during checkpoint"
+			return info, nil
+		}
+
+		dec := ltx.NewDecoder(ltxFile)
+		decErr := dec.DecodeHeader()
+		_ = ltxFile.Close() // read-only handle; a close error is not actionable here
+		if decErr != nil {
+			info.reason = "WAL restarted during checkpoint"
+			return info, nil
+		}
+
+		// Check if there are unsynced frames from the old WAL generation by
+		// reading at the position after the last synced frame.
+		ltxEndOffset := dec.Header().WALOffset + dec.Header().WALSize
+		oldSalt1 := dec.Header().WALSalt1
+		oldSalt2 := dec.Header().WALSalt2
+
+		walFile, err := os.Open(db.WALPath())
+		if err != nil {
+			info.reason = "WAL restarted during checkpoint"
+			return info, nil
+		}
+		defer func() { _ = walFile.Close() }()
+
+		walInfo, err := walFile.Stat()
+		if err != nil {
+			info.reason = "WAL restarted during checkpoint"
+			return info, nil
+		}
+
+		// Only inspect a frame when the WAL is long enough to hold one at the
+		// synced position; a shorter WAL has no frame header there to compare.
+		frmHdr := make([]byte, WALFrameHeaderSize)
+		if walInfo.Size() >= ltxEndOffset+int64(len(frmHdr)) {
+			// A failed read must not be mistaken for "nothing left to sync".
+			if _, err := walFile.ReadAt(frmHdr, ltxEndOffset); err != nil {
+				info.reason = "WAL restarted during checkpoint"
+				return info, nil
+			}
+
+			frameSalt1 := binary.BigEndian.Uint32(frmHdr[8:])
+			frameSalt2 := binary.BigEndian.Uint32(frmHdr[12:])
+			// If the frame has old salts, there are unsynced frames
+			if frameSalt1 == oldSalt1 && frameSalt2 == oldSalt2 {
+				info.reason = "WAL restarted with unsynced frames"
+				return info, nil
+			}
+		}
+
+		// No unsynced frames - skip snapshot
+		info.snapshotting = false
+		info.reason = "WAL restarted but all frames already synced"
 		return info, nil
 	}
 
@@ -1306,8 +1360,6 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
 	if fi, err := os.Stat(db.WALPath()); err != nil {
 		return info, fmt.Errorf("open wal file: %w", err)
 	} else if info.offset > fi.Size() {
-		db.syncedToWALEnd = false
-
 		hdr, err := readWALHeader(db.WALPath())
 		if err != nil {
 			return info, fmt.Errorf("read wal header after wal truncation: %w", err)
@@ -1623,17 +1675,7 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
 	// This is the WALOffset + WALSize from the LTX we just created.
 	// Using this instead of file size prevents issue #997 where stale
 	// frames with old salt values cause perpetual checkpoint triggering.
-	finalOffset := info.offset + sz
-	db.lastSyncedWALOffset = finalOffset
-
-	// Track if we synced to the exact end of the WAL file.
-	// This is used by verify() to distinguish expected checkpoint truncation
-	// from unexpected external WAL modifications. See issue #927.
-	if walSize, err := db.walFileSize(); err == nil {
-		db.syncedToWALEnd = finalOffset == walSize
-	} else {
-		db.syncedToWALEnd = false
-	}
+	db.lastSyncedWALOffset = info.offset + sz
 
 	db.Logger.Debug("db sync", "status", "ok")
 
diff --git a/db_internal_test.go b/db_internal_test.go
index 2fb8d34d1..52bc24555 100644
--- a/db_internal_test.go
+++ b/db_internal_test.go
@@ -1655,7 +1655,15 @@ func TestDB_Verify_ForceSnapshotAfterCheckpointWALRestart(t *testing.T) {
 	expectedSalt1 := binary.BigEndian.Uint32(walHdr[16:])
 	expectedSalt2 := binary.BigEndian.Uint32(walHdr[20:])
 
+	// Write more data without syncing to create unsynced WAL frames.
+	// The verify() function will detect these by checking the LTX position
+	// against the actual WAL content.
+	if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'unsynced')`); err != nil {
+		t.Fatal(err)
+	}
+
 	// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart).
+	// With unsynced frames in the WAL, verify() should return snapshotting=true.
 	db.forceNextSnapshot = true
 
 	info, err = db.verify(ctx)
@@ -2066,3 +2074,160 @@ func TestDB_Sync_InitErrorMetrics(t *testing.T) {
 		t.Fatalf("litestream_sync_error_count=%v, want > %v (init error should be counted)", syncErrorValue, baselineErrors)
 	}
 }
+
+// TestDB_Verify_ForceSnapshotSkippedWhenSyncedToWALEnd checks that verify()
+// clears snapshotting when forceNextSnapshot is set but no writes followed the
+// last Sync().
+func TestDB_Verify_ForceSnapshotSkippedWhenSyncedToWALEnd(t *testing.T) {
+	dir := t.TempDir()
+	dbPath := filepath.Join(dir, "db")
+
+	db := NewDB(dbPath)
+	db.MonitorInterval = 0
+	db.Replica = NewReplica(db)
+	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
+	db.Replica.MonitorEnabled = false
+	if err := db.Open(); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := db.Close(context.Background()); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	sqldb, err := sql.Open("sqlite", dbPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sqldb.Close()
+
+	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data TEXT)`); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'initial')`); err != nil {
+		t.Fatal(err)
+	}
+
+	ctx := context.Background()
+
+	if err := db.Sync(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	walHdr, err := readWALHeader(db.WALPath())
+	if err != nil {
+		t.Fatalf("read WAL header: %v", err)
+	}
+	expectedSalt1 := binary.BigEndian.Uint32(walHdr[16:])
+	expectedSalt2 := binary.BigEndian.Uint32(walHdr[20:])
+
+	// Set forceNextSnapshot (simulates what checkpoint() does on WAL restart).
+	// Since all frames are already synced (no writes after Sync()), verify()
+	// should detect this by checking the LTX position and return snapshotting=false.
+	db.forceNextSnapshot = true
+
+	info, err := db.verify(ctx)
+	if err != nil {
+		t.Fatalf("verify: %v", err)
+	}
+	if info.snapshotting {
+		t.Fatal("expected snapshotting=false when all frames already synced")
+	}
+	if info.offset != WALHeaderSize {
+		t.Fatalf("offset=%d, want %d", info.offset, WALHeaderSize)
+	}
+	if info.salt1 != expectedSalt1 || info.salt2 != expectedSalt2 {
+		t.Fatalf("salt mismatch: got (%d,%d), want (%d,%d)", info.salt1, info.salt2, expectedSalt1, expectedSalt2)
+	}
+	if db.forceNextSnapshot {
+		t.Fatal("forceNextSnapshot should be cleared")
+	}
+}
+
+// TestDB_CheckpointDoesNotCreateSnapshotWhenFullySynced checks that the LTX
+// file written after a TRUNCATE checkpoint is smaller than the one written
+// before it, i.e. that the post-checkpoint sync did not produce a snapshot.
+func TestDB_CheckpointDoesNotCreateSnapshotWhenFullySynced(t *testing.T) {
+	dir := t.TempDir()
+	dbPath := filepath.Join(dir, "db")
+
+	db := NewDB(dbPath)
+	db.MonitorInterval = 0
+	db.Replica = NewReplica(db)
+	db.Replica.Client = &testReplicaClient{dir: t.TempDir()}
+	db.Replica.MonitorEnabled = false
+	if err := db.Open(); err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		if err := db.Close(context.Background()); err != nil {
+			t.Fatal(err)
+		}
+	}()
+
+	sqldb, err := sql.Open("sqlite", dbPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer sqldb.Close()
+
+	if _, err := sqldb.Exec(`PRAGMA journal_mode = wal;`); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := sqldb.Exec(`CREATE TABLE t (id INT, data BLOB)`); err != nil {
+		t.Fatal(err)
+	}
+	if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, zeroblob(4096))`); err != nil {
+		t.Fatal(err)
+	}
+
+	ctx := context.Background()
+
+	if err := db.Sync(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	pos, err := db.Pos()
+	if err != nil {
+		t.Fatal(err)
+	}
+	preTXID := pos.TXID
+
+	if _, err := sqldb.Exec(`PRAGMA wal_checkpoint(TRUNCATE)`); err != nil {
+		t.Fatal(err)
+	}
+
+	if _, err := sqldb.Exec(`INSERT INTO t VALUES (2, 'small')`); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := db.Sync(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	pos, err = db.Pos()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ltxPath := db.LTXPath(0, pos.TXID, pos.TXID)
+	fi, err := os.Stat(ltxPath)
+	if err != nil {
+		t.Fatalf("stat ltx: %v", err)
+	}
+
+	preLTXPath := db.LTXPath(0, preTXID, preTXID)
+	preFI, err := os.Stat(preLTXPath)
+	if err != nil {
+		t.Fatalf("stat pre-checkpoint ltx: %v", err)
+	}
+
+	if fi.Size() >= preFI.Size() {
+		t.Fatalf("post-checkpoint LTX (%d bytes) should be smaller than pre-checkpoint LTX (%d bytes); snapshot was not avoided",
+			fi.Size(), preFI.Size())
+	}
+}