Skip to content

Commit 1b302ba

Browse files
committed
Multi-level compaction (#650)
1 parent ec1a322 commit 1b302ba

20 files changed

+902
-106
lines changed

abs/replica_client.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ func (c *ReplicaClient) Init(ctx context.Context) (err error) {
9191
}
9292

9393
// LTXFiles returns an iterator over all available LTX files.
94-
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int) (ltx.FileIterator, error) {
94+
func (c *ReplicaClient) LTXFiles(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
9595
if err := c.Init(ctx); err != nil {
9696
return nil, err
9797
}
98-
return newLTXFileIterator(ctx, c, level), nil
98+
return newLTXFileIterator(ctx, c, level, seek), nil
9999
}
100100

101101
// WriteLTXFile writes an LTX file to remote storage.
@@ -206,6 +206,7 @@ func (c *ReplicaClient) DeleteAll(ctx context.Context) error {
206206
type ltxFileIterator struct {
207207
client *ReplicaClient
208208
level int
209+
seek ltx.TXID
209210

210211
ch chan ltx.FileInfo
211212
g errgroup.Group
@@ -216,10 +217,11 @@ type ltxFileIterator struct {
216217
err error
217218
}
218219

219-
func newLTXFileIterator(ctx context.Context, client *ReplicaClient, level int) *ltxFileIterator {
220+
func newLTXFileIterator(ctx context.Context, client *ReplicaClient, level int, seek ltx.TXID) *ltxFileIterator {
220221
itr := &ltxFileIterator{
221222
client: client,
222223
level: level,
224+
seek: seek,
223225
ch: make(chan ltx.FileInfo),
224226
}
225227

@@ -234,12 +236,16 @@ func (itr *ltxFileIterator) fetch() error {
234236
defer close(itr.ch)
235237

236238
dir := litestream.LTXLevelDir(itr.client.Path, itr.level)
239+
prefix := dir + "/"
240+
if itr.seek != 0 {
241+
prefix += itr.seek.String()
242+
}
237243

238244
var marker azblob.Marker
239245
for marker.NotDone() {
240246
internal.OperationTotalCounterVec.WithLabelValues(ReplicaClientType, "LIST").Inc()
241247

242-
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: dir + "/"})
248+
resp, err := itr.client.containerURL.ListBlobsFlatSegment(itr.ctx, marker, azblob.ListBlobsSegmentOptions{Prefix: prefix})
243249
if err != nil {
244250
return err
245251
}

cmd/litestream/databases.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
3535
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
3636
defer w.Flush()
3737

38-
fmt.Fprintln(w, "path\treplicas")
38+
fmt.Fprintln(w, "path\treplica")
3939
for _, dbConfig := range config.DBs {
4040
db, err := NewDBFromConfig(dbConfig)
4141
if err != nil {
@@ -44,7 +44,7 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
4444

4545
fmt.Fprintf(w, "%s\t%s\n",
4646
db.Path(),
47-
db.Replica.Name())
47+
db.Replica.Client.Type())
4848
}
4949

5050
return nil

cmd/litestream/ltx.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (c *LTXCommand) Run(ctx context.Context, args []string) (err error) {
7070
defer w.Flush()
7171

7272
fmt.Fprintln(w, "min_txid\tmax_txid\tsize\tcreated")
73-
itr, err := r.Client.LTXFiles(ctx, 0)
73+
itr, err := r.Client.LTXFiles(ctx, 0, 0)
7474
if err != nil {
7575
return err
7676
}

cmd/litestream/main.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ type Config struct {
153153
// Bind address for serving metrics.
154154
Addr string `yaml:"addr"`
155155

156+
// List of stages in a multi-level compaction.
157+
// Only includes L1 through the last non-snapshot level.
158+
Levels []*CompactionLevelConfig `yaml:"levels"`
159+
156160
// List of databases to manage.
157161
DBs []*DBConfig `yaml:"dbs"`
158162

@@ -194,6 +198,23 @@ func DefaultConfig() Config {
194198
return Config{}
195199
}
196200

201+
// CompactionLevels returns a full list of compaction levels include L0.
202+
func (c *Config) CompactionLevels() litestream.CompactionLevels {
203+
levels := litestream.CompactionLevels{
204+
{Level: 0},
205+
}
206+
207+
for i, lvl := range c.Levels {
208+
levels = append(levels, &litestream.CompactionLevel{
209+
Level: i + 1,
210+
Interval: lvl.Interval,
211+
Retention: lvl.Retention,
212+
})
213+
}
214+
215+
return levels
216+
}
217+
197218
// DBConfig returns database configuration by path.
198219
func (c *Config) DBConfig(path string) *DBConfig {
199220
for _, dbConfig := range c.DBs {
@@ -275,6 +296,12 @@ func ReadConfigFile(filename string, expandEnv bool) (_ Config, err error) {
275296
return config, nil
276297
}
277298

299+
// CompactionLevelConfig is the configuration for a single level of compaction.
type CompactionLevelConfig struct {
	// Interval is the frequency at which this level is compacted.
	Interval time.Duration `yaml:"interval"`

	// Retention is how long files in this level are kept.
	Retention time.Duration `yaml:"retention"`
}
304+
278305
// DBConfig represents the configuration for a single database.
279306
type DBConfig struct {
280307
Path string `yaml:"path"`
@@ -391,7 +418,7 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
391418
}
392419

393420
// Build replica.
394-
r := litestream.NewReplica(db, c.Name)
421+
r := litestream.NewReplica(db)
395422
if v := c.Retention; v != nil {
396423
r.Retention = *v
397424
}
@@ -482,7 +509,7 @@ func newFileReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_
482509
}
483510

484511
// newS3ReplicaClientFromConfig returns a new instance of s3.ReplicaClient built from config.
485-
func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s3.ReplicaClient, err error) {
512+
func newS3ReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *s3.ReplicaClient, err error) {
486513
// Ensure URL & constituent parts are not both specified.
487514
if c.URL != "" && c.Path != "" {
488515
return nil, fmt.Errorf("cannot specify url & path for s3 replica")
@@ -545,7 +572,7 @@ func newS3ReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *s
545572
}
546573

547574
// newGCSReplicaClientFromConfig returns a new instance of gcs.ReplicaClient built from config.
548-
func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *gcs.ReplicaClient, err error) {
575+
func newGCSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *gcs.ReplicaClient, err error) {
549576
// Ensure URL & constituent parts are not both specified.
550577
if c.URL != "" && c.Path != "" {
551578
return nil, fmt.Errorf("cannot specify url & path for gcs replica")
@@ -584,7 +611,7 @@ func newGCSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
584611
}
585612

586613
// newABSReplicaClientFromConfig returns a new instance of abs.ReplicaClient built from config.
587-
func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *abs.ReplicaClient, err error) {
614+
func newABSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *abs.ReplicaClient, err error) {
588615
// Ensure URL & constituent parts are not both specified.
589616
if c.URL != "" && c.Path != "" {
590617
return nil, fmt.Errorf("cannot specify url & path for abs replica")
@@ -627,7 +654,7 @@ func newABSReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *
627654
}
628655

629656
// newSFTPReplicaClientFromConfig returns a new instance of sftp.ReplicaClient built from config.
630-
func newSFTPReplicaClientFromConfig(c *ReplicaConfig, r *litestream.Replica) (_ *sftp.ReplicaClient, err error) {
657+
func newSFTPReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *sftp.ReplicaClient, err error) {
631658
// Ensure URL & constituent parts are not both specified.
632659
if c.URL != "" && c.Path != "" {
633660
return nil, fmt.Errorf("cannot specify url & path for sftp replica")

cmd/litestream/replicate.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ type ReplicateCommand struct {
2828

2929
Config Config
3030

31-
// List of managed databases specified in the config.
32-
DBs []*litestream.DB
31+
// Manages the set of databases & compaction levels.
32+
Store *litestream.Store
3333
}
3434

3535
func NewReplicateCommand() *ReplicateCommand {
@@ -92,24 +92,23 @@ func (c *ReplicateCommand) Run() (err error) {
9292
slog.Error("no databases specified in configuration")
9393
}
9494

95+
var dbs []*litestream.DB
9596
for _, dbConfig := range c.Config.DBs {
9697
db, err := NewDBFromConfig(dbConfig)
9798
if err != nil {
9899
return err
99100
}
100-
101-
// Open database & attach to program.
102-
if err := db.Open(); err != nil {
103-
return err
104-
}
105-
c.DBs = append(c.DBs, db)
101+
dbs = append(dbs, db)
106102
}
107103

104+
levels := c.Config.CompactionLevels()
105+
c.Store = litestream.NewStore(dbs, levels)
106+
108107
// Notify user that initialization is done.
109-
for _, db := range c.DBs {
108+
for _, db := range c.Store.DBs() {
110109
r := db.Replica
111110
slog.Info("initialized db", "path", db.Path())
112-
slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval)
111+
slog := slog.With("type", r.Client.Type(), "sync-interval", r.SyncInterval)
113112
switch client := r.Client.(type) {
114113
case *file.ReplicaClient:
115114
slog.Info("replicating to", "path", client.Path())
@@ -166,13 +165,8 @@ func (c *ReplicateCommand) Run() (err error) {
166165

167166
// Close closes all open databases.
168167
func (c *ReplicateCommand) Close() (err error) {
169-
for _, db := range c.DBs {
170-
if e := db.Close(context.Background()); e != nil {
171-
db.Logger.Error("error closing db", "error", e)
172-
if err == nil {
173-
err = e
174-
}
175-
}
168+
if e := c.Store.Close(); e != nil {
169+
slog.Error("failed to close database", "error", e)
176170
}
177171
return err
178172
}

cmd/litestream/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (c *RestoreCommand) loadFromURL(ctx context.Context, replicaURL string, ifD
107107
}
108108

109109
// loadFromConfig returns a replica & updates the restore options from a DB reference.
110-
func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath string, expandEnv, ifDBNotExists bool, opt *litestream.RestoreOptions) (*litestream.Replica, error) {
110+
func (c *RestoreCommand) loadFromConfig(_ context.Context, dbPath, configPath string, expandEnv, ifDBNotExists bool, opt *litestream.RestoreOptions) (*litestream.Replica, error) {
111111
// Load configuration.
112112
config, err := ReadConfigFile(configPath, expandEnv)
113113
if err != nil {

compaction_level.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package litestream
2+
3+
import (
4+
"fmt"
5+
"time"
6+
)
7+
8+
// SnapshotLevel represents the level which full snapshots are held.
const SnapshotLevel = 9

// CompactionLevel represents a single part of a multi-level compaction.
// Each level merges LTX files from the previous level into larger time granularities.
type CompactionLevel struct {
	// The numeric level. Must match the index in the list of levels.
	Level int

	// The frequency that the level is compacted from the previous level.
	Interval time.Duration

	// The duration that files in this level are stored.
	Retention time.Duration
}

// PrevCompactionAt returns the time when the last compaction occurred.
// Returns the current time if it is exactly a multiple of the level interval.
func (lvl *CompactionLevel) PrevCompactionAt(now time.Time) time.Time {
	return now.Truncate(lvl.Interval).UTC()
}

// NextCompactionAt returns the time when the next compaction will occur.
// If now is exactly a multiple of the level interval, the result is one full
// interval after now.
func (lvl *CompactionLevel) NextCompactionAt(now time.Time) time.Time {
	return lvl.PrevCompactionAt(now).Add(lvl.Interval)
}

// CompactionLevels represents a sorted slice of non-snapshot compaction levels.
type CompactionLevels []*CompactionLevel

// Level returns the compaction level at the given index.
// Returns an error if the index is a snapshot level or is out of bounds.
func (a CompactionLevels) Level(level int) (*CompactionLevel, error) {
	if level == SnapshotLevel {
		return nil, fmt.Errorf("invalid argument, snapshot level")
	}
	if level < 0 || level > a.MaxLevel() {
		return nil, fmt.Errorf("level out of bounds: %d", level)
	}
	return a[level], nil
}

// MaxLevel returns the highest non-snapshot compaction level.
func (a CompactionLevels) MaxLevel() int {
	return len(a) - 1
}

// Validate returns an error if the levels are invalid.
// Levels must be contiguous starting from zero, stay below SnapshotLevel,
// and every level except L0 must have a positive interval.
func (a CompactionLevels) Validate() error {
	if len(a) == 0 {
		return fmt.Errorf("at least one compaction level is required")
	}

	for i, lvl := range a {
		if i != lvl.Level {
			return fmt.Errorf("compaction level number out of order: %d, expected %d", lvl.Level, i)
		} else if lvl.Level > SnapshotLevel-1 {
			return fmt.Errorf("compaction level cannot exceed %d", SnapshotLevel-1)
		}

		// L0 receives raw LTX files so it cannot have a compaction interval.
		if lvl.Level == 0 && lvl.Interval != 0 {
			return fmt.Errorf("cannot set interval on compaction level zero")
		}

		if lvl.Level != 0 && lvl.Interval <= 0 {
			return fmt.Errorf("interval required for level %d", lvl.Level)
		}
	}
	return nil
}

// IsValidLevel returns true if level is a valid compaction level number.
// The snapshot level is always considered valid.
func (a CompactionLevels) IsValidLevel(level int) bool {
	if level == SnapshotLevel {
		return true
	}
	return level >= 0 && level < len(a)
}

// PrevLevel returns the previous compaction level.
// Returns -1 if there is no previous level.
func (a CompactionLevels) PrevLevel(level int) int {
	if level == SnapshotLevel {
		return a.MaxLevel()
	}
	return level - 1
}

// NextLevel returns the next compaction level.
// Returns -1 if there is no next level.
func (a CompactionLevels) NextLevel(level int) int {
	if level == SnapshotLevel {
		return -1
	} else if level == a.MaxLevel() {
		return SnapshotLevel
	}
	return level + 1
}

0 commit comments

Comments
 (0)