Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions abs/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: abs://[account-name@]container/path
Expand Down
2 changes: 2 additions & 0 deletions cmd/litestream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,8 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
}

r.Client.SetLogger(r.Logger())

return r, nil
}

Expand Down
4 changes: 4 additions & 0 deletions compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func NewCompactor(client ReplicaClient, logger *slog.Logger) *Compactor {
}
}

func (c *Compactor) setLogger(logger *slog.Logger) {
c.logger = logger
}

// MaxLTXFileInfo returns metadata for the last LTX file in a level.
// Uses cache if available, otherwise fetches from remote.
func (c *Compactor) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error) {
Expand Down
35 changes: 25 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func NewDB(path string) *DB {
RetentionEnabled: true,
ShutdownSyncTimeout: DefaultShutdownSyncTimeout,
ShutdownSyncInterval: DefaultShutdownSyncInterval,
Logger: slog.With("db", filepath.Base(path)),
Logger: slog.With(LogKeyDB, filepath.Base(path)),
}
db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo)

Expand All @@ -226,7 +226,7 @@ func NewDB(path string) *DB {
db.ctx, db.cancel = context.WithCancel(context.Background())

// Initialize compactor with nil client (set once in Open() from Replica.Client).
db.compactor = NewCompactor(nil, db.Logger)
db.compactor = NewCompactor(nil, db.Logger.With(LogKeySubsystem, LogSubsystemCompactor))
db.compactor.LocalFileOpener = db.openLocalLTXFile
db.compactor.LocalFileDeleter = db.deleteLocalLTXFile
db.compactor.CompactionVerifyErrorCounter = compactionVerifyErrorCounterVec.WithLabelValues(db.path)
Expand All @@ -245,6 +245,20 @@ func NewDB(path string) *DB {
return db
}

// SetLogger updates the database logger and propagates to subsystems.
func (db *DB) SetLogger(logger *slog.Logger) {
if logger == nil {
logger = slog.Default()
}
db.Logger = logger
if db.compactor != nil {
db.compactor.setLogger(logger.With(LogKeySubsystem, LogSubsystemCompactor))
}
if db.Replica != nil && db.Replica.Client != nil {
db.Replica.Client.SetLogger(db.Replica.Logger())
}
}

// SQLDB returns a reference to the underlying sql.DB connection.
func (db *DB) SQLDB() *sql.DB {
return db.db
Expand Down Expand Up @@ -1326,7 +1340,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
// to avoid underflow (32 - 4120 = -4088).
// See: https://github.com/benbjohnson/litestream/issues/900
if info.offset == WALHeaderSize {
slog.Debug("verify", "saltMatch", saltMatch, "atWALHeader", true)
db.Logger.Debug("verify", "saltMatch", saltMatch, "atWALHeader", true)
if saltMatch {
info.snapshotting = false
return info, nil
Expand All @@ -1337,7 +1351,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {

// If offset is at the beginning of the first page, we can't check for previous page.
prevWALOffset := info.offset - frameSize
slog.Debug("verify", "saltMatch", saltMatch, "prevWALOffset", prevWALOffset)
db.Logger.Debug("verify", "saltMatch", saltMatch, "prevWALOffset", prevWALOffset)

if prevWALOffset == WALHeaderSize {
if saltMatch { // No writes occurred since last sync, salt still matches
Expand All @@ -1360,7 +1374,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) {
return info, nil
}

slog.Debug("verify.2", "lastPageMatch", lastPageMatch)
db.Logger.Debug("verify.2", "lastPageMatch", lastPageMatch)

// Salt has changed which could indicate a FULL checkpoint.
// If we have a last page match, then we can assume that the WAL has not been overwritten.
Expand Down Expand Up @@ -1441,7 +1455,7 @@ func (db *DB) detectFullCheckpoint(ctx context.Context, knownSalts [][2]uint32)
lastKnownSalt = knownSalts[len(knownSalts)-1]
}

rd, err := NewWALReader(walFile, db.Logger)
rd, err := NewWALReader(walFile, db.Logger.With(LogKeySubsystem, LogSubsystemWALReader))
if err != nil {
return false, fmt.Errorf("new wal reader: %w", err)
}
Expand Down Expand Up @@ -1513,18 +1527,19 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync
}
defer walFile.Close()

walReaderLogger := db.Logger.With(LogKeySubsystem, LogSubsystemWALReader)
var rd *WALReader
if info.offset == WALHeaderSize {
if rd, err = NewWALReader(walFile, db.Logger); err != nil {
if rd, err = NewWALReader(walFile, walReaderLogger); err != nil {
return false, fmt.Errorf("new wal reader: %w", err)
}
} else {
// If we cannot verify the previous frame
var pfmError *PrevFrameMismatchError
if rd, err = NewWALReaderWithOffset(ctx, walFile, info.offset, info.salt1, info.salt2, db.Logger); errors.As(err, &pfmError) {
if rd, err = NewWALReaderWithOffset(ctx, walFile, info.offset, info.salt1, info.salt2, walReaderLogger); errors.As(err, &pfmError) {
db.Logger.Log(ctx, internal.LevelTrace, "prev frame mismatch, snapshotting", "err", pfmError.Err)
info.offset = WALHeaderSize
if rd, err = NewWALReader(walFile, db.Logger); err != nil {
if rd, err = NewWALReader(walFile, walReaderLogger); err != nil {
return false, fmt.Errorf("new wal reader, after reset")
}
} else if err != nil {
Expand Down Expand Up @@ -1899,7 +1914,7 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) {
}
defer walFile.Close()

rd, err := NewWALReader(walFile, db.Logger)
rd, err := NewWALReader(walFile, db.Logger.With(LogKeySubsystem, LogSubsystemWALReader))
if err != nil {
pw.CloseWithError(fmt.Errorf("new wal reader: %w", err))
return
Expand Down
2 changes: 2 additions & 0 deletions db_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type testReplicaClient struct {

func (c *testReplicaClient) Init(_ context.Context) error { return nil }

func (c *testReplicaClient) SetLogger(_ *slog.Logger) {}

func (c *testReplicaClient) Type() string { return "test" }

func (c *testReplicaClient) LTXFiles(_ context.Context, level int, afterTXID ltx.TXID, _ bool) (ltx.FileIterator, error) {
Expand Down
4 changes: 4 additions & 0 deletions file/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func NewReplicaClient(path string) *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) {
Expand Down
4 changes: 4 additions & 0 deletions gs/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) {
Expand Down
17 changes: 17 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package litestream

const (
LogKeySystem = "system"
LogKeySubsystem = "subsystem"
LogKeyDB = "db"
)

const (
LogSystemStore = "store"
LogSystemServer = "server"
)

const (
LogSubsystemCompactor = "compactor"
LogSubsystemWALReader = "wal-reader"
)
3 changes: 3 additions & 0 deletions mock/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package mock
import (
"context"
"io"
"log/slog"

"github.com/superfly/ltx"

Expand Down Expand Up @@ -48,3 +49,5 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma
func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error {
return c.DeleteLTXFilesFunc(ctx, a)
}

func (c *ReplicaClient) SetLogger(_ *slog.Logger) {}
4 changes: 4 additions & 0 deletions nats/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: nats://[user:pass@]host[:port]/bucket
Expand Down
4 changes: 4 additions & 0 deletions oss/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: oss://bucket[.oss-region.aliyuncs.com]/path
Expand Down
4 changes: 4 additions & 0 deletions replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"

"github.com/superfly/ltx"
)
Expand Down Expand Up @@ -44,6 +45,9 @@ type ReplicaClient interface {

// DeleteAll deletes all files.
DeleteAll(ctx context.Context) error

// SetLogger sets the logger for the client.
SetLogger(logger *slog.Logger)
}

// FindLTXFiles returns a list of files that match filter.
Expand Down
3 changes: 3 additions & 0 deletions replica_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"testing"
Expand Down Expand Up @@ -179,6 +180,8 @@ func (*followTestReplicaClient) Type() string { return "test" }

func (*followTestReplicaClient) Init(context.Context) error { return nil }

func (*followTestReplicaClient) SetLogger(*slog.Logger) {}

func (c *followTestReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) {
if c.LTXFilesFunc != nil {
return c.LTXFilesFunc(ctx, level, seek, useMetadata)
Expand Down
4 changes: 4 additions & 0 deletions s3/leaser.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func NewLeaser() *Leaser {
}
}

func (l *Leaser) SetLogger(logger *slog.Logger) {
l.logger = logger.WithGroup("s3-leaser")
}

func (l *Leaser) Client() S3API {
return l.s3
}
Expand Down
4 changes: 4 additions & 0 deletions s3/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) {
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewServer(store *Store) *Server {
SocketPerms: 0600,
ctx: ctx,
cancel: cancel,
logger: slog.Default(),
logger: slog.Default().With(LogKeySystem, LogSystemServer),
}

mux := http.NewServeMux()
Expand Down
4 changes: 4 additions & 0 deletions sftp/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: sftp://[user[:password]@]host[:port]/path
Expand Down
5 changes: 4 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log/slog"
"path/filepath"
"slices"
"sync"
"time"
Expand Down Expand Up @@ -148,10 +149,11 @@ func NewStore(dbs []*DB, levels CompactionLevels) *Store {
ShutdownSyncTimeout: DefaultShutdownSyncTimeout,
ShutdownSyncInterval: DefaultShutdownSyncInterval,
HeartbeatCheckInterval: DefaultHeartbeatCheckInterval,
Logger: slog.Default(),
Logger: slog.Default().With(LogKeySystem, LogSystemStore),
}

for _, db := range dbs {
db.SetLogger(s.Logger.With(LogKeyDB, filepath.Base(db.Path())))
db.L0Retention = s.L0Retention
db.ShutdownSyncTimeout = s.ShutdownSyncTimeout
db.ShutdownSyncInterval = s.ShutdownSyncInterval
Expand Down Expand Up @@ -278,6 +280,7 @@ func (s *Store) RegisterDB(db *DB) error {
s.mu.Unlock()

// Apply store-wide settings before opening the database.
db.SetLogger(s.Logger.With(LogKeyDB, filepath.Base(db.Path())))
db.L0Retention = s.L0Retention
db.ShutdownSyncTimeout = s.ShutdownSyncTimeout
db.ShutdownSyncInterval = s.ShutdownSyncInterval
Expand Down
3 changes: 3 additions & 0 deletions store_compaction_remote_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sync"
Expand Down Expand Up @@ -128,6 +129,8 @@ func (c *delayedReplicaClient) Type() string { return "delayed" }

func (c *delayedReplicaClient) Init(context.Context) error { return nil }

func (c *delayedReplicaClient) SetLogger(*slog.Logger) {}

func (c *delayedReplicaClient) key(level int, min, max ltx.TXID) string {
return fmt.Sprintf("%d:%s:%s", level, min.String(), max.String())
}
Expand Down
4 changes: 4 additions & 0 deletions webdav/replica_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func NewReplicaClient() *ReplicaClient {
}
}

func (c *ReplicaClient) SetLogger(logger *slog.Logger) {
c.logger = logger.WithGroup(ReplicaClientType)
}

// NewReplicaClientFromURL creates a new ReplicaClient from URL components.
// This is used by the replica client factory registration.
// URL format: webdav://[user[:password]@]host[:port]/path or webdavs://... (for HTTPS)
Expand Down
Loading