Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 62 additions & 12 deletions cmd/litestream-vfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"io"
"log/slog"
"os"
"strconv"
"strings"
"time"
"unsafe"
Expand All @@ -41,21 +42,18 @@ func main() {}
//export LitestreamVFSRegister
func LitestreamVFSRegister() *C.char {
var client litestream.ReplicaClient
var err error

replicaURL := os.Getenv("LITESTREAM_REPLICA_URL")
if replicaURL == "" {
return C.CString("LITESTREAM_REPLICA_URL environment variable required")
}

client, err = litestream.NewReplicaClientFromURL(replicaURL)
if err != nil {
return C.CString(fmt.Sprintf("failed to create replica client: %s", err))
}
if replicaURL != "" {
var err error
client, err = litestream.NewReplicaClientFromURL(replicaURL)
if err != nil {
return C.CString(fmt.Sprintf("failed to create replica client: %s", err))
}

// Initialize the client.
if err := client.Init(context.Background()); err != nil {
return C.CString(fmt.Sprintf("failed to initialize replica client: %s", err))
if err := client.Init(context.Background()); err != nil {
return C.CString(fmt.Sprintf("failed to initialize replica client: %s", err))
}
}

var level slog.Level
Expand Down Expand Up @@ -111,6 +109,58 @@ func LitestreamVFSRegister() *C.char {
return nil
}

//export GoLitestreamConfigure
func GoLitestreamConfigure(dbName *C.char, key *C.char, value *C.char) *C.char {
name := C.GoString(dbName)
k := C.GoString(key)
v := C.GoString(value)

cfg := litestream.GetVFSConfig(name)
if cfg == nil {
cfg = &litestream.VFSConfig{}
}

switch k {
case "replica_url":
cfg.ReplicaURL = v
case "write_enabled":
b := strings.ToLower(v) == "true" || v == "1"
cfg.WriteEnabled = &b
case "sync_interval":
d, err := time.ParseDuration(v)
if err != nil {
return C.CString(fmt.Sprintf("invalid sync_interval: %s", err))
}
cfg.SyncInterval = &d
case "buffer_path":
cfg.BufferPath = v
case "hydration_enabled":
b := strings.ToLower(v) == "true" || v == "1"
cfg.HydrationEnabled = &b
case "hydration_path":
cfg.HydrationPath = v
case "poll_interval":
d, err := time.ParseDuration(v)
if err != nil {
return C.CString(fmt.Sprintf("invalid poll_interval: %s", err))
}
cfg.PollInterval = &d
case "cache_size":
n, err := strconv.Atoi(v)
if err != nil {
return C.CString(fmt.Sprintf("invalid cache_size: %s", err))
}
cfg.CacheSize = &n
case "log_level":
cfg.LogLevel = v
default:
return C.CString(fmt.Sprintf("unknown config key: %s", k))
}

litestream.SetVFSConfig(name, cfg)
return nil
}

//export GoLitestreamRegisterConnection
func GoLitestreamRegisterConnection(dbPtr unsafe.Pointer, fileID C.sqlite3_uint64) *C.char {
if err := litestream.RegisterVFSConnection(uintptr(dbPtr), uint64(fileID)); err != nil {
Expand Down
96 changes: 82 additions & 14 deletions vfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,26 +138,81 @@ func (vfs *VFS) Open(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, s
}

func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.File, sqlite3vfs.OpenFlag, error) {
f := NewVFSFile(vfs.client, name, vfs.logger.With("name", name))
cfg := GetVFSConfig(name)

client := vfs.client
var perConnClient bool
if cfg != nil && cfg.ReplicaURL != "" {
var err error
client, err = NewReplicaClientFromURL(cfg.ReplicaURL)
if err != nil {
return nil, 0, fmt.Errorf("create per-connection replica client: %w", err)
}
if err := client.Init(context.Background()); err != nil {
return nil, 0, fmt.Errorf("init per-connection replica client: %w", err)
}
perConnClient = true
}

if client == nil {
return nil, 0, fmt.Errorf("no replica client configured: set LITESTREAM_REPLICA_URL or use SetVFSConfig")
}

f := NewVFSFile(client, name, vfs.logger.With("name", name))
f.PollInterval = vfs.PollInterval
f.CacheSize = vfs.CacheSize
f.vfs = vfs // Store reference to parent VFS for config access
f.vfs = vfs
f.perConnClient = perConnClient

if cfg != nil {
if cfg.PollInterval != nil {
f.PollInterval = *cfg.PollInterval
}
if cfg.CacheSize != nil {
f.CacheSize = *cfg.CacheSize
}
}

writeEnabled := vfs.WriteEnabled
if cfg != nil && cfg.WriteEnabled != nil {
writeEnabled = *cfg.WriteEnabled
}

syncInterval := vfs.WriteSyncInterval
if cfg != nil && cfg.SyncInterval != nil {
syncInterval = *cfg.SyncInterval
}

bufferPath := vfs.WriteBufferPath
if cfg != nil && cfg.BufferPath != "" {
bufferPath = cfg.BufferPath
}

hydrationEnabled := vfs.HydrationEnabled
if cfg != nil && cfg.HydrationEnabled != nil {
hydrationEnabled = *cfg.HydrationEnabled
}

hydrationPath := vfs.HydrationPath
if cfg != nil && cfg.HydrationPath != "" {
hydrationPath = cfg.HydrationPath
}

// Initialize write support if enabled
if vfs.WriteEnabled {
if writeEnabled {
f.writeEnabled = true
f.dirty = make(map[uint32]int64)
f.syncInterval = vfs.WriteSyncInterval
f.syncInterval = syncInterval
if f.syncInterval == 0 {
f.syncInterval = DefaultSyncInterval
}

writeSeq := atomic.AddUint64(&vfs.writeSeq, 1)
if vfs.WriteBufferPath != "" {
if bufferPath != "" {
if writeSeq == 1 {
f.bufferPath = vfs.WriteBufferPath
f.bufferPath = bufferPath
} else {
f.bufferPath = vfs.WriteBufferPath + "." + strconv.FormatUint(writeSeq, 10)
f.bufferPath = bufferPath + "." + strconv.FormatUint(writeSeq, 10)
}
} else {
dir, err := vfs.ensureTempDir()
Expand All @@ -169,18 +224,16 @@ func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.F

// Initialize compaction if enabled
if vfs.CompactionEnabled {
f.compactor = NewCompactor(vfs.client, f.logger)
// VFS has no local files, so leave LocalFileOpener/LocalFileDeleter nil
f.compactor = NewCompactor(client, f.logger)
}
}

// Initialize hydration support if enabled
if vfs.HydrationEnabled {
if vfs.HydrationPath != "" {
f.hydrationPath = vfs.HydrationPath
if hydrationEnabled {
if hydrationPath != "" {
f.hydrationPath = hydrationPath
f.hydrationPersistent = true
} else {
// Use a temp file if no path specified
dir, err := vfs.ensureTempDir()
if err != nil {
return nil, 0, fmt.Errorf("create temp dir for hydration: %w", err)
Expand All @@ -190,10 +243,15 @@ func (vfs *VFS) openMainDB(name string, flags sqlite3vfs.OpenFlag) (sqlite3vfs.F
}

if err := f.Open(); err != nil {
if perConnClient {
if closer, ok := client.(io.Closer); ok {
closer.Close()
}
}
return nil, 0, err
}

if vfs.WriteEnabled {
if writeEnabled {
vfs.writeMu.Lock()
if f.expectedTXID > vfs.lastSyncedTXID {
vfs.lastSyncedTXID = f.expectedTXID
Expand Down Expand Up @@ -543,6 +601,8 @@ type VFSFile struct {
disabling bool // True when write disable is in progress
cond *sync.Cond // Signals transaction state changes

perConnClient bool // True when client was created from config registry (close on file close)

hydrator *Hydrator // Background hydration (nil if disabled)
hydrationPath string // Path for hydration file (set during Open)
hydrationPersistent bool // True when using user-specified persistent path
Expand Down Expand Up @@ -1416,6 +1476,14 @@ func (f *VFSFile) Close() error {
f.vfs.writeMu.Unlock()
}

if f.perConnClient {
if closer, ok := f.client.(io.Closer); ok {
if err := closer.Close(); err != nil {
f.logger.Warn("failed to close per-connection client", "error", err)
}
}
}

return nil
}

Expand Down
50 changes: 50 additions & 0 deletions vfs_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//go:build vfs
// +build vfs

package litestream

import (
"sync"
"time"
)

type VFSConfig struct {
ReplicaURL string
WriteEnabled *bool
SyncInterval *time.Duration
BufferPath string
HydrationEnabled *bool
HydrationPath string
PollInterval *time.Duration
CacheSize *int
LogLevel string
}

var (
vfsConfigs = make(map[string]*VFSConfig)
vfsConfigsMu sync.RWMutex
)

func SetVFSConfig(dbName string, cfg *VFSConfig) {
vfsConfigsMu.Lock()
defer vfsConfigsMu.Unlock()
copied := *cfg
vfsConfigs[dbName] = &copied
}

func GetVFSConfig(dbName string) *VFSConfig {
vfsConfigsMu.RLock()
defer vfsConfigsMu.RUnlock()
orig := vfsConfigs[dbName]
if orig == nil {
return nil
}
copied := *orig
return &copied
}

func DeleteVFSConfig(dbName string) {
vfsConfigsMu.Lock()
defer vfsConfigsMu.Unlock()
delete(vfsConfigs, dbName)
}
Loading
Loading