Skip to content

Commit 8905efd

Browse files
committed
fix(sync): buffer WAL pages during read to prevent checkpoint race
PageMap() previously returned a map of page numbers to WAL file offsets; writeLTXFromWAL() and writeLTXFromDB() then re-read page data at those offsets. Between the initial PageMap() scan and that later re-read, a concurrent SQLite checkpoint could rewrite or truncate the WAL, causing garbled pages to be captured into LTX files and uploaded to replicas. Change PageMap() to return buffered page data (map[uint32][]byte) instead of offsets (map[uint32]int64). Pages are now copied during the initial checksum-verified read, eliminating the re-read window entirely. Fixes #1164
1 parent 9b7db29 commit 8905efd

File tree

3 files changed

+316
-53
lines changed

3 files changed

+316
-53
lines changed

db.go

Lines changed: 12 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1578,11 +1578,11 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
15781578
// If we need a full snapshot, then copy from the database & WAL.
15791579
// Otherwise, just copy incrementally from the WAL.
15801580
if info.snapshotting {
1581-
if err := db.writeLTXFromDB(ctx, enc, walFile, commit, pageMap); err != nil {
1581+
if err := db.writeLTXFromDB(ctx, enc, commit, pageMap); err != nil {
15821582
return false, fmt.Errorf("write ltx from db: %w", err)
15831583
}
15841584
} else {
1585-
if err := db.writeLTXFromWAL(ctx, enc, walFile, pageMap); err != nil {
1585+
if err := db.writeLTXFromWAL(ctx, enc, pageMap); err != nil {
15861586
return false, fmt.Errorf("write ltx from wal: %w", err)
15871587
}
15881588
}
@@ -1640,7 +1640,7 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
16401640
return true, nil
16411641
}
16421642

1643-
func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.File, commit uint32, pageMap map[uint32]int64) error {
1643+
func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, commit uint32, pageMap map[uint32][]byte) error {
16441644
lockPgno := ltx.LockPgno(uint32(db.pageSize))
16451645
data := make([]byte, db.pageSize)
16461646

@@ -1649,24 +1649,17 @@ func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.
16491649
continue
16501650
}
16511651

1652-
// Check if the caller has canceled during processing.
16531652
select {
16541653
case <-ctx.Done():
16551654
return context.Cause(ctx)
16561655
default:
16571656
}
16581657

1659-
// If page exists in the WAL, read from there.
1660-
if offset, ok := pageMap[pgno]; ok {
1661-
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "db+wal")
1658+
// If page exists in the WAL, use the buffered data directly.
1659+
if pageData, ok := pageMap[pgno]; ok {
1660+
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "pgno", pgno, "type", "db+wal")
16621661

1663-
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
1664-
return fmt.Errorf("read page %d @ %d: %w", pgno, offset, err)
1665-
} else if n != len(data) {
1666-
return fmt.Errorf("short read page %d @ %d", pgno, offset)
1667-
}
1668-
1669-
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, data); err != nil {
1662+
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, pageData); err != nil {
16701663
return fmt.Errorf("encode ltx frame (pgno=%d): %w", pgno, err)
16711664
}
16721665
continue
@@ -1675,7 +1668,6 @@ func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.
16751668
offset := int64(pgno-1) * int64(db.pageSize)
16761669
db.Logger.Log(ctx, internal.LevelTrace, "encode page from database", "offset", offset, "pgno", pgno)
16771670

1678-
// Otherwise read directly from the database file.
16791671
if _, err := db.f.ReadAt(data, offset); err != nil {
16801672
return fmt.Errorf("read database page %d: %w", pgno, err)
16811673
}
@@ -1687,29 +1679,19 @@ func (db *DB) writeLTXFromDB(ctx context.Context, enc *ltx.Encoder, walFile *os.
16871679
return nil
16881680
}
16891681

1690-
func (db *DB) writeLTXFromWAL(ctx context.Context, enc *ltx.Encoder, walFile *os.File, pageMap map[uint32]int64) error {
1691-
// Create an ordered list of page numbers since the LTX encoder requires it.
1682+
func (db *DB) writeLTXFromWAL(ctx context.Context, enc *ltx.Encoder, pageMap map[uint32][]byte) error {
16921683
pgnos := make([]uint32, 0, len(pageMap))
16931684
for pgno := range pageMap {
16941685
pgnos = append(pgnos, pgno)
16951686
}
16961687
slices.Sort(pgnos)
16971688

1698-
data := make([]byte, db.pageSize)
16991689
for _, pgno := range pgnos {
1700-
offset := pageMap[pgno]
1690+
pageData := pageMap[pgno]
17011691

1702-
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "offset", offset, "pgno", pgno, "type", "walonly")
1692+
db.Logger.Log(ctx, internal.LevelTrace, "encode page from wal", "txid", enc.Header().MinTXID, "pgno", pgno, "type", "walonly")
17031693

1704-
// Read source page using page map.
1705-
if n, err := walFile.ReadAt(data, offset+WALFrameHeaderSize); err != nil {
1706-
return fmt.Errorf("read page %d @ %d: %w", pgno, offset, err)
1707-
} else if n != len(data) {
1708-
return fmt.Errorf("short read page %d @ %d", pgno, offset)
1709-
}
1710-
1711-
// Write page to LTX encoder.
1712-
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, data); err != nil {
1694+
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, pageData); err != nil {
17131695
return fmt.Errorf("encode ltx frame (pgno=%d): %w", pgno, err)
17141696
}
17151697
}
@@ -1925,7 +1907,7 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
19251907
return
19261908
}
19271909

1928-
if err := db.writeLTXFromDB(ctx, enc, walFile, commit, pageMap); err != nil {
1910+
if err := db.writeLTXFromDB(ctx, enc, commit, pageMap); err != nil {
19291911
pw.CloseWithError(fmt.Errorf("write snapshot ltx: %w", err))
19301912
return
19311913
}

db_internal_test.go

Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"log/slog"
1212
"os"
1313
"path/filepath"
14+
"slices"
1415
"testing"
1516
"time"
1617

@@ -2066,3 +2067,286 @@ func TestDB_Sync_InitErrorMetrics(t *testing.T) {
20662067
t.Fatalf("litestream_sync_error_count=%v, want > %v (init error should be counted)", syncErrorValue, baselineErrors)
20672068
}
20682069
}
2070+
2071+
func TestPageMap_BuffersPageData(t *testing.T) {
2072+
dir := t.TempDir()
2073+
dbPath := filepath.Join(dir, "test.db")
2074+
2075+
sqldb, err := sql.Open("sqlite", dbPath)
2076+
if err != nil {
2077+
t.Fatal(err)
2078+
}
2079+
defer sqldb.Close()
2080+
2081+
if _, err := sqldb.Exec(`PRAGMA journal_mode = wal`); err != nil {
2082+
t.Fatal(err)
2083+
}
2084+
if _, err := sqldb.Exec(`CREATE TABLE t (id INTEGER PRIMARY KEY, data TEXT)`); err != nil {
2085+
t.Fatal(err)
2086+
}
2087+
if _, err := sqldb.Exec(`INSERT INTO t VALUES (1, 'hello')`); err != nil {
2088+
t.Fatal(err)
2089+
}
2090+
2091+
walFile, err := os.Open(dbPath + "-wal")
2092+
if err != nil {
2093+
t.Fatal(err)
2094+
}
2095+
defer walFile.Close()
2096+
2097+
rd, err := NewWALReader(walFile, slog.Default())
2098+
if err != nil {
2099+
t.Fatal(err)
2100+
}
2101+
2102+
pageMap, _, commit, err := rd.PageMap(context.Background())
2103+
if err != nil {
2104+
t.Fatal(err)
2105+
}
2106+
2107+
if commit == 0 {
2108+
t.Fatal("expected non-zero commit")
2109+
}
2110+
if len(pageMap) == 0 {
2111+
t.Fatal("expected non-empty page map")
2112+
}
2113+
2114+
for pgno, data := range pageMap {
2115+
if len(data) != int(rd.PageSize()) {
2116+
t.Fatalf("page %d: got %d bytes, want %d", pgno, len(data), rd.PageSize())
2117+
}
2118+
}
2119+
2120+
// Page 1 should contain the SQLite header magic.
2121+
if page1, ok := pageMap[1]; ok {
2122+
if !bytes.HasPrefix(page1, []byte("SQLite format 3\000")) {
2123+
t.Fatal("page 1 does not start with SQLite header magic")
2124+
}
2125+
}
2126+
}
2127+
2128+
// TestSync_WALRaceCondition reproduces the TOCTOU race condition from issue #1164
2129+
// using the same schema and access patterns as the reproduction repo
2130+
// (github.com/fuchstim/litestream-corruption-reproduction).
2131+
//
2132+
// The test demonstrates:
2133+
// 1. The OLD offset-based approach: PageMap returns offsets, a simulated checkpoint
2134+
// overwrites the WAL, and re-reading from those offsets produces garbage data.
2135+
// 2. The NEW buffered approach: PageMap returns buffered []byte data that survives
2136+
// WAL modification, producing a valid LTX file even after the WAL is corrupted.
2137+
func TestSync_WALRaceCondition(t *testing.T) {
2138+
dir := t.TempDir()
2139+
dbPath := filepath.Join(dir, "test.db")
2140+
2141+
sqldb, err := sql.Open("sqlite", dbPath)
2142+
if err != nil {
2143+
t.Fatal(err)
2144+
}
2145+
defer sqldb.Close()
2146+
2147+
if _, err := sqldb.Exec(`PRAGMA journal_mode = wal`); err != nil {
2148+
t.Fatal(err)
2149+
}
2150+
2151+
// Use the same schema as the reproduction repo: a table with multiple
2152+
// indexes and triggers that cause many WAL pages per transaction.
2153+
schema := `
2154+
CREATE TABLE IF NOT EXISTS "data" (
2155+
"ROWID" INTEGER PRIMARY KEY AUTOINCREMENT,
2156+
"_uid" TEXT NOT NULL,
2157+
"_resource_version" INTEGER NOT NULL,
2158+
"_updated_at" DATETIME NOT NULL,
2159+
"_ingested_at" DATETIME NOT NULL,
2160+
"_deleted_at" DATETIME,
2161+
"name" TEXT,
2162+
"data_json" BLOB,
2163+
"is_active" INTEGER,
2164+
UNIQUE ("_uid", "_resource_version")
2165+
);
2166+
CREATE INDEX IF NOT EXISTS "data__uid_idx" ON "data" ("_uid");
2167+
CREATE INDEX IF NOT EXISTS "data__resource_version_idx" ON "data" ("_resource_version");
2168+
CREATE INDEX IF NOT EXISTS "data__deleted_at_idx" ON "data" ("_deleted_at");
2169+
CREATE INDEX IF NOT EXISTS "data_name_idx" ON "data" ("name");
2170+
`
2171+
if _, err := sqldb.Exec(schema); err != nil {
2172+
t.Fatal(err)
2173+
}
2174+
2175+
// Insert enough rows to generate many WAL pages across table + index B-trees.
2176+
for i := 0; i < 200; i++ {
2177+
_, err := sqldb.Exec(
2178+
`INSERT INTO "data" ("_uid", "_resource_version", "_updated_at", "_ingested_at", "name", "data_json", "is_active")
2179+
VALUES (?, 1, datetime('now'), datetime('now'), ?, ?, ?)`,
2180+
fmt.Sprintf("uid-%d", i),
2181+
fmt.Sprintf("item-%d", i),
2182+
[]byte(fmt.Sprintf(`{"key":"k%d","value":%d}`, i, i)),
2183+
i%2,
2184+
)
2185+
if err != nil {
2186+
t.Fatal(err)
2187+
}
2188+
}
2189+
2190+
walFile, err := os.Open(dbPath + "-wal")
2191+
if err != nil {
2192+
t.Fatal(err)
2193+
}
2194+
defer walFile.Close()
2195+
2196+
rd, err := NewWALReader(walFile, slog.Default())
2197+
if err != nil {
2198+
t.Fatal(err)
2199+
}
2200+
pageSize := rd.PageSize()
2201+
2202+
// --- NEW approach (buffered): PageMap returns []byte data ---
2203+
pageMap, maxOffset, commit, err := rd.PageMap(context.Background())
2204+
if err != nil {
2205+
t.Fatal(err)
2206+
}
2207+
if commit == 0 {
2208+
t.Fatal("expected non-zero commit")
2209+
}
2210+
2211+
t.Logf("buffered PageMap: %d pages, commit=%d, maxOffset=%d", len(pageMap), commit, maxOffset)
2212+
2213+
// Also simulate what the OLD approach would have done: record offsets
2214+
// by re-reading the WAL to build an offset map.
2215+
rd2, err := NewWALReader(walFile, slog.Default())
2216+
if err != nil {
2217+
t.Fatal(err)
2218+
}
2219+
oldOffsetMap := make(map[uint32]int64)
2220+
oldData := make([]byte, pageSize)
2221+
for {
2222+
pgno, fcommit, err := rd2.ReadFrame(context.Background(), oldData)
2223+
if errors.Is(err, io.EOF) {
2224+
break
2225+
} else if err != nil {
2226+
t.Fatal(err)
2227+
}
2228+
oldOffsetMap[pgno] = rd2.Offset()
2229+
_ = fcommit
2230+
}
2231+
t.Logf("old offset map: %d page offsets recorded", len(oldOffsetMap))
2232+
2233+
// Snapshot buffered data before we corrupt the WAL.
2234+
savedPages := make(map[uint32][]byte, len(pageMap))
2235+
for pgno, data := range pageMap {
2236+
buf := make([]byte, len(data))
2237+
copy(buf, data)
2238+
savedPages[pgno] = buf
2239+
}
2240+
2241+
// --- Simulate a concurrent checkpoint overwriting WAL frames ---
2242+
walFileRW, err := os.OpenFile(dbPath+"-wal", os.O_RDWR, 0)
2243+
if err != nil {
2244+
t.Fatal(err)
2245+
}
2246+
defer walFileRW.Close()
2247+
2248+
garbage := bytes.Repeat([]byte("CHECKPOINT-OVERWROTE-THIS-FRAME!"), 256)
2249+
frameSize := int64(WALFrameHeaderSize) + int64(pageSize)
2250+
corruptedFrames := 0
2251+
for offset := int64(WALHeaderSize); offset < maxOffset+frameSize; offset += frameSize {
2252+
if _, err := walFileRW.WriteAt(garbage[:frameSize], offset); err != nil {
2253+
break
2254+
}
2255+
corruptedFrames++
2256+
}
2257+
t.Logf("corrupted %d WAL frames to simulate checkpoint", corruptedFrames)
2258+
2259+
// --- Verify OLD approach fails: re-reading from offsets gets garbage ---
2260+
oldCorrupted := 0
2261+
readBuf := make([]byte, pageSize)
2262+
for pgno, offset := range oldOffsetMap {
2263+
if _, err := walFile.ReadAt(readBuf, offset+WALFrameHeaderSize); err != nil {
2264+
t.Logf("old approach: read error for page %d at offset %d: %v", pgno, offset, err)
2265+
oldCorrupted++
2266+
continue
2267+
}
2268+
if original, ok := savedPages[pgno]; ok {
2269+
if !bytes.Equal(readBuf, original) {
2270+
oldCorrupted++
2271+
}
2272+
}
2273+
}
2274+
t.Logf("OLD offset-based approach: %d/%d pages corrupted by checkpoint simulation", oldCorrupted, len(oldOffsetMap))
2275+
if oldCorrupted == 0 {
2276+
t.Fatal("expected old offset-based approach to read corrupted data after WAL overwrite")
2277+
}
2278+
2279+
// --- Verify NEW approach succeeds: buffered data is intact ---
2280+
newCorrupted := 0
2281+
for pgno, data := range pageMap {
2282+
if !bytes.Equal(data, savedPages[pgno]) {
2283+
newCorrupted++
2284+
t.Errorf("page %d: buffered data was corrupted by WAL modification", pgno)
2285+
}
2286+
}
2287+
t.Logf("NEW buffered approach: %d/%d pages corrupted", newCorrupted, len(pageMap))
2288+
if newCorrupted > 0 {
2289+
t.Fatalf("buffered PageMap data should survive WAL modification, but %d pages were corrupted", newCorrupted)
2290+
}
2291+
2292+
// --- Verify buffered data produces a valid incremental LTX file ---
2293+
// This is the writeLTXFromWAL path (incremental sync, not snapshot).
2294+
var ltxBuf bytes.Buffer
2295+
enc, err := ltx.NewEncoder(&ltxBuf)
2296+
if err != nil {
2297+
t.Fatal(err)
2298+
}
2299+
// Use MinTXID=2 to indicate an incremental sync (not a snapshot).
2300+
// Snapshot transactions (MinTXID=1) require sequential page coverage.
2301+
if err := enc.EncodeHeader(ltx.Header{
2302+
Version: ltx.Version,
2303+
Flags: ltx.HeaderFlagNoChecksum,
2304+
PageSize: pageSize,
2305+
Commit: commit,
2306+
MinTXID: 2,
2307+
MaxTXID: 2,
2308+
}); err != nil {
2309+
t.Fatal(err)
2310+
}
2311+
2312+
pgnos := make([]uint32, 0, len(pageMap))
2313+
for pgno := range pageMap {
2314+
pgnos = append(pgnos, pgno)
2315+
}
2316+
slices.Sort(pgnos)
2317+
2318+
for _, pgno := range pgnos {
2319+
if err := enc.EncodePage(ltx.PageHeader{Pgno: pgno}, pageMap[pgno]); err != nil {
2320+
t.Fatalf("encode page %d: %v", pgno, err)
2321+
}
2322+
}
2323+
2324+
if err := enc.Close(); err != nil {
2325+
t.Fatal(err)
2326+
}
2327+
2328+
dec := ltx.NewDecoder(&ltxBuf)
2329+
if err := dec.DecodeHeader(); err != nil {
2330+
t.Fatal(err)
2331+
}
2332+
if dec.Header().Commit != commit {
2333+
t.Fatalf("LTX commit=%d, want %d", dec.Header().Commit, commit)
2334+
}
2335+
2336+
var pageCount int
2337+
for {
2338+
var phdr ltx.PageHeader
2339+
data := make([]byte, pageSize)
2340+
if err := dec.DecodePage(&phdr, data); err == io.EOF {
2341+
break
2342+
} else if err != nil {
2343+
t.Fatalf("decode page: %v", err)
2344+
}
2345+
pageCount++
2346+
}
2347+
2348+
if pageCount != len(pageMap) {
2349+
t.Fatalf("decoded %d pages, want %d", pageCount, len(pageMap))
2350+
}
2351+
t.Logf("LTX file valid: %d pages encoded and decoded successfully", pageCount)
2352+
}

0 commit comments

Comments
 (0)