diff --git a/abs/replica_client.go b/abs/replica_client.go index cbe82685a..65822a55c 100644 --- a/abs/replica_client.go +++ b/abs/replica_client.go @@ -66,6 +66,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. // URL format: abs://[account-name@]container/path diff --git a/cmd/litestream/main.go b/cmd/litestream/main.go index daef54fc9..f283953f7 100644 --- a/cmd/litestream/main.go +++ b/cmd/litestream/main.go @@ -1269,6 +1269,8 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re return nil, fmt.Errorf("unknown replica type in config: %q", c.Type) } + r.Client.SetLogger(r.Logger()) + return r, nil } diff --git a/compactor.go b/compactor.go index 2e5d37bd3..23c1c8859 100644 --- a/compactor.go +++ b/compactor.go @@ -63,6 +63,10 @@ func NewCompactor(client ReplicaClient, logger *slog.Logger) *Compactor { } } +func (c *Compactor) setLogger(logger *slog.Logger) { + c.logger = logger +} + // MaxLTXFileInfo returns metadata for the last LTX file in a level. // Uses cache if available, otherwise fetches from remote. func (c *Compactor) MaxLTXFileInfo(ctx context.Context, level int) (ltx.FileInfo, error) { diff --git a/db.go b/db.go index ed88a4505..ec5ed5479 100644 --- a/db.go +++ b/db.go @@ -208,7 +208,7 @@ func NewDB(path string) *DB { RetentionEnabled: true, ShutdownSyncTimeout: DefaultShutdownSyncTimeout, ShutdownSyncInterval: DefaultShutdownSyncInterval, - Logger: slog.With("db", filepath.Base(path)), + Logger: slog.With(LogKeyDB, filepath.Base(path)), } db.maxLTXFileInfos.m = make(map[int]*ltx.FileInfo) @@ -226,7 +226,7 @@ func NewDB(path string) *DB { db.ctx, db.cancel = context.WithCancel(context.Background()) // Initialize compactor with nil client (set once in Open() from Replica.Client). - db.compactor = NewCompactor(nil, db.Logger) + db.compactor = NewCompactor(nil, db.Logger.With(LogKeySubsystem, LogSubsystemCompactor)) db.compactor.LocalFileOpener = db.openLocalLTXFile db.compactor.LocalFileDeleter = db.deleteLocalLTXFile db.compactor.CompactionVerifyErrorCounter = compactionVerifyErrorCounterVec.WithLabelValues(db.path) @@ -245,6 +245,20 @@ func NewDB(path string) *DB { return db } +// SetLogger updates the database logger and propagates to subsystems. +func (db *DB) SetLogger(logger *slog.Logger) { + if logger == nil { + logger = slog.Default() + } + db.Logger = logger + if db.compactor != nil { + db.compactor.setLogger(logger.With(LogKeySubsystem, LogSubsystemCompactor)) + } + if db.Replica != nil && db.Replica.Client != nil { + db.Replica.Client.SetLogger(db.Replica.Logger()) + } +} + // SQLDB returns a reference to the underlying sql.DB connection. func (db *DB) SQLDB() *sql.DB { return db.db @@ -1326,7 +1340,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) { // to avoid underflow (32 - 4120 = -4088). // See: https://github.com/benbjohnson/litestream/issues/900 if info.offset == WALHeaderSize { - slog.Debug("verify", "saltMatch", saltMatch, "atWALHeader", true) + db.Logger.Debug("verify", "saltMatch", saltMatch, "atWALHeader", true) if saltMatch { info.snapshotting = false return info, nil @@ -1337,7 +1351,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) { // If offset is at the beginning of the first page, we can't check for previous page. prevWALOffset := info.offset - frameSize - slog.Debug("verify", "saltMatch", saltMatch, "prevWALOffset", prevWALOffset) + db.Logger.Debug("verify", "saltMatch", saltMatch, "prevWALOffset", prevWALOffset) if prevWALOffset == WALHeaderSize { if saltMatch { // No writes occurred since last sync, salt still matches @@ -1360,7 +1374,7 @@ func (db *DB) verify(ctx context.Context) (info syncInfo, err error) { return info, nil } - slog.Debug("verify.2", "lastPageMatch", lastPageMatch) + db.Logger.Debug("verify.2", "lastPageMatch", lastPageMatch) // Salt has changed which could indicate a FULL checkpoint. // If we have a last page match, then we can assume that the WAL has not been overwritten. @@ -1441,7 +1455,7 @@ func (db *DB) detectFullCheckpoint(ctx context.Context, knownSalts [][2]uint32) lastKnownSalt = knownSalts[len(knownSalts)-1] } - rd, err := NewWALReader(walFile, db.Logger) + rd, err := NewWALReader(walFile, db.Logger.With(LogKeySubsystem, LogSubsystemWALReader)) if err != nil { return false, fmt.Errorf("new wal reader: %w", err) } @@ -1513,18 +1527,19 @@ func (db *DB) sync(ctx context.Context, checkpointing bool, info syncInfo) (sync } defer walFile.Close() + walReaderLogger := db.Logger.With(LogKeySubsystem, LogSubsystemWALReader) var rd *WALReader if info.offset == WALHeaderSize { - if rd, err = NewWALReader(walFile, db.Logger); err != nil { + if rd, err = NewWALReader(walFile, walReaderLogger); err != nil { return false, fmt.Errorf("new wal reader: %w", err) } } else { // If we cannot verify the previous frame var pfmError *PrevFrameMismatchError - if rd, err = NewWALReaderWithOffset(ctx, walFile, info.offset, info.salt1, info.salt2, db.Logger); errors.As(err, &pfmError) { + if rd, err = NewWALReaderWithOffset(ctx, walFile, info.offset, info.salt1, info.salt2, walReaderLogger); errors.As(err, &pfmError) { db.Logger.Log(ctx, internal.LevelTrace, "prev frame mismatch, snapshotting", "err", pfmError.Err) info.offset = WALHeaderSize - if rd, err = NewWALReader(walFile, db.Logger); err != nil { + if rd, err = NewWALReader(walFile, walReaderLogger); err != nil { return false, fmt.Errorf("new wal reader, after reset") } } else if err != nil { @@ -1899,7 +1914,7 @@ func (db *DB) SnapshotReader(ctx context.Context) (ltx.Pos, io.Reader, error) { } defer walFile.Close() - rd, err := NewWALReader(walFile, db.Logger) + rd, err := NewWALReader(walFile, db.Logger.With(LogKeySubsystem, LogSubsystemWALReader)) if err != nil { pw.CloseWithError(fmt.Errorf("new wal reader: %w", err)) return diff --git a/db_internal_test.go b/db_internal_test.go index f76bb62b0..240111ef8 100644 --- a/db_internal_test.go +++ b/db_internal_test.go @@ -28,6 +28,8 @@ type testReplicaClient struct { func (c *testReplicaClient) Init(_ context.Context) error { return nil } +func (c *testReplicaClient) SetLogger(_ *slog.Logger) {} + func (c *testReplicaClient) Type() string { return "test" } func (c *testReplicaClient) LTXFiles(_ context.Context, level int, afterTXID ltx.TXID, _ bool) (ltx.FileIterator, error) { diff --git a/file/replica_client.go b/file/replica_client.go index c76939c4b..dbaf76501 100644 --- a/file/replica_client.go +++ b/file/replica_client.go @@ -44,6 +44,10 @@ func NewReplicaClient(path string) *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) { diff --git a/gs/replica_client.go b/gs/replica_client.go index 927e5d481..ab89d2d47 100644 --- a/gs/replica_client.go +++ b/gs/replica_client.go @@ -52,6 +52,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) { diff --git a/log.go b/log.go new file mode 100644 index 000000000..f9dd2e866 --- /dev/null +++ b/log.go @@ -0,0 +1,17 @@ +package litestream + +const ( + LogKeySystem = "system" + LogKeySubsystem = "subsystem" + LogKeyDB = "db" +) + +const ( + LogSystemStore = "store" + LogSystemServer = "server" +) + +const ( + LogSubsystemCompactor = "compactor" + LogSubsystemWALReader = "wal-reader" +) diff --git a/mock/replica_client.go b/mock/replica_client.go index 6e610fbe4..fddf724bd 100644 --- a/mock/replica_client.go +++ b/mock/replica_client.go @@ -3,6 +3,7 @@ package mock import ( "context" "io" + "log/slog" "github.com/superfly/ltx" @@ -48,3 +49,5 @@ func (c *ReplicaClient) WriteLTXFile(ctx context.Context, level int, minTXID, ma func (c *ReplicaClient) DeleteLTXFiles(ctx context.Context, a []*ltx.FileInfo) error { return c.DeleteLTXFilesFunc(ctx, a) } + +func (c *ReplicaClient) SetLogger(_ *slog.Logger) {} diff --git a/nats/replica_client.go b/nats/replica_client.go index 5f181aa39..61583dbc7 100644 --- a/nats/replica_client.go +++ b/nats/replica_client.go @@ -89,6 +89,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. // URL format: nats://[user:pass@]host[:port]/bucket diff --git a/oss/replica_client.go b/oss/replica_client.go index daaff4a72..6216a3b01 100644 --- a/oss/replica_client.go +++ b/oss/replica_client.go @@ -82,6 +82,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. // URL format: oss://bucket[.oss-region.aliyuncs.com]/path diff --git a/replica_client.go b/replica_client.go index 770e3941e..e2c07b0a6 100644 --- a/replica_client.go +++ b/replica_client.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "log/slog" "github.com/superfly/ltx" ) @@ -44,6 +45,9 @@ type ReplicaClient interface { // DeleteAll deletes all files. DeleteAll(ctx context.Context) error + + // SetLogger sets the logger for the client. + SetLogger(logger *slog.Logger) } // FindLTXFiles returns a list of files that match filter. diff --git a/replica_internal_test.go b/replica_internal_test.go index ba9a0f208..57a1f7b6a 100644 --- a/replica_internal_test.go +++ b/replica_internal_test.go @@ -6,6 +6,7 @@ import ( "database/sql" "fmt" "io" + "log/slog" "os" "path/filepath" "testing" @@ -179,6 +180,8 @@ func (*followTestReplicaClient) Type() string { return "test" } func (*followTestReplicaClient) Init(context.Context) error { return nil } +func (*followTestReplicaClient) SetLogger(*slog.Logger) {} + func (c *followTestReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID, useMetadata bool) (ltx.FileIterator, error) { if c.LTXFilesFunc != nil { return c.LTXFilesFunc(ctx, level, seek, useMetadata) diff --git a/s3/leaser.go b/s3/leaser.go index 6894fd520..57ba65699 100644 --- a/s3/leaser.go +++ b/s3/leaser.go @@ -64,6 +64,10 @@ func NewLeaser() *Leaser { } } +func (l *Leaser) SetLogger(logger *slog.Logger) { + l.logger = logger.WithGroup("s3-leaser") +} + func (l *Leaser) Client() S3API { return l.s3 } diff --git a/s3/replica_client.go b/s3/replica_client.go index a18587cc9..bb96af02c 100644 --- a/s3/replica_client.go +++ b/s3/replica_client.go @@ -124,6 +124,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. func NewReplicaClientFromURL(scheme, host, urlPath string, query url.Values, userinfo *url.Userinfo) (litestream.ReplicaClient, error) { diff --git a/server.go b/server.go index 043e20cf3..be0f73711 100644 --- a/server.go +++ b/server.go @@ -68,7 +68,7 @@ func NewServer(store *Store) *Server { SocketPerms: 0600, ctx: ctx, cancel: cancel, - logger: slog.Default(), + logger: slog.Default().With(LogKeySystem, LogSystemServer), } mux := http.NewServeMux() diff --git a/sftp/replica_client.go b/sftp/replica_client.go index 12ccd9ad8..72dcf3eff 100644 --- a/sftp/replica_client.go +++ b/sftp/replica_client.go @@ -66,6 +66,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. // URL format: sftp://[user[:password]@]host[:port]/path diff --git a/store.go b/store.go index cab19ed68..fe5f958c6 100644 --- a/store.go +++ b/store.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "path/filepath" "slices" "sync" "time" @@ -148,10 +149,11 @@ func NewStore(dbs []*DB, levels CompactionLevels) *Store { ShutdownSyncTimeout: DefaultShutdownSyncTimeout, ShutdownSyncInterval: DefaultShutdownSyncInterval, HeartbeatCheckInterval: DefaultHeartbeatCheckInterval, - Logger: slog.Default(), + Logger: slog.Default().With(LogKeySystem, LogSystemStore), } for _, db := range dbs { + db.SetLogger(s.Logger.With(LogKeyDB, filepath.Base(db.Path()))) db.L0Retention = s.L0Retention db.ShutdownSyncTimeout = s.ShutdownSyncTimeout db.ShutdownSyncInterval = s.ShutdownSyncInterval @@ -278,6 +280,7 @@ func (s *Store) RegisterDB(db *DB) error { s.mu.Unlock() // Apply store-wide settings before opening the database. + db.SetLogger(s.Logger.With(LogKeyDB, filepath.Base(db.Path()))) db.L0Retention = s.L0Retention db.ShutdownSyncTimeout = s.ShutdownSyncTimeout db.ShutdownSyncInterval = s.ShutdownSyncInterval diff --git a/store_compaction_remote_test.go b/store_compaction_remote_test.go index dcdd13ffa..7ecde667a 100644 --- a/store_compaction_remote_test.go +++ b/store_compaction_remote_test.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "log/slog" "os" "path/filepath" "sync" @@ -128,6 +129,8 @@ func (c *delayedReplicaClient) Type() string { return "delayed" } func (c *delayedReplicaClient) Init(context.Context) error { return nil } +func (c *delayedReplicaClient) SetLogger(*slog.Logger) {} + func (c *delayedReplicaClient) key(level int, min, max ltx.TXID) string { return fmt.Sprintf("%d:%s:%s", level, min.String(), max.String()) } diff --git a/webdav/replica_client.go b/webdav/replica_client.go index 275400476..6b5bbcf8d 100644 --- a/webdav/replica_client.go +++ b/webdav/replica_client.go @@ -52,6 +52,10 @@ func NewReplicaClient() *ReplicaClient { } } +func (c *ReplicaClient) SetLogger(logger *slog.Logger) { + c.logger = logger.WithGroup(ReplicaClientType) +} + // NewReplicaClientFromURL creates a new ReplicaClient from URL components. // This is used by the replica client factory registration. // URL format: webdav://[user[:password]@]host[:port]/path or webdavs://... (for HTTPS)