-
Notifications
You must be signed in to change notification settings - Fork 347
Expand file tree
/
Copy pathvfs.go
More file actions
3149 lines (2712 loc) · 84 KB
/
vfs.go
File metadata and controls
3149 lines (2712 loc) · 84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//go:build vfs
// +build vfs
package litestream
import (
"context"
"crypto/rand"
"errors"
"fmt"
"hash/fnv"
"io"
"log/slog"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
_ "unsafe"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/markusmobius/go-dateparser"
"github.com/psanford/sqlite3vfs"
"github.com/superfly/ltx"
)
const (
DefaultPollInterval = 1 * time.Second
DefaultCacheSize = 10 * 1024 * 1024 // 10MB
DefaultPageSize = 4096 // SQLite default page size
pageFetchRetryAttempts = 6
pageFetchRetryDelay = 15 * time.Millisecond
)
// ErrConflict is returned when the remote replica has newer transactions than expected.
var ErrConflict = errors.New("remote has newer transactions than expected")
var (
//go:linkname sqlite3vfsFileMap github.com/psanford/sqlite3vfs.fileMap
sqlite3vfsFileMap map[uint64]sqlite3vfs.File
//go:linkname sqlite3vfsFileMux github.com/psanford/sqlite3vfs.fileMux
sqlite3vfsFileMux sync.Mutex
vfsConnectionMap sync.Map // map[uintptr]uint64
)
// VFS implements the SQLite VFS interface for Litestream.
// It is intended to be used for read replicas that read directly from S3.
// When WriteEnabled is true, also supports writes with periodic sync.
type VFS struct {
	client ReplicaClient // default replica client shared by all connections
	logger *slog.Logger  // base logger; tagged per-database in openMainDB

	// PollInterval is the interval at which to poll the replica client for new
	// LTX files. The index will be fetched for the new files automatically.
	PollInterval time.Duration
	// CacheSize is the maximum size of the page cache in bytes.
	CacheSize int
	// WriteEnabled activates write support for the VFS.
	WriteEnabled bool
	// WriteSyncInterval is how often to sync dirty pages to remote storage.
	// If zero, defaults to DefaultSyncInterval (1 second).
	WriteSyncInterval time.Duration
	// WriteBufferPath is the path for local write buffer persistence.
	// If empty, uses a temp file.
	WriteBufferPath string
	// HydrationEnabled activates background hydration of the database to a local file.
	// When enabled, the VFS will restore the database in the background and serve
	// reads from the local file once complete, eliminating remote fetch latency.
	HydrationEnabled bool
	// HydrationPath is the file path for local hydration file.
	// If empty and HydrationEnabled is true, a temp file will be used.
	HydrationPath string
	// CompactionEnabled activates background compaction for the VFS.
	// Requires WriteEnabled to be true.
	CompactionEnabled bool
	// CompactionLevels defines the compaction intervals for each level.
	// If nil, uses default compaction levels.
	CompactionLevels CompactionLevels
	// SnapshotInterval is how often to create full database snapshots.
	// Set to 0 to disable automatic snapshots.
	SnapshotInterval time.Duration
	// SnapshotRetention is how long to keep old snapshots.
	// Set to 0 to keep all snapshots.
	SnapshotRetention time.Duration
	// L0Retention is how long to keep L0 files after compaction into L1.
	// Set to 0 to delete immediately after compaction.
	L0Retention time.Duration

	// Write-coordination state shared across all files opened by this VFS.
	writeMu        sync.Mutex
	writeFile      *VFSFile // current RESERVED lock holder (nil if none)
	lastSyncedTXID ltx.TXID // highest TXID synced by any local connection
	writeSeq       uint64   // atomic counter for unique buffer paths

	// Lazily-created scratch directory for temp/journal files (ensureTempDir).
	tempDirOnce sync.Once
	tempDir     string
	tempDirErr  error
	tempFiles   sync.Map // canonical name -> absolute path
	tempNames   sync.Map // canonical name -> struct{}{}
}
// NewVFS returns a VFS backed by client with the default poll interval and
// page-cache size. The supplied logger is tagged with a "vfs" attribute.
func NewVFS(client ReplicaClient, logger *slog.Logger) *VFS {
	v := &VFS{client: client}
	v.logger = logger.With("vfs", "true")
	v.PollInterval = DefaultPollInterval
	v.CacheSize = DefaultCacheSize
	return v
}
// Open routes a SQLite open request: the main database is served from the
// replica client, journals and transient files are backed by local temp
// files, and anything else is rejected.
func (vfs *VFS) Open(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, sqlite3vfs.OpenFlag, error) {
	slog.Debug("opening file", "name", name, "flags", flags)
	if flags&sqlite3vfs.OpenMainDB != 0 {
		return vfs.openMainDB(name, flags)
	}
	if vfs.requiresTempFile(flags) {
		return vfs.openTempFile(name, flags)
	}
	return nil, flags, sqlite3vfs.CantOpenError
}
// openMainDB opens the main database file backed by a replica client.
// Per-connection settings from the config registry (GetVFSConfig) override
// the VFS-level defaults for polling, caching, write support, and hydration.
func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, sqlite3vfs.OpenFlag, error) {
	cfg := GetVFSConfig(name)
	client := vfs.client
	var perConnClient bool
	// A per-connection replica URL takes precedence over the shared client.
	if cfg != nil && cfg.ReplicaURL != "" {
		var err error
		client, err = NewReplicaClientFromURL(cfg.ReplicaURL)
		if err != nil {
			return nil, 0, fmt.Errorf("create per-connection replica client: %w", err)
		}
		if err := client.Init(context.Background()); err != nil {
			return nil, 0, fmt.Errorf("init per-connection replica client: %w", err)
		}
		perConnClient = true
	}
	if client == nil {
		return nil, 0, fmt.Errorf("no replica client configured: set LITESTREAM_REPLICA_URL or use SetVFSConfig")
	}
	f := NewVFSFile(client, name, vfs.logger.With("name", name))
	f.PollInterval = vfs.PollInterval
	f.CacheSize = vfs.CacheSize
	f.vfs = vfs
	f.perConnClient = perConnClient
	// Apply per-connection overrides where provided.
	if cfg != nil {
		if cfg.PollInterval != nil {
			f.PollInterval = *cfg.PollInterval
		}
		if cfg.CacheSize != nil {
			f.CacheSize = *cfg.CacheSize
		}
	}
	writeEnabled := vfs.WriteEnabled
	if cfg != nil && cfg.WriteEnabled != nil {
		writeEnabled = *cfg.WriteEnabled
	}
	syncInterval := vfs.WriteSyncInterval
	if cfg != nil && cfg.SyncInterval != nil {
		syncInterval = *cfg.SyncInterval
	}
	bufferPath := vfs.WriteBufferPath
	if cfg != nil && cfg.BufferPath != "" {
		bufferPath = cfg.BufferPath
	}
	hydrationEnabled := vfs.HydrationEnabled
	if cfg != nil && cfg.HydrationEnabled != nil {
		hydrationEnabled = *cfg.HydrationEnabled
	}
	hydrationPath := vfs.HydrationPath
	if cfg != nil && cfg.HydrationPath != "" {
		hydrationPath = cfg.HydrationPath
	}
	// Initialize write support if enabled
	if writeEnabled {
		f.writeEnabled = true
		f.dirty = make(map[uint32]int64)
		f.syncInterval = syncInterval
		if f.syncInterval == 0 {
			f.syncInterval = DefaultSyncInterval
		}
		// Each open gets a unique buffer path: the first open may use the
		// configured path verbatim; later opens get a numeric suffix.
		writeSeq := atomic.AddUint64(&vfs.writeSeq, 1)
		if bufferPath != "" {
			if writeSeq == 1 {
				f.bufferPath = bufferPath
			} else {
				f.bufferPath = bufferPath + "." + strconv.FormatUint(writeSeq, 10)
			}
		} else {
			dir, err := vfs.ensureTempDir()
			if err != nil {
				return nil, 0, fmt.Errorf("create temp dir for write buffer: %w", err)
			}
			f.bufferPath = filepath.Join(dir, "write-buffer-"+strconv.FormatUint(writeSeq, 10))
		}
		// Initialize compaction if enabled
		if vfs.CompactionEnabled {
			f.compactor = NewCompactor(client, f.logger)
		}
	}
	// Initialize hydration support if enabled
	if hydrationEnabled {
		if hydrationPath != "" {
			// A user-specified hydration path persists across restarts.
			f.hydrationPath = hydrationPath
			f.hydrationPersistent = true
		} else {
			dir, err := vfs.ensureTempDir()
			if err != nil {
				return nil, 0, fmt.Errorf("create temp dir for hydration: %w", err)
			}
			f.hydrationPath = filepath.Join(dir, "hydration.db")
		}
	}
	if err := f.Open(); err != nil {
		// Don't leak a client that was created solely for this connection.
		if perConnClient {
			if closer, ok := client.(io.Closer); ok {
				closer.Close()
			}
		}
		return nil, 0, err
	}
	// Track the highest TXID seen by any local writer (conflict detection).
	if writeEnabled {
		vfs.writeMu.Lock()
		if f.expectedTXID > vfs.lastSyncedTXID {
			vfs.lastSyncedTXID = f.expectedTXID
		}
		vfs.writeMu.Unlock()
	}
	// When SQLite requests read-write access, always report ReadWrite in the
	// output flags so that cold enable via PRAGMA litestream_write_enabled
	// works. SQLite permanently marks databases as read-only based on the
	// output flags from xOpen (pager.c:readOnly, btree.c:BTS_READ_ONLY),
	// which would prevent write transactions even after enabling writes at
	// runtime. Read-only enforcement happens at the VFS layer (WriteAt,
	// Truncate, Lock) when writeEnabled is false.
	//
	// If the caller explicitly requested read-only, we respect that intent.
	if flags&sqlite3vfs.OpenReadOnly == 0 {
		flags &^= sqlite3vfs.OpenReadOnly
		flags |= sqlite3vfs.OpenReadWrite
	}
	return f, flags, nil
}
// Delete removes a locally-tracked temp file. Names that never referred to a
// tracked temp file cannot be deleted (the main database lives remotely).
func (vfs *VFS) Delete(name string, dirSync bool) error {
	slog.Debug("deleting file", "name", name, "dirSync", dirSync)
	switch err := vfs.deleteTempFile(name); {
	case err == nil, errors.Is(err, os.ErrNotExist):
		return nil
	case errors.Is(err, errTempFileNotFound):
		return fmt.Errorf("cannot delete vfs file")
	default:
		return err
	}
}
// Access reports whether a file exists (or is writable) from SQLite's point
// of view. WAL files are always absent; temp files are checked on disk.
func (vfs *VFS) Access(name string, flag sqlite3vfs.AccessFlag) (bool, error) {
	slog.Debug("accessing file", "name", name, "flag", flag)
	switch {
	case strings.HasSuffix(name, "-wal"):
		return vfs.accessWAL(name, flag)
	case vfs.isTempFileName(name):
		return vfs.accessTempFile(name, flag)
	default:
		return false, nil
	}
}
// accessWAL always reports that the WAL file does not exist.
func (vfs *VFS) accessWAL(name string, flag sqlite3vfs.AccessFlag) (bool, error) {
	return false, nil // no -wal file is ever exposed by this VFS
}
// FullPathname returns name unchanged; VFS names are used verbatim as keys.
func (vfs *VFS) FullPathname(name string) string {
	slog.Debug("full pathname", "name", name)
	return name
}
// requiresTempFile reports whether the open flags describe a journal,
// temporary, transient, or delete-on-close file that should be backed by
// the local filesystem rather than the replica.
func (vfs *VFS) requiresTempFile(flags sqlite3vfs.OpenFlag) bool {
	const localMask = sqlite3vfs.OpenTempDB |
		sqlite3vfs.OpenTempJournal |
		sqlite3vfs.OpenSubJournal |
		sqlite3vfs.OpenSuperJournal |
		sqlite3vfs.OpenTransientDB |
		sqlite3vfs.OpenMainJournal |
		sqlite3vfs.OpenDeleteOnClose
	return flags&localMask != 0
}
// ensureTempDir lazily creates the scratch directory used for temp files,
// journals, write buffers, and hydration. The result (or the creation
// error) is cached for the lifetime of the VFS.
func (vfs *VFS) ensureTempDir() (string, error) {
	vfs.tempDirOnce.Do(func() {
		if dir, err := os.MkdirTemp("", "litestream-vfs-*"); err != nil {
			vfs.tempDirErr = fmt.Errorf("create temp dir: %w", err)
		} else {
			vfs.tempDir = dir
		}
	})
	return vfs.tempDir, vfs.tempDirErr
}
// canonicalTempName normalizes a temp-file name for use as a tracking key.
// It returns "" for names that cannot identify a file (empty, ".", or the
// bare path separator).
func (vfs *VFS) canonicalTempName(name string) string {
	if name == "" {
		return ""
	}
	switch cleaned := filepath.Clean(name); cleaned {
	case ".", string(filepath.Separator):
		return ""
	default:
		return cleaned
	}
}
// tempFilenameFromCanonical derives a stable on-disk filename for a tracked
// temp file: the base name plus a 64-bit FNV-1a hash of the full canonical
// name, so distinct paths sharing a base name cannot collide.
func tempFilenameFromCanonical(canonical string) (string, error) {
	base := filepath.Base(canonical)
	if base == "." || base == string(filepath.Separator) {
		return "", fmt.Errorf("invalid temp file name: %q", canonical)
	}
	// hash.Hash.Write is documented to never return an error, so checking
	// it here was dead code.
	h := fnv.New64a()
	h.Write([]byte(canonical))
	return fmt.Sprintf("%s-%016x", base, h.Sum64()), nil
}
// openTempFile creates (or reopens) a local file backing a SQLite temp,
// journal, or transient file. Anonymous files (name == "") are always
// deleted on close; named files are tracked so Access/Delete can find them.
func (vfs *VFS) openTempFile(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, sqlite3vfs.OpenFlag, error) {
	dir, err := vfs.ensureTempDir()
	if err != nil {
		return nil, flags, err
	}

	// Anonymous temp files never outlive their handle and are not tracked.
	if name == "" {
		f, err := os.CreateTemp(dir, "temp-*")
		if err != nil {
			return nil, flags, sqlite3vfs.CantOpenError
		}
		return newLocalTempFile(f, true, nil), flags, nil
	}

	canonical := vfs.canonicalTempName(name)
	if canonical == "" {
		return nil, flags, sqlite3vfs.CantOpenError
	}
	fname, err := tempFilenameFromCanonical(canonical)
	if err != nil {
		return nil, flags, sqlite3vfs.CantOpenError
	}
	path := filepath.Join(dir, fname)

	osFlag := openFlagToOSFlag(flags)
	if osFlag == 0 {
		osFlag = os.O_RDWR
	}
	f, err := os.OpenFile(path, osFlag|os.O_CREATE, 0o600)
	if err != nil {
		return nil, flags, sqlite3vfs.CantOpenError
	}
	deleteOnClose := flags&sqlite3vfs.OpenDeleteOnClose != 0
	return newLocalTempFile(f, deleteOnClose, vfs.trackTempFile(canonical, path)), flags, nil
}
// deleteTempFile removes the on-disk file backing a tracked temp name.
// A name that was tracked once but has no current path reports
// os.ErrNotExist; a name never seen reports errTempFileNotFound.
func (vfs *VFS) deleteTempFile(name string) error {
	path, tracked := vfs.loadTempFilePath(name)
	if !tracked {
		if !vfs.wasTempFileName(name) {
			return errTempFileNotFound
		}
		vfs.unregisterTempFile(name)
		return os.ErrNotExist
	}
	if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
		return err
	}
	vfs.unregisterTempFile(name)
	return nil
}
// isTempFileName reports whether name refers to a currently-tracked temp file.
func (vfs *VFS) isTempFileName(name string) bool {
	_, tracked := vfs.loadTempFilePath(name)
	return tracked
}
// wasTempFileName reports whether name was ever registered as a temp file,
// even if its current tracking entry has been removed.
func (vfs *VFS) wasTempFileName(name string) bool {
	canonical := vfs.canonicalTempName(name)
	if canonical == "" {
		return false
	}
	_, seen := vfs.tempNames.Load(canonical)
	return seen
}
// unregisterTempFile drops the path mapping for name. The tempNames entry is
// intentionally kept so wasTempFileName still recognizes the name later.
func (vfs *VFS) unregisterTempFile(name string) {
	if canonical := vfs.canonicalTempName(name); canonical != "" {
		vfs.tempFiles.Delete(canonical)
	}
}
// accessTempFile reports whether the tracked temp file for name exists on disk.
func (vfs *VFS) accessTempFile(name string, flag sqlite3vfs.AccessFlag) (bool, error) {
	path, tracked := vfs.loadTempFilePath(name)
	if !tracked {
		return false, nil
	}
	switch _, err := os.Stat(path); {
	case err == nil:
		return true, nil
	case os.IsNotExist(err):
		return false, nil
	default:
		return false, err
	}
}
// trackTempFile records the canonical-name -> path mapping and returns a
// cleanup func (run when the file closes) that removes the path mapping.
func (vfs *VFS) trackTempFile(canonical, path string) func() {
	if canonical == "" {
		return func() {}
	}
	vfs.tempFiles.Store(canonical, path)
	vfs.tempNames.Store(canonical, struct{}{})
	return func() {
		vfs.tempFiles.Delete(canonical)
	}
}
// loadTempFilePath returns the absolute on-disk path tracked for name, if any.
func (vfs *VFS) loadTempFilePath(name string) (string, bool) {
	canonical := vfs.canonicalTempName(name)
	if canonical == "" {
		return "", false
	}
	v, ok := vfs.tempFiles.Load(canonical)
	if !ok {
		return "", false
	}
	return v.(string), true
}
// openFlagToOSFlag translates SQLite open flags into os.OpenFile flags.
// Only the read/write mode, create, and exclusive bits are honored.
func openFlagToOSFlag(flag sqlite3vfs.OpenFlag) int {
	var out int
	switch {
	case flag&sqlite3vfs.OpenReadWrite != 0:
		out = os.O_RDWR
	case flag&sqlite3vfs.OpenReadOnly != 0:
		out = os.O_RDONLY
	}
	if flag&sqlite3vfs.OpenCreate != 0 {
		out |= os.O_CREATE
	}
	if flag&sqlite3vfs.OpenExclusive != 0 {
		out |= os.O_EXCL
	}
	return out
}
var errTempFileNotFound = fmt.Errorf("temp file not tracked")
// localTempFile fulfills sqlite3vfs.File solely for SQLite temp & transient files.
// These files stay on the local filesystem and optionally delete themselves
// when SQLite closes them (DeleteOnClose flag).
type localTempFile struct {
	f             *os.File     // backing file in the VFS scratch directory
	deleteOnClose bool         // remove the file from disk on Close
	lockType      atomic.Int32 // last lock level recorded by Lock/Unlock
	onClose       func()       // optional callback run after Close (untracks the file)
}
// newLocalTempFile wraps an open *os.File in the sqlite3vfs.File interface.
// onClose, if non-nil, runs after the underlying handle is closed.
func newLocalTempFile(f *os.File, deleteOnClose bool, onClose func()) *localTempFile {
	tf := &localTempFile{deleteOnClose: deleteOnClose, onClose: onClose}
	tf.f = f
	return tf
}
// Close closes the underlying file, removes it when marked delete-on-close,
// and finally invokes the onClose callback. A close error takes precedence
// over a removal error; a missing file is not an error.
func (tf *localTempFile) Close() error {
	err := tf.f.Close()
	if tf.deleteOnClose {
		removeErr := os.Remove(tf.f.Name())
		if err == nil && removeErr != nil && !os.IsNotExist(removeErr) {
			err = removeErr
		}
	}
	if tf.onClose != nil {
		tf.onClose()
	}
	return err
}
// ReadAt delegates directly to the underlying local file.
func (tf *localTempFile) ReadAt(p []byte, off int64) (int, error) {
	return tf.f.ReadAt(p, off)
}
// WriteAt delegates directly to the underlying local file.
func (tf *localTempFile) WriteAt(b []byte, off int64) (int, error) {
	return tf.f.WriteAt(b, off)
}
// Truncate delegates directly to the underlying local file.
func (tf *localTempFile) Truncate(size int64) error {
	return tf.f.Truncate(size)
}
// Sync flushes the file to stable storage. The SQLite sync flags are
// ignored; os.File.Sync always performs a full sync.
func (tf *localTempFile) Sync(flag sqlite3vfs.SyncType) error {
	return tf.f.Sync()
}
// FileSize returns the current size of the file in bytes.
func (tf *localTempFile) FileSize() (int64, error) {
	fi, err := tf.f.Stat()
	if err != nil {
		return 0, err
	}
	return fi.Size(), nil
}
// Lock records the requested lock level. Temp files are private to a single
// connection, so no real file locking is performed; LockNone is a no-op.
func (tf *localTempFile) Lock(elock sqlite3vfs.LockType) error {
	if elock != sqlite3vfs.LockNone {
		tf.lockType.Store(int32(elock))
	}
	return nil
}
// Unlock records the lowered lock level.
func (tf *localTempFile) Unlock(elock sqlite3vfs.LockType) error {
	tf.lockType.Store(int32(elock))
	return nil
}
// CheckReservedLock reports whether this handle currently records a lock
// level of RESERVED or higher.
func (tf *localTempFile) CheckReservedLock() (bool, error) {
	held := sqlite3vfs.LockType(tf.lockType.Load())
	return held >= sqlite3vfs.LockReserved, nil
}
// SectorSize returns 0, letting SQLite use its default sector size.
func (tf *localTempFile) SectorSize() int64 {
	return 0
}
// DeviceCharacteristics reports no special device capabilities.
func (tf *localTempFile) DeviceCharacteristics() sqlite3vfs.DeviceCharacteristic {
	return 0
}
// VFSFile implements the SQLite VFS file interface.
type VFSFile struct {
	mu     sync.Mutex    // guards mutable state on this file
	client ReplicaClient // replica client used for all remote access
	name   string        // database name as passed to VFS.Open

	pos      ltx.Pos  // Last TXID read from level 0 or 1
	maxTXID1 ltx.TXID // Last TXID read from level 1
	// index maps pgno -> location of the page within remote LTX storage.
	index map[uint32]ltx.PageIndexElem
	// pending holds staged index updates from polling; pendingReplace
	// presumably marks a wholesale index replacement rather than a merge —
	// TODO confirm against the polling logic.
	pending         map[uint32]ltx.PageIndexElem
	pendingReplace  bool
	cache           *lru.Cache[uint32, []byte] // LRU cache for page data
	targetTime      *time.Time                 // Target view time; nil means latest
	latestLTXTime   time.Time                  // Timestamp of most recent LTX file
	lastPollSuccess time.Time                  // Time of last successful poll
	lockType        sqlite3vfs.LockType        // Current lock state
	pageSize        uint32                     // database page size in bytes
	commit          uint32                     // presumably database size in pages at last commit — TODO confirm

	// Write support fields (only used when writeEnabled is true)
	writeEnabled  bool             // Whether write support is enabled
	dirty         map[uint32]int64 // Dirty pages: pgno -> offset in buffer file
	pendingTXID   ltx.TXID         // Next TXID to use for sync
	expectedTXID  ltx.TXID         // Expected remote TXID (for conflict detection)
	bufferFile    *os.File         // Temp file for durability
	bufferPath    string           // Path to buffer file
	bufferNextOff int64            // Next write offset in buffer file
	syncTicker    *time.Ticker     // Ticker for periodic sync
	syncInterval  time.Duration    // Interval for periodic sync
	syncStop      chan struct{}    // Signal to stop sync loop
	inTransaction bool             // True during active write transaction
	disabling     bool             // True when write disable is in progress
	cond          *sync.Cond       // Signals transaction state changes
	perConnClient bool             // True when client was created from config registry (close on file close)

	hydrator            *Hydrator // Background hydration (nil if disabled)
	hydrationPath       string    // Path for hydration file (set during Open)
	hydrationPersistent bool      // True when using user-specified persistent path

	wg     sync.WaitGroup     // waits for background goroutines
	ctx    context.Context    // lifetime context for background work
	cancel context.CancelFunc // cancels ctx
	logger *slog.Logger

	PollInterval time.Duration // poll cadence for new remote LTX files
	CacheSize    int           // page cache capacity in bytes

	// Compaction support (only used when VFS.CompactionEnabled is true)
	vfs              *VFS       // Reference back to parent VFS for config
	compactor        *Compactor // Shared compaction logic
	compactionWg     sync.WaitGroup
	compactionCtx    context.Context
	compactionCancel context.CancelFunc
}
// Hydrator handles background hydration of the database to a local file.
// Once the restore completes, page reads can be served from the local copy
// instead of fetching from remote storage.
type Hydrator struct {
	path       string         // Full path to hydration file
	persistent bool           // True when file should survive across restarts
	file       *os.File       // Local database file
	complete   atomic.Bool    // True when restore completes
	txid       ltx.TXID       // TXID the hydrated file is at (guarded by mu)
	mu         sync.Mutex     // Protects hydration file writes, txid, and err
	err        error          // Stores fatal hydration error
	compactor  *ltx.Compactor // Tracks compaction progress during restore
	pageSize   uint32         // Page size of the database
	client     ReplicaClient  // source of LTX files and pages
	logger     *slog.Logger
}
// NewHydrator creates a new Hydrator instance. Call Init before use to open
// or create the backing file.
func NewHydrator(path string, persistent bool, pageSize uint32, client ReplicaClient, logger *slog.Logger) *Hydrator {
	h := &Hydrator{client: client, logger: logger}
	h.path = path
	h.persistent = persistent
	h.pageSize = pageSize
	return h
}
// Init opens or creates the hydration file. A persistent hydrator with a
// readable .meta file and an existing database file resumes from the
// recorded TXID; otherwise any stale meta is discarded and a fresh,
// truncated file is created.
func (h *Hydrator) Init() error {
	if err := os.MkdirAll(filepath.Dir(h.path), 0o755); err != nil {
		return fmt.Errorf("create hydration directory: %w", err)
	}

	if h.persistent {
		if txid, metaErr := h.loadMeta(); metaErr == nil {
			if _, statErr := os.Stat(h.path); statErr == nil {
				// Resume from the previously hydrated file.
				file, err := os.OpenFile(h.path, os.O_RDWR, 0o600)
				if err != nil {
					return fmt.Errorf("open persistent hydration file: %w", err)
				}
				h.file = file
				h.txid = txid
				return nil
			}
		}
		// Meta is missing or stale; drop it before rebuilding from scratch.
		if err := os.Remove(h.metaPath()); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("remove stale hydration meta: %w", err)
		}
	}

	file, err := os.OpenFile(h.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0o600)
	if err != nil {
		return fmt.Errorf("create hydration file: %w", err)
	}
	h.file = file
	return nil
}
// Complete reports whether background hydration has finished and reads may
// be served from the local file.
func (h *Hydrator) Complete() bool {
	return h.complete.Load()
}
// SetComplete marks hydration as finished.
func (h *Hydrator) SetComplete() {
	h.complete.Store(true)
}
// Disable temporarily disables hydrated reads (used during time travel).
func (h *Hydrator) Disable() {
	h.complete.Store(false)
}
// TXID returns the transaction ID the hydrated file currently reflects.
func (h *Hydrator) TXID() ltx.TXID {
	h.mu.Lock()
	txid := h.txid
	h.mu.Unlock()
	return txid
}
// SetTXID records the transaction ID the hydrated file now reflects.
func (h *Hydrator) SetTXID(txid ltx.TXID) {
	h.mu.Lock()
	h.txid = txid
	h.mu.Unlock()
}
// Err returns the fatal hydration error recorded so far, if any.
func (h *Hydrator) Err() error {
	h.mu.Lock()
	err := h.err
	h.mu.Unlock()
	return err
}
// SetErr records a fatal hydration error.
func (h *Hydrator) SetErr(err error) {
	h.mu.Lock()
	h.err = err
	h.mu.Unlock()
}
// Status returns the current compaction progress during restore. Before the
// restore has created a compactor, the zero status is returned.
func (h *Hydrator) Status() ltx.CompactorStatus {
	if h.compactor != nil {
		return h.compactor.Status()
	}
	return ltx.CompactorStatus{}
}
// Restore restores the database from LTX files to the hydration file.
// The files are merged with an ltx.Compactor and streamed through an
// io.Pipe into a decoder that materializes the full database image.
func (h *Hydrator) Restore(ctx context.Context, infos []*ltx.FileInfo) error {
	// Open all LTX files as readers
	rdrs := make([]io.Reader, 0, len(infos))
	// Close whatever was opened, on success and on every error path.
	defer func() {
		for _, rd := range rdrs {
			if closer, ok := rd.(io.Closer); ok {
				_ = closer.Close()
			}
		}
	}()
	for _, info := range infos {
		h.logger.Debug("opening ltx file for hydration", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)
		rc, err := h.client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, 0)
		if err != nil {
			return fmt.Errorf("open ltx file: %w", err)
		}
		rdrs = append(rdrs, rc)
	}
	if len(rdrs) == 0 {
		return fmt.Errorf("no ltx files for hydration")
	}
	// Compact and decode using io.Pipe pattern
	pr, pw := io.Pipe()
	c, err := ltx.NewCompactor(pw, rdrs)
	if err != nil {
		return fmt.Errorf("new ltx compactor: %w", err)
	}
	// Checksums are skipped for the restore stream.
	c.HeaderFlags = ltx.HeaderFlagNoChecksum
	// Expose the compactor so Status() can report restore progress.
	h.compactor = c
	// The compactor feeds the pipe from its own goroutine; closing the
	// write end with Compact's error propagates failure to the decoder.
	go func() {
		_ = pw.CloseWithError(c.Compact(ctx))
	}()
	h.mu.Lock()
	defer h.mu.Unlock()
	dec := ltx.NewDecoder(pr)
	if err := dec.DecodeDatabaseTo(h.file); err != nil {
		return fmt.Errorf("decode database: %w", err)
	}
	// The hydrated file now reflects the newest TXID among the inputs.
	h.txid = infos[len(infos)-1].MaxTXID
	return nil
}
// CatchUp applies updates from LTX files between fromTXID and toTXID.
func (h *Hydrator) CatchUp(ctx context.Context, fromTXID, toTXID ltx.TXID) error {
	h.logger.Debug("catching up hydration", "from", fromTXID, "to", toTXID)
	// Fetch LTX files from fromTXID+1 to toTXID
	itr, err := h.client.LTXFiles(ctx, 0, fromTXID+1, false)
	if err != nil {
		return fmt.Errorf("list ltx files for catch-up: %w", err)
	}
	defer itr.Close()
	for itr.Next() {
		info := itr.Item()
		// Stop before applying files beyond the requested target TXID.
		if info.MaxTXID > toTXID {
			break
		}
		if err := h.ApplyLTX(ctx, info); err != nil {
			return fmt.Errorf("apply ltx to hydrated file: %w", err)
		}
		// Record progress after each file so a partial catch-up is resumable.
		h.mu.Lock()
		h.txid = info.MaxTXID
		h.mu.Unlock()
	}
	// NOTE(review): no terminal iterator error is checked after the loop —
	// if the iterator interface exposes one (e.g. itr.Err()), a failed
	// listing could silently end the catch-up early. Confirm.
	return nil
}
// ApplyLTX fetches an entire LTX file and applies its pages to the hydration file.
func (h *Hydrator) ApplyLTX(ctx context.Context, info *ltx.FileInfo) error {
	h.logger.Debug("applying ltx to hydration file", "level", info.Level, "min", info.MinTXID, "max", info.MaxTXID)

	// Fetch entire LTX file from the replica.
	rc, err := h.client.OpenLTXFile(ctx, info.Level, info.MinTXID, info.MaxTXID, 0, 0)
	if err != nil {
		return fmt.Errorf("open ltx file: %w", err)
	}
	defer rc.Close()

	dec := ltx.NewDecoder(rc)
	if err := dec.DecodeHeader(); err != nil {
		return fmt.Errorf("decode header: %w", err)
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	// Apply each page to the hydration file. The page buffer is allocated
	// once and reused: WriteAt consumes the data before the next DecodePage
	// overwrites it, so a per-iteration allocation was unnecessary.
	data := make([]byte, h.pageSize)
	for {
		var phdr ltx.PageHeader
		if err := dec.DecodePage(&phdr, data); err == io.EOF {
			break
		} else if err != nil {
			return fmt.Errorf("decode page: %w", err)
		}
		off := int64(phdr.Pgno-1) * int64(h.pageSize)
		if _, err := h.file.WriteAt(data, off); err != nil {
			return fmt.Errorf("write page %d: %w", phdr.Pgno, err)
		}
	}
	return nil
}
// ReadAt reads data from the hydrated local file.
func (h *Hydrator) ReadAt(p []byte, off int64) (int, error) {
	h.mu.Lock()
	n, err := h.file.ReadAt(p, off)
	h.mu.Unlock()
	// io.EOF (a short read at the end of the file) is passed through as
	// success; only real read errors are reported.
	if err != nil && err != io.EOF {
		return n, fmt.Errorf("read hydrated file: %w", err)
	}
	// Update the first page to pretend like we are in journal mode
	if off == 0 && len(p) >= 28 {
		// Header bytes 18-19 are the SQLite file-format read/write version
		// numbers; 0x01/0x01 indicates rollback-journal (legacy) mode.
		p[18], p[19] = 0x01, 0x01
		// Header bytes 24-27 are the file change counter. Randomizing it
		// presumably forces SQLite to discard stale cached pages on each
		// read of page 1 — confirm against pager cache expectations.
		_, _ = rand.Read(p[24:28])
	}
	return n, nil
}
// ApplyUpdates fetches each updated page from remote storage and writes it
// into place in the hydration file.
func (h *Hydrator) ApplyUpdates(ctx context.Context, updates map[uint32]ltx.PageIndexElem) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	for pgno, elem := range updates {
		_, data, err := FetchPage(ctx, h.client, elem.Level, elem.MinTXID, elem.MaxTXID, elem.Offset, elem.Size)
		if err != nil {
			return fmt.Errorf("fetch updated page %d: %w", pgno, err)
		}
		if _, err := h.file.WriteAt(data, int64(pgno-1)*int64(h.pageSize)); err != nil {
			return fmt.Errorf("write updated page %d: %w", pgno, err)
		}
	}
	return nil
}
// WritePage writes a single page image at its offset in the hydration file.
func (h *Hydrator) WritePage(pgno uint32, data []byte) error {
	h.mu.Lock()
	defer h.mu.Unlock()
	offset := int64(pgno-1) * int64(h.pageSize)
	_, err := h.file.WriteAt(data, offset)
	if err != nil {
		return fmt.Errorf("write page %d to hydrated file: %w", pgno, err)
	}
	return nil
}
// Truncate truncates the hydration file to the specified size.
func (h *Hydrator) Truncate(size int64) error {
	h.mu.Lock()
	err := h.file.Truncate(size)
	h.mu.Unlock()
	return err
}
// Close closes the hydration file. For persistent hydrators, the file and a
// companion .meta file are preserved so hydration can resume on the next open.
func (h *Hydrator) Close() error {
	if h.file == nil {
		return nil
	}

	// Persistent hydration with recorded progress: flush state, keep files.
	if h.persistent && h.txid > 0 {
		if err := h.file.Sync(); err != nil {
			h.logger.Warn("failed to sync hydration file", "error", err)
		}
		if err := h.saveMeta(); err != nil {
			h.logger.Warn("failed to save hydration meta", "error", err)
		}
		return h.file.Close()
	}

	// Otherwise remove both the database file and its meta companion.
	if err := h.file.Close(); err != nil {
		return err
	}
	for _, p := range []string{h.path, h.metaPath()} {
		if err := os.Remove(p); err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return nil
}
// metaPath returns the path of the companion file recording the hydrated TXID.
func (h *Hydrator) metaPath() string {
	return h.path + ".meta"
}
// loadMeta reads the hydrated TXID back from the companion .meta file.
func (h *Hydrator) loadMeta() (ltx.TXID, error) {
	data, err := os.ReadFile(h.metaPath())
	if err != nil {
		return 0, err
	}
	raw := strings.TrimSpace(string(data))
	v, err := strconv.ParseUint(raw, 10, 64)
	if err != nil {
		return 0, fmt.Errorf("parse hydration meta: %w", err)
	}
	return ltx.TXID(v), nil
}
func (h *Hydrator) saveMeta() error {
h.mu.Lock()
txid := h.txid
h.mu.Unlock()
dir := filepath.Dir(h.metaPath())
tmp, err := os.CreateTemp(dir, ".hydration-meta-*")
if err != nil {
return fmt.Errorf("create temp meta file: %w", err)
}
tmpPath := tmp.Name()
if _, err := fmt.Fprintf(tmp, "%d\n", txid); err != nil {
if closeErr := tmp.Close(); closeErr != nil {
h.logger.Warn("failed to close temp meta file during cleanup", "error", closeErr)
}
if removeErr := os.Remove(tmpPath); removeErr != nil {
h.logger.Warn("failed to remove temp meta file during cleanup", "error", removeErr)
}
return fmt.Errorf("write hydration meta: %w", err)
}
if err := tmp.Sync(); err != nil {
if closeErr := tmp.Close(); closeErr != nil {
h.logger.Warn("failed to close temp meta file during cleanup", "error", closeErr)
}
if removeErr := os.Remove(tmpPath); removeErr != nil {
h.logger.Warn("failed to remove temp meta file during cleanup", "error", removeErr)
}
return fmt.Errorf("sync temp meta file: %w", err)
}
if err := tmp.Close(); err != nil {
if removeErr := os.Remove(tmpPath); removeErr != nil {
h.logger.Warn("failed to remove temp meta file during cleanup", "error", removeErr)
}
return fmt.Errorf("close temp meta file: %w", err)
}
if err := os.Rename(tmpPath, h.metaPath()); err != nil {
if removeErr := os.Remove(tmpPath); removeErr != nil {
h.logger.Warn("failed to remove temp meta file during cleanup", "error", removeErr)
}
return fmt.Errorf("rename hydration meta: %w", err)
}
if err := syncDir(filepath.Dir(h.metaPath())); err != nil {
return fmt.Errorf("sync hydration meta directory: %w", err)
}