Skip to content

Commit 741965e

Browse files
benbjohnson and claude committed
perf(db): cache Pos() result to avoid repeated disk reads
Cache the replication position computed from the latest L0 LTX file using an atomic.Pointer. The cache is invalidated whenever L0 files are created, deleted, or the L0 directory is cleared (reset, retention, sync errors, replica fetch). On sync success, the cache is updated directly from the encoder without a disk read. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5531fab commit 741965e

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

db.go

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"strconv"
1717
"strings"
1818
"sync"
19+
"sync/atomic"
1920
"time"
2021

2122
"github.com/prometheus/client_golang/prometheus"
@@ -101,6 +102,10 @@ type DB struct {
101102
m map[int]*ltx.FileInfo
102103
}
103104

105+
// Cached position from the latest L0 LTX file.
106+
// nil means cache is invalid; non-nil is the cached position.
107+
posCache atomic.Pointer[ltx.Pos]
108+
104109
fileInfo os.FileInfo // db info cached during init
105110
dirInfo os.FileInfo // parent dir info cached during init
106111

@@ -300,6 +305,8 @@ func (db *DB) ResetLocalState(ctx context.Context) error {
300305
db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo)
301306
db.maxLTXFileInfos.Unlock()
302307

308+
db.invalidatePosCache()
309+
303310
db.Logger.Info("local state reset complete, next sync will create fresh snapshot")
304311
return nil
305312
}
@@ -330,6 +337,9 @@ func (db *DB) deleteLocalLTXFile(level int, minTXID, maxTXID ltx.TXID) error {
330337
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
331338
return err
332339
}
340+
if level == 0 {
341+
db.invalidatePosCache()
342+
}
333343
return nil
334344
}
335345

@@ -364,7 +374,12 @@ func (db *DB) DirInfo() os.FileInfo {
364374
}
365375

366376
// Pos returns the current replication position of the database.
377+
// The result is cached and invalidated when L0 LTX files change.
367378
func (db *DB) Pos() (ltx.Pos, error) {
379+
if p := db.posCache.Load(); p != nil {
380+
return *p, nil
381+
}
382+
368383
minTXID, maxTXID, err := db.MaxLTX()
369384
if err != nil {
370385
return ltx.Pos{}, err
@@ -382,7 +397,18 @@ func (db *DB) Pos() (ltx.Pos, error) {
382397
if err := dec.Verify(); err != nil {
383398
return ltx.Pos{}, fmt.Errorf("ltx verification failed: %w", err)
384399
}
385-
return dec.PostApplyPos(), nil
400+
401+
pos := dec.PostApplyPos()
402+
db.posCache.Store(&pos)
403+
404+
return pos, nil
405+
}
406+
407+
// invalidatePosCache clears the cached position so the next call to Pos()
408+
// recomputes it from disk. Call this when L0 LTX files are created, deleted,
409+
// or left in an unknown state, or when the L0 directory is cleared.
410+
func (db *DB) invalidatePosCache() {
411+
db.posCache.Store(nil)
386412
}
387413

388414
// Notify returns a channel that closes when the shadow WAL changes.
@@ -1195,6 +1221,7 @@ func (db *DB) checkDatabaseBehindReplica(ctx context.Context) error {
11951221
if err := os.RemoveAll(l0Dir); err != nil && !os.IsNotExist(err) {
11961222
return fmt.Errorf("remove L0 directory: %w", err)
11971223
}
1224+
db.invalidatePosCache()
11981225
if err := internal.MkdirAll(l0Dir, db.dirInfo); err != nil {
11991226
return fmt.Errorf("recreate L0 directory: %w", err)
12001227
}
@@ -1235,6 +1262,7 @@ func (db *DB) checkDatabaseBehindReplica(ctx context.Context) error {
12351262
if err := os.Rename(tmpPath, localPath); err != nil {
12361263
return fmt.Errorf("rename L0 file: %w", err)
12371264
}
1265+
db.invalidatePosCache()
12381266

12391267
db.Logger.Info("fetched latest L0 file from replica",
12401268
"min_txid", minTXID,
@@ -1632,6 +1660,7 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
16321660
db.maxLTXFileInfos.Lock()
16331661
delete(db.maxLTXFileInfos.m, 0) // clear cache if in unknown state
16341662
db.maxLTXFileInfos.Unlock()
1663+
db.invalidatePosCache()
16351664
return false, fmt.Errorf("rename ltx file: %w", err)
16361665
}
16371666

@@ -1646,6 +1675,10 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
16461675
}
16471676
db.maxLTXFileInfos.Unlock()
16481677

1678+
// Update cached position from the encoder.
1679+
encPos := enc.PostApplyPos()
1680+
db.posCache.Store(&encPos)
1681+
16491682
// Track the logical end of WAL content for checkpoint decisions.
16501683
// This is the WALOffset + WALSize from the LTX we just created.
16511684
// Using this instead of file size prevents issue #997 where stale
@@ -2177,6 +2210,9 @@ func (db *DB) EnforceL0RetentionByTime(ctx context.Context) error {
21772210
db.Logger.Error("failed to remove local l0 file", "path", localPath, "error", err)
21782211
}
21792212
}
2213+
if len(deleted) > 0 {
2214+
db.invalidatePosCache()
2215+
}
21802216

21812217
db.Logger.Info("l0 retention enforced", "deleted_count", len(deleted), "max_l1_txid", maxL1TXID)
21822218

db_internal_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,9 @@ func TestDB_Verify_WALOffsetAtHeader(t *testing.T) {
656656
t.Fatal(err)
657657
}
658658

659+
// Invalidate cached position since we wrote an L0 file directly.
660+
db.invalidatePosCache()
661+
659662
// Now call verify - before the fix, this would fail with:
660663
// "prev WAL offset is less than the header size: -4088"
661664
info, err := db.verify(context.Background())
@@ -776,6 +779,9 @@ func TestDB_Verify_WALOffsetAtHeader_SaltMismatch(t *testing.T) {
776779
t.Fatal(err)
777780
}
778781

782+
// Invalidate cached position since we wrote an L0 file directly.
783+
db.invalidatePosCache()
784+
779785
// Call verify - should succeed but indicate snapshotting due to salt mismatch
780786
info, err := db.verify(context.Background())
781787
if err != nil {

0 commit comments

Comments
 (0)