Skip to content

Commit 9be0eb1

Browse files
benbjohnsonclaude
andauthored
feat(vfs): support creating new databases in write mode (#972)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent ad5c462 commit 9be0eb1

File tree

3 files changed

+265
-1
lines changed

3 files changed

+265
-1
lines changed

.claude/commands/run-comprehensive-tests.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ echo "=== Testing Builds ==="
142142
go build -o bin/litestream ./cmd/litestream
143143

144144
# Test VFS build (requires CGO)
145-
CGO_ENABLED=1 go build -tags vfs -o bin/litestream-vfs ./cmd/litestream-vfs
145+
make vfs
146146

147147
# Test cross-compilation
148148
GOOS=linux GOARCH=amd64 go build -o bin/litestream-linux-amd64 ./cmd/litestream

vfs.go

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
const (
3131
DefaultPollInterval = 1 * time.Second
3232
DefaultCacheSize = 10 * 1024 * 1024 // 10MB
33+
DefaultPageSize = 4096 // SQLite default page size
3334

3435
pageFetchRetryAttempts = 6
3536
pageFetchRetryDelay = 15 * time.Millisecond
@@ -545,8 +546,15 @@ func (f *VFSFile) hasTargetTime() bool {
545546
func (f *VFSFile) Open() error {
546547
f.logger.Debug("opening file")
547548

549+
// Try to get restore plan. For write-enabled VFS, we can create a new database
550+
// if no LTX files exist yet.
548551
infos, err := f.waitForRestorePlan()
549552
if err != nil {
553+
// If write mode is enabled and no files exist, we can create a new database
554+
if f.writeEnabled && errors.Is(err, ErrTxNotAvailable) {
555+
f.logger.Info("no existing database found, creating new database")
556+
return f.openNewDatabase()
557+
}
550558
return err
551559
}
552560

@@ -607,6 +615,55 @@ func (f *VFSFile) Open() error {
607615
return nil
608616
}
609617

618+
// openNewDatabase initializes the VFSFile for a brand new database with no existing data.
619+
// This is called when write mode is enabled and no LTX files exist yet.
620+
func (f *VFSFile) openNewDatabase() error {
621+
f.logger.Debug("initializing new database")
622+
623+
// Use default page size for new databases
624+
f.pageSize = DefaultPageSize
625+
626+
// Initialize page cache
627+
cacheEntries := f.CacheSize / int(f.pageSize)
628+
if cacheEntries < 1 {
629+
cacheEntries = 1
630+
}
631+
cache, err := lru.New[uint32, []byte](cacheEntries)
632+
if err != nil {
633+
return fmt.Errorf("create page cache: %w", err)
634+
}
635+
f.cache = cache
636+
637+
// Initialize empty index - no pages exist yet
638+
f.index = make(map[uint32]ltx.PageIndexElem)
639+
f.pending = make(map[uint32]ltx.PageIndexElem)
640+
f.pos = ltx.Pos{TXID: 0}
641+
f.commit = 0
642+
643+
// Initialize write support for new database
644+
f.expectedTXID = 0
645+
f.pendingTXID = 1
646+
f.logger.Debug("write support enabled for new database", "expectedTXID", f.expectedTXID, "pendingTXID", f.pendingTXID)
647+
648+
// Initialize write buffer file for durability
649+
if err := f.initWriteBuffer(); err != nil {
650+
f.logger.Warn("failed to initialize write buffer", "error", err)
651+
}
652+
653+
// Start monitoring for new LTX files (in case another writer creates the database)
654+
f.wg.Add(1)
655+
go func() { defer f.wg.Done(); f.monitorReplicaClient(f.ctx) }()
656+
657+
// Start periodic sync goroutine
658+
if f.syncInterval > 0 {
659+
f.syncTicker = time.NewTicker(f.syncInterval)
660+
f.wg.Add(1)
661+
go func() { defer f.wg.Done(); f.syncLoop() }()
662+
}
663+
664+
return nil
665+
}
666+
610667
// SetTargetTime rebuilds the page index to view the database at a specific time.
611668
func (f *VFSFile) SetTargetTime(ctx context.Context, timestamp time.Time) error {
612669
if timestamp.IsZero() {
@@ -804,6 +861,15 @@ func (f *VFSFile) ReadAt(p []byte, off int64) (n int, err error) {
804861
f.mu.Unlock()
805862

806863
if !ok {
864+
// For write-enabled VFS with a new database (no existing pages),
865+
// return zeros to indicate empty page. SQLite will initialize the database.
866+
if f.writeEnabled {
867+
f.logger.Debug("page not found, returning zeros for new database", "page", pgno)
868+
for i := range p {
869+
p[i] = 0
870+
}
871+
return len(p), nil
872+
}
807873
f.logger.Error("page not found", "page", pgno)
808874
return 0, fmt.Errorf("page not found: %d", pgno)
809875
}
@@ -1733,6 +1799,17 @@ func isSupportedPageSize(pageSize uint32) bool {
17331799
}
17341800

17351801
func (f *VFSFile) waitForRestorePlan() ([]*ltx.FileInfo, error) {
1802+
// If write mode is enabled, don't wait - return immediately so we can
1803+
// create a new database if no files exist.
1804+
if f.writeEnabled {
1805+
infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger)
1806+
if err != nil {
1807+
return nil, err
1808+
}
1809+
return infos, nil
1810+
}
1811+
1812+
// For read-only mode, wait for files to become available
17361813
for {
17371814
infos, err := CalcRestorePlan(f.ctx, f.client, 0, time.Time{}, f.logger)
17381815
if err == nil {

vfs_write_test.go

Lines changed: 187 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,190 @@ func TestVFSFile_WriteBufferClearAfterSync(t *testing.T) {
638638
t.Errorf("buffer should be empty after sync, got size %d", stat.Size())
639639
}
640640
}
641+
642+
func TestVFSFile_OpenNewDatabase(t *testing.T) {
643+
// Test opening a VFSFile with write mode enabled when no LTX files exist (new database)
644+
client := newWriteTestReplicaClient()
645+
// Note: No LTX files created - simulating a brand new database
646+
647+
// Create temp directory for buffer
648+
tmpDir := t.TempDir()
649+
bufferPath := tmpDir + "/.litestream-write-buffer"
650+
651+
// Create VFSFile with write support - no existing data
652+
logger := slog.Default()
653+
f := NewVFSFile(client, "new.db", logger)
654+
f.writeEnabled = true
655+
f.dirty = make(map[uint32]int64)
656+
f.syncInterval = 0
657+
f.bufferPath = bufferPath
658+
659+
if err := f.Open(); err != nil {
660+
t.Fatal(err)
661+
}
662+
defer f.Close()
663+
664+
// Verify it opened successfully as a new database
665+
if f.pageSize != DefaultPageSize {
666+
t.Errorf("expected page size %d, got %d", DefaultPageSize, f.pageSize)
667+
}
668+
669+
if f.pos.TXID != 0 {
670+
t.Errorf("expected TXID 0 for new database, got %d", f.pos.TXID)
671+
}
672+
673+
if f.expectedTXID != 0 {
674+
t.Errorf("expected expectedTXID 0, got %d", f.expectedTXID)
675+
}
676+
677+
if f.pendingTXID != 1 {
678+
t.Errorf("expected pendingTXID 1, got %d", f.pendingTXID)
679+
}
680+
681+
if f.commit != 0 {
682+
t.Errorf("expected commit 0 for new database, got %d", f.commit)
683+
}
684+
}
685+
686+
func TestVFSFile_NewDatabase_ReadReturnsZeros(t *testing.T) {
687+
// Test that reading from a new database returns zeros
688+
client := newWriteTestReplicaClient()
689+
690+
tmpDir := t.TempDir()
691+
bufferPath := tmpDir + "/.litestream-write-buffer"
692+
693+
logger := slog.Default()
694+
f := NewVFSFile(client, "new.db", logger)
695+
f.writeEnabled = true
696+
f.dirty = make(map[uint32]int64)
697+
f.syncInterval = 0
698+
f.bufferPath = bufferPath
699+
700+
if err := f.Open(); err != nil {
701+
t.Fatal(err)
702+
}
703+
defer f.Close()
704+
705+
// Read page 1 - should return zeros for new database
706+
readBuf := make([]byte, 100)
707+
n, err := f.ReadAt(readBuf, 0)
708+
if err != nil {
709+
t.Fatalf("expected no error reading from new database, got: %v", err)
710+
}
711+
if n != len(readBuf) {
712+
t.Errorf("expected %d bytes, got %d", len(readBuf), n)
713+
}
714+
715+
// Verify all zeros
716+
for i, b := range readBuf {
717+
if b != 0 {
718+
t.Errorf("expected zero at position %d, got %d", i, b)
719+
break
720+
}
721+
}
722+
}
723+
724+
func TestVFSFile_NewDatabase_WriteAndSync(t *testing.T) {
725+
// Test writing to a new database and syncing to remote
726+
client := newWriteTestReplicaClient()
727+
728+
tmpDir := t.TempDir()
729+
bufferPath := tmpDir + "/.litestream-write-buffer"
730+
731+
logger := slog.Default()
732+
f := NewVFSFile(client, "new.db", logger)
733+
f.writeEnabled = true
734+
f.dirty = make(map[uint32]int64)
735+
f.syncInterval = 0
736+
f.bufferPath = bufferPath
737+
738+
if err := f.Open(); err != nil {
739+
t.Fatal(err)
740+
}
741+
defer f.Close()
742+
743+
// Write data to page 1
744+
writeData := []byte("new database content")
745+
n, err := f.WriteAt(writeData, 0)
746+
if err != nil {
747+
t.Fatal(err)
748+
}
749+
if n != len(writeData) {
750+
t.Errorf("expected %d bytes written, got %d", len(writeData), n)
751+
}
752+
753+
// Verify dirty page exists
754+
if len(f.dirty) != 1 {
755+
t.Errorf("expected 1 dirty page, got %d", len(f.dirty))
756+
}
757+
758+
// Sync to remote
759+
if err := f.Sync(0); err != nil {
760+
t.Fatal(err)
761+
}
762+
763+
// Verify TXID advanced
764+
if f.expectedTXID != 1 {
765+
t.Errorf("expected expectedTXID 1 after sync, got %d", f.expectedTXID)
766+
}
767+
if f.pendingTXID != 2 {
768+
t.Errorf("expected pendingTXID 2 after sync, got %d", f.pendingTXID)
769+
}
770+
771+
// Verify LTX file was written
772+
client.mu.Lock()
773+
if len(client.ltxFiles[0]) != 1 {
774+
t.Errorf("expected 1 LTX file after sync, got %d", len(client.ltxFiles[0]))
775+
}
776+
if len(client.ltxFiles[0]) > 0 {
777+
info := client.ltxFiles[0][0]
778+
if info.MinTXID != 1 || info.MaxTXID != 1 {
779+
t.Errorf("expected TXID 1, got min=%d max=%d", info.MinTXID, info.MaxTXID)
780+
}
781+
}
782+
client.mu.Unlock()
783+
}
784+
785+
func TestVFSFile_NewDatabase_FileSize(t *testing.T) {
786+
// Test that FileSize returns 0 for a new empty database
787+
client := newWriteTestReplicaClient()
788+
789+
tmpDir := t.TempDir()
790+
bufferPath := tmpDir + "/.litestream-write-buffer"
791+
792+
logger := slog.Default()
793+
f := NewVFSFile(client, "new.db", logger)
794+
f.writeEnabled = true
795+
f.dirty = make(map[uint32]int64)
796+
f.syncInterval = 0
797+
f.bufferPath = bufferPath
798+
799+
if err := f.Open(); err != nil {
800+
t.Fatal(err)
801+
}
802+
defer f.Close()
803+
804+
// FileSize should be 0 for empty database
805+
size, err := f.FileSize()
806+
if err != nil {
807+
t.Fatal(err)
808+
}
809+
if size != 0 {
810+
t.Errorf("expected size 0 for new database, got %d", size)
811+
}
812+
813+
// Write a page
814+
data := make([]byte, DefaultPageSize)
815+
if _, err := f.WriteAt(data, 0); err != nil {
816+
t.Fatal(err)
817+
}
818+
819+
// FileSize should now reflect the dirty page
820+
size, err = f.FileSize()
821+
if err != nil {
822+
t.Fatal(err)
823+
}
824+
if size != int64(DefaultPageSize) {
825+
t.Errorf("expected size %d after write, got %d", DefaultPageSize, size)
826+
}
827+
}

0 commit comments

Comments
 (0)