diff --git a/db.go b/db.go index 968ffe63f..b874ee061 100644 --- a/db.go +++ b/db.go @@ -1852,21 +1852,23 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) { db.Logger.Debug("snapshot", "txid", pos.TXID.String()) - // Prevent internal checkpoints during sync. - db.chkMu.RLock() - defer db.chkMu.RUnlock() - - // TODO(ltx): Read database size from database header. - - fi, err := db.f.Stat() - if err != nil { - return pos, nil, err - } - commit := uint32(fi.Size() / int64(db.pageSize)) - // Execute encoding in a separate goroutine so the caller can initialize before reading. pr, pw := io.Pipe() go func() { + // Prevent internal checkpoints for the entire duration of page reading. + // This lock must be held inside the goroutine (not in the outer function) + // because the outer function returns before the goroutine finishes, + // which would release a deferred RUnlock while pages are still being read. + db.chkMu.RLock() + defer db.chkMu.RUnlock() + + fi, err := db.f.Stat() + if err != nil { + pw.CloseWithError(err) + return + } + commit := uint32(fi.Size() / int64(db.pageSize)) + walFile, err := os.Open(db.WALPath()) if err != nil { pw.CloseWithError(err) diff --git a/db_internal_test.go b/db_internal_test.go index 2fb8d34d1..cdcfe6e80 100644 --- a/db_internal_test.go +++ b/db_internal_test.go @@ -2066,3 +2066,273 @@ func TestDB_Sync_InitErrorMetrics(t *testing.T) { t.Fatalf("litestream_sync_error_count=%v, want > %v (init error should be counted)", syncErrorValue, baselineErrors) } } + +// TestSyncRestoreIntegrity exercises the full sync→checkpoint→restore→integrity +// flow with concurrent writes to verify no corruption is introduced. This +// reproduces the scenario from issue #1164 where users observed "database disk +// image is malformed" errors after restoring from replicas. 
+func TestSyncRestoreIntegrity(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + replicaDir := t.TempDir() + + db := NewDB(dbPath) + db.MonitorInterval = 0 + db.ShutdownSyncTimeout = 0 + db.MinCheckpointPageN = 50 + db.CheckpointInterval = 100 * time.Millisecond + db.Replica = NewReplica(db) + db.Replica.Client = &testReplicaClient{dir: replicaDir} + db.Replica.MonitorEnabled = false + db.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(context.Background()) }() + + sqldb, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatal(err) + } + defer sqldb.Close() + if _, err := sqldb.Exec(`PRAGMA journal_mode = wal`); err != nil { + t.Fatal(err) + } + if _, err := sqldb.Exec(`PRAGMA busy_timeout = 5000`); err != nil { + t.Fatal(err) + } + + schema := ` + CREATE TABLE IF NOT EXISTS data ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + _uid TEXT NOT NULL, + _resource_version INTEGER NOT NULL, + _updated_at DATETIME NOT NULL, + name TEXT, + data_json BLOB, + is_active INTEGER, + UNIQUE (_uid, _resource_version) + ); + CREATE INDEX IF NOT EXISTS data_uid_idx ON data (_uid); + CREATE INDEX IF NOT EXISTS data_name_idx ON data (name); + ` + if _, err := sqldb.Exec(schema); err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + // Run concurrent writes and syncs for several iterations. + // This mirrors the pattern from the reproduction repo. + const iterations = 20 + for i := 0; i < iterations; i++ { + // Insert a batch of rows (simulating application writes). 
+ for j := 0; j < 10; j++ { + uid := fmt.Sprintf("uid-%d-%d", i, j) + _, err := sqldb.ExecContext(ctx, + `INSERT INTO data (_uid, _resource_version, _updated_at, name, data_json, is_active) + VALUES (?, 1, datetime('now'), ?, ?, ?)`, + uid, fmt.Sprintf("item-%d-%d", i, j), + []byte(fmt.Sprintf(`{"key":"k%d","value":%d}`, j, j)), + j%2, + ) + if err != nil { + t.Fatal(err) + } + } + + // Run sync (this is what litestream does periodically). + if err := db.Sync(ctx); err != nil { + t.Fatalf("sync iteration %d: %v", i, err) + } + } + + // Ensure final sync captures everything. + if err := db.Sync(ctx); err != nil { + t.Fatalf("final sync: %v", err) + } + + // Close the application DB connection. + sqldb.Close() + + // Stop litestream. + if err := db.Close(ctx); err != nil { + t.Fatalf("close db: %v", err) + } + + // Restore from replica. + restorePath := filepath.Join(t.TempDir(), "restored.db") + restoreDB := NewDB(restorePath) + restoreDB.Replica = NewReplica(restoreDB) + restoreDB.Replica.Client = &testReplicaClient{dir: replicaDir} + restoreDB.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + if err := restoreDB.Replica.Restore(ctx, RestoreOptions{ + OutputPath: restorePath, + }); err != nil { + t.Fatalf("restore: %v", err) + } + + // Run integrity check on restored database. 
+ restoredDB, err := sql.Open("sqlite", restorePath) + if err != nil { + t.Fatalf("open restored db: %v", err) + } + defer restoredDB.Close() + + rows, err := restoredDB.QueryContext(ctx, `PRAGMA integrity_check;`) + if err != nil { + t.Fatalf("integrity check: %v", err) + } + defer rows.Close() + + var results []string + for rows.Next() { + var result string + if err := rows.Scan(&result); err != nil { + t.Fatal(err) + } + results = append(results, result) + } + if err := rows.Err(); err != nil { + t.Fatal(err) + } + + if len(results) == 0 { + t.Fatal("integrity check returned no results") + } + if results[0] != "ok" { + t.Fatalf("integrity check failed on restored database:\n%s", fmt.Sprintf("%v", results)) + } + + // Verify data was restored correctly. + var count int + if err := restoredDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM data`).Scan(&count); err != nil { + t.Fatalf("count data: %v", err) + } + expectedRows := iterations * 10 + if count != expectedRows { + t.Fatalf("restored row count=%d, want %d", count, expectedRows) + } +} + +// TestSyncRestoreIntegrity_WithCheckpoints is a stress variant that forces +// PASSIVE and TRUNCATE checkpoints between syncs to maximize the chance of +// hitting any TOCTOU race conditions. The read transaction (db.rtx) held by +// litestream during sync should prevent TRUNCATE checkpoints from the +// application's connection from completing. 
+func TestSyncRestoreIntegrity_WithCheckpoints(t *testing.T) { + if testing.Short() { + t.Skip("skipping in short mode") + } + + dir := t.TempDir() + dbPath := filepath.Join(dir, "test.db") + replicaDir := t.TempDir() + + db := NewDB(dbPath) + db.MonitorInterval = 0 + db.ShutdownSyncTimeout = 0 + db.MinCheckpointPageN = 20 + db.TruncatePageN = 200 + db.CheckpointInterval = 50 * time.Millisecond + db.Replica = NewReplica(db) + db.Replica.Client = &testReplicaClient{dir: replicaDir} + db.Replica.MonitorEnabled = false + db.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + if err := db.Open(); err != nil { + t.Fatal(err) + } + defer func() { _ = db.Close(context.Background()) }() + + sqldb, err := sql.Open("sqlite", dbPath) + if err != nil { + t.Fatal(err) + } + defer sqldb.Close() + if _, err := sqldb.Exec(`PRAGMA journal_mode = wal`); err != nil { + t.Fatal(err) + } + if _, err := sqldb.Exec(`PRAGMA busy_timeout = 5000`); err != nil { + t.Fatal(err) + } + if _, err := sqldb.Exec(`CREATE TABLE t (id INTEGER PRIMARY KEY, val TEXT, extra BLOB)`); err != nil { + t.Fatal(err) + } + + ctx := context.Background() + + // Write data in bursts with syncs and checkpoints interleaved. + for i := 0; i < 30; i++ { + // Burst of writes. + for j := 0; j < 20; j++ { + _, err := sqldb.ExecContext(ctx, + `INSERT INTO t (val, extra) VALUES (?, ?)`, + fmt.Sprintf("val-%d-%d", i, j), + bytes.Repeat([]byte{byte(i)}, 512), + ) + if err != nil { + t.Fatal(err) + } + } + + // Sync after each burst. + if err := db.Sync(ctx); err != nil { + t.Fatalf("sync %d: %v", i, err) + } + + // Periodically issue PASSIVE checkpoints from the app's connection + // to simulate the pattern from issue #1164. 
+ if i%5 == 4 { + if _, err := sqldb.ExecContext(ctx, `PRAGMA wal_checkpoint(PASSIVE)`); err != nil { + t.Logf("passive checkpoint %d: %v", i, err) + } + } + } + + if err := db.Sync(ctx); err != nil { + t.Fatalf("final sync: %v", err) + } + + sqldb.Close() + if err := db.Close(ctx); err != nil { + t.Fatalf("close: %v", err) + } + + // Restore and verify. + restorePath := filepath.Join(t.TempDir(), "restored.db") + restoreDB := NewDB(restorePath) + restoreDB.Replica = NewReplica(restoreDB) + restoreDB.Replica.Client = &testReplicaClient{dir: replicaDir} + restoreDB.Logger = slog.New(slog.NewTextHandler(io.Discard, nil)) + if err := restoreDB.Replica.Restore(ctx, RestoreOptions{ + OutputPath: restorePath, + }); err != nil { + t.Fatalf("restore: %v", err) + } + + restoredDB, err := sql.Open("sqlite", restorePath) + if err != nil { + t.Fatal(err) + } + defer restoredDB.Close() + + var result string + if err := restoredDB.QueryRowContext(ctx, `PRAGMA integrity_check`).Scan(&result); err != nil { + t.Fatal(err) + } + if result != "ok" { + t.Fatalf("integrity check failed: %s", result) + } + + var count int + if err := restoredDB.QueryRowContext(ctx, `SELECT COUNT(*) FROM t`).Scan(&count); err != nil { + t.Fatal(err) + } + if count != 30*20 { + t.Fatalf("row count=%d, want %d", count, 30*20) + } +} diff --git a/replica.go b/replica.go index 1c028d302..198a206e6 100644 --- a/replica.go +++ b/replica.go @@ -884,6 +884,9 @@ func (r *Replica) applyLTXFile(ctx context.Context, f *os.File, info *ltx.FileIn } if hdr.Commit > 0 { + if err := f.Sync(); err != nil { + return fmt.Errorf("sync before truncate: %w", err) + } newSize := int64(hdr.Commit) * int64(pageSize) if err := f.Truncate(newSize); err != nil { return fmt.Errorf("truncate: %w", err) diff --git a/replica_internal_test.go b/replica_internal_test.go index 9ef6f5e20..f94f873b2 100644 --- a/replica_internal_test.go +++ b/replica_internal_test.go @@ -3,6 +3,7 @@ package litestream import ( "bytes" "context" + 
"encoding/binary" "fmt" "io" "os" @@ -277,3 +278,161 @@ func mustCreateWritableDBFile(tb testing.TB) *os.File { func ltxFixtureKey(level int, minTXID, maxTXID ltx.TXID) string { return fmt.Sprintf("%d:%s:%s", level, minTXID, maxTXID) } + +// TestApplyLTXFile_TruncatesAfterWrite verifies that applyLTXFile correctly +// truncates the database file to match the commit size in the LTX header. +// This tests the fix from commit 9b7db29 which adds f.Sync() before truncate. +func TestApplyLTXFile_TruncatesAfterWrite(t *testing.T) { + const pageSize = 4096 + + info := &ltx.FileInfo{Level: 0, MinTXID: 10, MaxTXID: 10} + + var buf bytes.Buffer + enc, err := ltx.NewEncoder(&buf) + if err != nil { + t.Fatal(err) + } + hdr := ltx.Header{ + Version: ltx.Version, + Flags: ltx.HeaderFlagNoChecksum, + PageSize: pageSize, + Commit: 2, // 2 pages = 8192 bytes + MinTXID: info.MinTXID, + MaxTXID: info.MaxTXID, + Timestamp: time.Now().UnixMilli(), + } + if err := enc.EncodeHeader(hdr); err != nil { + t.Fatal(err) + } + // Write page 1 with SQLite-like header bytes. + page1 := make([]byte, pageSize) + copy(page1, []byte("SQLite format 3\000")) + page1[18], page1[19] = 0x01, 0x01 + binary.BigEndian.PutUint16(page1[16:18], pageSize) + if err := enc.EncodePage(ltx.PageHeader{Pgno: 1}, page1); err != nil { + t.Fatal(err) + } + // Write page 2. + page2 := bytes.Repeat([]byte{0xAB}, pageSize) + if err := enc.EncodePage(ltx.PageHeader{Pgno: 2}, page2); err != nil { + t.Fatal(err) + } + if err := enc.Close(); err != nil { + t.Fatal(err) + } + + ltxData := buf.Bytes() + client := &followTestReplicaClient{} + client.OpenLTXFileFunc = func(_ context.Context, level int, minTXID, maxTXID ltx.TXID, _, _ int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(ltxData)), nil + } + + r := NewReplicaWithClient(nil, client) + + // Create a file that's larger than the commit size (simulating a DB + // that will be truncated down). 
+ f := mustCreateWritableDBFile(t) // 128KB + defer func() { _ = f.Close() }() + + if err := r.applyLTXFile(context.Background(), f, info, pageSize); err != nil { + t.Fatalf("applyLTXFile: %v", err) + } + + // Verify the file was truncated to Commit * pageSize = 8192 bytes. + fi, err := f.Stat() + if err != nil { + t.Fatal(err) + } + expectedSize := int64(2) * int64(pageSize) + if fi.Size() != expectedSize { + t.Fatalf("file size=%d, want %d (truncate failed)", fi.Size(), expectedSize) + } + + // Verify page 2 data was written correctly. + readBuf := make([]byte, pageSize) + if _, err := f.ReadAt(readBuf, int64(pageSize)); err != nil { + t.Fatal(err) + } + if !bytes.Equal(readBuf, page2) { + t.Fatal("page 2 data mismatch after applyLTXFile") + } +} + +// TestApplyLTXFile_MultiplePages verifies that applying an LTX file with +// multiple pages produces a valid, correctly sized database. +func TestApplyLTXFile_MultiplePages(t *testing.T) { + const pageSize = 4096 + const numPages = 5 + + info := &ltx.FileInfo{Level: 0, MinTXID: 1, MaxTXID: 1} + + var buf bytes.Buffer + enc, err := ltx.NewEncoder(&buf) + if err != nil { + t.Fatal(err) + } + hdr := ltx.Header{ + Version: ltx.Version, + Flags: ltx.HeaderFlagNoChecksum, + PageSize: pageSize, + Commit: numPages, + MinTXID: info.MinTXID, + MaxTXID: info.MaxTXID, + Timestamp: time.Now().UnixMilli(), + } + if err := enc.EncodeHeader(hdr); err != nil { + t.Fatal(err) + } + + // Write pages with distinct fill patterns. 
+ pages := make([][]byte, numPages) + for i := uint32(0); i < numPages; i++ { + pages[i] = bytes.Repeat([]byte{byte(0x10 + i)}, pageSize) + if i == 0 { + copy(pages[i], []byte("SQLite format 3\000")) + pages[i][18], pages[i][19] = 0x01, 0x01 + } + if err := enc.EncodePage(ltx.PageHeader{Pgno: i + 1}, pages[i]); err != nil { + t.Fatal(err) + } + } + if err := enc.Close(); err != nil { + t.Fatal(err) + } + + ltxData := buf.Bytes() + client := &followTestReplicaClient{} + client.OpenLTXFileFunc = func(_ context.Context, _ int, _, _ ltx.TXID, _, _ int64) (io.ReadCloser, error) { + return io.NopCloser(bytes.NewReader(ltxData)), nil + } + + r := NewReplicaWithClient(nil, client) + f := mustCreateWritableDBFile(t) + defer func() { _ = f.Close() }() + + if err := r.applyLTXFile(context.Background(), f, info, pageSize); err != nil { + t.Fatalf("applyLTXFile: %v", err) + } + + fi, err := f.Stat() + if err != nil { + t.Fatal(err) + } + expectedSize := int64(numPages) * int64(pageSize) + if fi.Size() != expectedSize { + t.Fatalf("file size=%d, want %d", fi.Size(), expectedSize) + } + + // Verify each page's content. + readBuf := make([]byte, pageSize) + for i := uint32(0); i < numPages; i++ { + if _, err := f.ReadAt(readBuf, int64(i)*int64(pageSize)); err != nil { + t.Fatalf("read page %d: %v", i+1, err) + } + // Page 1 has modified header bytes (journal mode + schema counter), + // so only check non-header pages exactly. 
+ if i > 0 && !bytes.Equal(readBuf, pages[i]) { + t.Fatalf("page %d data mismatch", i+1) + } + } +} diff --git a/tests/integration/comprehensive_soak_test.go b/tests/integration/comprehensive_soak_test.go index bbf563a6c..ce8efb157 100644 --- a/tests/integration/comprehensive_soak_test.go +++ b/tests/integration/comprehensive_soak_test.go @@ -226,9 +226,9 @@ func TestComprehensiveSoak(t *testing.T) { testPassed := true issues := []string{} - if criticalErrors > 0 { + if len(errors) > 0 { testPassed = false - issues = append(issues, fmt.Sprintf("Critical errors detected: %d", criticalErrors)) + issues = append(issues, fmt.Sprintf("Critical errors detected: %d", len(errors))) } if analysis.FinalFileCount == 0 { diff --git a/tests/integration/minio_soak_test.go b/tests/integration/minio_soak_test.go index ee78d7add..bab82808c 100644 --- a/tests/integration/minio_soak_test.go +++ b/tests/integration/minio_soak_test.go @@ -267,9 +267,9 @@ func TestMinIOSoak(t *testing.T) { testPassed := true issues := []string{} - if criticalErrors > 0 { + if len(errors) > 0 { testPassed = false - issues = append(issues, fmt.Sprintf("Critical errors detected: %d", criticalErrors)) + issues = append(issues, fmt.Sprintf("Critical errors detected: %d", len(errors))) } if finalObjects == 0 { diff --git a/tests/integration/soak_replicate_restore_test.go b/tests/integration/soak_replicate_restore_test.go new file mode 100644 index 000000000..d60bec65c --- /dev/null +++ b/tests/integration/soak_replicate_restore_test.go @@ -0,0 +1,373 @@ +//go:build integration && soak && docker + +package integration + +import ( + "context" + "database/sql" + "fmt" + "math" + "os" + "path/filepath" + "sort" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +// TestSoakReplicateRestore reproduces issue #1164: intermittent database corruption +// ("wrong # of entries in index") after restoring from S3-compatible replicas. 
+// +// This test mirrors fuchstim's exact setup: +// - SQLite DB in WAL mode with tables + indexes +// - Litestream replication to MinIO (S3) +// - ~100 rows/sec concurrent writes +// - Periodic stop→restore→integrity_check cycles +// +// Default duration: 5 minutes (override with SOAK_DURATION env var) +// Can be shortened with: go test -test.short (runs for 1 minute) +// +// Requirements: +// - Docker must be running +// - Binaries must be built: go build -o bin/litestream ./cmd/litestream +func TestSoakReplicateRestore(t *testing.T) { + RequireBinaries(t) + RequireDocker(t) + + duration := parseSoakDuration(t, 5*time.Minute) + if testing.Short() { + duration = 1 * time.Minute + } + restoreInterval := 30 * time.Second + writeRate := 100 + + t.Logf("================================================") + t.Logf("Issue #1164 Reproduction: Replicate+Restore Soak") + t.Logf("================================================") + t.Logf("Duration: %v", duration) + t.Logf("Restore interval: %v", restoreInterval) + t.Logf("Write rate: %d rows/sec", writeRate) + t.Logf("Start time: %s", time.Now().Format(time.RFC3339)) + t.Log("") + + containerID, endpoint, dataVolume := StartMinIOContainer(t) + defer StopMinIOContainer(t, containerID, dataVolume) + + bucket := "litestream-test" + CreateMinIOBucket(t, containerID, bucket) + + db := SetupTestDB(t, "replicate-restore") + defer db.Cleanup() + + if err := db.Create(); err != nil { + t.Fatalf("create database: %v", err) + } + + sqlDB, err := sql.Open("sqlite3", db.Path) + if err != nil { + t.Fatalf("open database: %v", err) + } + defer sqlDB.Close() + + if _, err := sqlDB.Exec("PRAGMA journal_mode=WAL"); err != nil { + t.Fatalf("set WAL mode: %v", err) + } + + // Create schema matching fuchstim's setup: table with indexes + if _, err := sqlDB.Exec(` + CREATE TABLE IF NOT EXISTS resources ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + _uid TEXT NOT NULL, + _resource_version INTEGER NOT NULL DEFAULT 0, + name TEXT NOT NULL, + data 
TEXT, + created_at INTEGER NOT NULL + ); + CREATE UNIQUE INDEX IF NOT EXISTS idx_resources_uid ON resources(_uid); + CREATE INDEX IF NOT EXISTS idx_resources_rv ON resources(_resource_version); + CREATE INDEX IF NOT EXISTS idx_resources_name ON resources(name); + `); err != nil { + t.Fatalf("create schema: %v", err) + } + + s3Path := fmt.Sprintf("replicate-restore-%d", time.Now().Unix()) + s3URL := fmt.Sprintf("s3://%s/%s", bucket, s3Path) + db.ReplicaURL = s3URL + + configPath := writeReplicateRestoreConfig(t, db.Path, s3URL, endpoint) + db.ConfigPath = configPath + + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("start litestream: %v", err) + } + t.Logf("Litestream running (PID: %d)", db.LitestreamPID) + + // Wait for initial sync + time.Sleep(3 * time.Second) + + ctx, cancel := context.WithTimeout(context.Background(), duration) + defer cancel() + + // Performance tracking + var ( + latencies latencyTracker + totalWrites atomic.Int64 + writeErrs atomic.Int64 + ) + + // Writer goroutine: ~writeRate rows/sec + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(time.Second / time.Duration(writeRate)) + defer ticker.Stop() + + rv := int64(0) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + rv++ + start := time.Now() + _, err := sqlDB.ExecContext(ctx, ` + INSERT INTO resources (_uid, _resource_version, name, data, created_at) + VALUES (?, ?, ?, ?, ?)`, + fmt.Sprintf("uid-%d-%d", time.Now().UnixNano(), rv), + rv, + fmt.Sprintf("resource-%d", rv%1000), + fmt.Sprintf("payload data for resource version %d with some padding to simulate real workload size", rv), + time.Now().Unix(), + ) + elapsed := time.Since(start) + if err != nil { + if ctx.Err() != nil { + return + } + writeErrs.Add(1) + continue + } + totalWrites.Add(1) + latencies.record(elapsed) + } + } + }() + + // Periodic restore+integrity check loop + var ( + restoreCount int + corruptionCount int + restoreErrors 
[]string + ) + + restoreTicker := time.NewTicker(restoreInterval) + defer restoreTicker.Stop() + + t.Log("Running replicate+restore cycles...") + t.Log("") + + startTime := time.Now() + +loop: + for { + select { + case <-ctx.Done(): + break loop + case <-restoreTicker.C: + restoreCount++ + elapsed := time.Since(startTime) + sourceRows := countRowsSafe(sqlDB) + + t.Logf("[%v] Restore cycle #%d (source rows: %d, writes: %d, write errors: %d)", + elapsed.Round(time.Second), restoreCount, sourceRows, totalWrites.Load(), writeErrs.Load()) + + // Stop litestream for clean restore + if err := db.StopLitestream(); err != nil { + t.Logf(" Warning: stop litestream: %v", err) + } + time.Sleep(1 * time.Second) + + // Restore to new path + restoredPath := filepath.Join(db.TempDir, fmt.Sprintf("restored-%d.db", restoreCount)) + if err := db.Restore(restoredPath); err != nil { + restoreErrors = append(restoreErrors, fmt.Sprintf("cycle %d: restore: %v", restoreCount, err)) + t.Logf(" RESTORE FAILED: %v", err) + // Restart litestream and continue + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("restart litestream after failed restore: %v", err) + } + continue + } + + // Integrity check on restored DB + restoredDB, err := sql.Open("sqlite3", restoredPath) + if err != nil { + restoreErrors = append(restoreErrors, fmt.Sprintf("cycle %d: open restored: %v", restoreCount, err)) + t.Logf(" OPEN FAILED: %v", err) + } else { + var result string + if err := restoredDB.QueryRow("PRAGMA integrity_check").Scan(&result); err != nil { + restoreErrors = append(restoreErrors, fmt.Sprintf("cycle %d: integrity_check query: %v", restoreCount, err)) + t.Logf(" INTEGRITY CHECK QUERY FAILED: %v", err) + } else if result != "ok" { + corruptionCount++ + restoreErrors = append(restoreErrors, fmt.Sprintf("cycle %d: CORRUPTION: %s", restoreCount, result)) + t.Errorf(" CORRUPTION DETECTED: %s", result) + } else { + // Verify row count is reasonable (restored may lag behind 
source) + var restoredRows int + restoredDB.QueryRow("SELECT COUNT(*) FROM resources").Scan(&restoredRows) + t.Logf(" OK: integrity=ok, restored_rows=%d", restoredRows) + } + restoredDB.Close() + } + + // Clean up restored DB + os.Remove(restoredPath) + os.Remove(restoredPath + "-wal") + os.Remove(restoredPath + "-shm") + + // Restart litestream + if err := db.StartLitestreamWithConfig(configPath); err != nil { + t.Fatalf("restart litestream: %v", err) + } + time.Sleep(2 * time.Second) + } + } + + cancel() + wg.Wait() + + // Final report + t.Log("") + t.Log("================================================") + t.Log("Results") + t.Log("================================================") + t.Logf("Duration: %v", time.Since(startTime).Round(time.Second)) + t.Logf("Total writes: %d", totalWrites.Load()) + t.Logf("Write errors: %d", writeErrs.Load()) + t.Logf("Restore cycles: %d", restoreCount) + t.Logf("Corruptions: %d", corruptionCount) + + stats := latencies.stats() + t.Logf("Write latency P50: %v", stats.p50) + t.Logf("Write latency P95: %v", stats.p95) + t.Logf("Write latency P99: %v", stats.p99) + t.Logf("Write latency max: %v", stats.max) + + if len(restoreErrors) > 0 { + t.Log("") + t.Log("Restore errors:") + for _, e := range restoreErrors { + t.Logf(" %s", e) + } + } + + t.Log("================================================") + + if corruptionCount > 0 { + t.Fatalf("FAILED: %d corruption(s) detected in %d restore cycles", corruptionCount, restoreCount) + } + + if stats.p99 > 500*time.Millisecond { + t.Errorf("P99 write latency %v exceeds 500ms threshold", stats.p99) + } + + t.Log("PASSED: no corruption detected") +} + +func writeReplicateRestoreConfig(t *testing.T, dbPath, s3URL, endpoint string) string { + t.Helper() + configPath := filepath.Join(filepath.Dir(dbPath), "litestream.yml") + config := fmt.Sprintf(`access-key-id: minioadmin +secret-access-key: minioadmin + +dbs: + - path: %s + checkpoint-interval: 1m + min-checkpoint-page-count: 100 + replicas: 
+ - url: %s + endpoint: %s + region: us-east-1 + force-path-style: true + skip-verify: true + sync-interval: 1s + snapshot-interval: 30s +`, filepath.ToSlash(dbPath), s3URL, endpoint) + if err := os.WriteFile(configPath, []byte(config), 0644); err != nil { + t.Fatalf("write config: %v", err) + } + return configPath +} + +func parseSoakDuration(t *testing.T, defaultDuration time.Duration) time.Duration { + t.Helper() + if v := os.Getenv("SOAK_DURATION"); v != "" { + if d, err := time.ParseDuration(v); err == nil { + return d + } + if mins, err := strconv.Atoi(v); err == nil { + return time.Duration(mins) * time.Minute + } + t.Logf("Warning: invalid SOAK_DURATION %q, using default %v", v, defaultDuration) + } + return defaultDuration +} + +func countRowsSafe(db *sql.DB) int { + var count int + db.QueryRow("SELECT COUNT(*) FROM resources").Scan(&count) + return count +} + +type latencyTracker struct { + mu sync.Mutex + samples []time.Duration +} + +func (lt *latencyTracker) record(d time.Duration) { + lt.mu.Lock() + defer lt.mu.Unlock() + lt.samples = append(lt.samples, d) +} + +type latencyStats struct { + p50, p95, p99, max time.Duration +} + +func (lt *latencyTracker) stats() latencyStats { + lt.mu.Lock() + defer lt.mu.Unlock() + + if len(lt.samples) == 0 { + return latencyStats{} + } + + sorted := make([]time.Duration, len(lt.samples)) + copy(sorted, lt.samples) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + + percentile := func(p float64) time.Duration { + idx := int(math.Ceil(p/100*float64(len(sorted)))) - 1 + if idx < 0 { + idx = 0 + } + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] + } + + return latencyStats{ + p50: percentile(50), + p95: percentile(95), + p99: percentile(99), + max: sorted[len(sorted)-1], + } +}