Skip to content

Commit 2ca35a8

Browse files
committed
Implement multi-level restore (#652)
1 parent af26f82 commit 2ca35a8

File tree

9 files changed

+341
-14
lines changed

9 files changed

+341
-14
lines changed

cmd/litestream/restore.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ func (c *RestoreCommand) Run(ctx context.Context, args []string) (err error) {
2323
fs := flag.NewFlagSet("litestream-restore", flag.ContinueOnError)
2424
configPath, noExpandEnv := registerConfigFlag(fs)
2525
fs.StringVar(&opt.OutputPath, "o", "", "output path")
26-
fs.StringVar(&opt.ReplicaName, "replica", "", "replica name")
2726
fs.Var((*txidVar)(&opt.TXID), "txid", "transaction ID")
2827
fs.IntVar(&opt.Parallelism, "parallelism", opt.Parallelism, "parallelism")
2928
ifDBNotExists := fs.Bool("if-db-not-exists", false, "")
@@ -160,10 +159,6 @@ Arguments:
160159
-no-expand-env
161160
Disables environment variable expansion in configuration file.
162161
163-
-replica NAME
164-
Restore from a specific replica.
165-
Defaults to replica with latest data.
166-
167162
-index NUM
168163
Restore up to a specific hex-encoded WAL index (inclusive).
169164
Defaults to use the highest available index.

db.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,10 +1336,6 @@ type RestoreOptions struct {
13361336
// If blank, the original DB path is used.
13371337
OutputPath string
13381338

1339-
// Specific replica to restore from.
1340-
// If blank, all replicas are considered.
1341-
ReplicaName string
1342-
13431339
// Specific transaction to restore to.
13441340
// If zero, TXID is ignored.
13451341
TXID ltx.TXID

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ require (
1515
github.com/mattn/go-sqlite3 v1.14.19
1616
github.com/pkg/sftp v1.13.6
1717
github.com/prometheus/client_golang v1.17.0
18-
github.com/superfly/ltx v0.3.15
18+
github.com/superfly/ltx v0.3.16
1919
golang.org/x/crypto v0.17.0
2020
golang.org/x/sync v0.5.0
2121
golang.org/x/sys v0.15.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
166166
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
167167
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
168168
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
169-
github.com/superfly/ltx v0.3.15 h1:gc+iTncNH5FMPMzxAtRS7DvL0ayxCOCrKjS9S8Q7PQA=
170-
github.com/superfly/ltx v0.3.15/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
169+
github.com/superfly/ltx v0.3.16 h1:osibxhNf/4+DOA4fKuPxXVMPKvqoHgtCdy0tYb+AdVQ=
170+
github.com/superfly/ltx v0.3.16/go.mod h1:Nf50QAIXU/ET4ua3AuQ2fh31MbgNQZA7r/DYx6Os77s=
171171
github.com/yosida95/uritemplate/v3 v3.0.2 h1:Ed3Oyj9yrmi9087+NczuL5BwkIc4wvTb5zIM+UJPGz4=
172172
github.com/yosida95/uritemplate/v3 v3.0.2/go.mod h1:ILOh0sOhIJR3+L/8afwt/kE++YT040gmv5BQTMR2HP4=
173173
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=

replica.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ func NewReplica(db *DB) *Replica {
7878
return r
7979
}
8080

81+
func NewReplicaWithClient(db *DB, client ReplicaClient) *Replica {
82+
r := NewReplica(db)
83+
r.Client = client
84+
return r
85+
}
86+
8187
// Logger returns the DB sub-logger for this replica.
8288
func (r *Replica) Logger() *slog.Logger {
8389
logger := slog.Default()
@@ -663,6 +669,57 @@ func (r *Replica) Restore(ctx context.Context, opt RestoreOptions) (err error) {
663669
return nil
664670
}
665671

672+
// CalcRestorePlan returns a list of storage paths to restore a snapshot at the given TXID.
673+
func (r *Replica) CalcRestorePlan(ctx context.Context, txID ltx.TXID) ([]*ltx.FileInfo, error) {
674+
var infos ltx.FileInfoSlice
675+
logger := r.Logger().With("target", txID)
676+
677+
// Start with latest snapshot before target TXID.
678+
if a, err := FindLTXFiles(ctx, r.Client, SnapshotLevel, func(info *ltx.FileInfo) (bool, error) {
679+
return info.MaxTXID <= txID, nil
680+
}); err != nil {
681+
return nil, err
682+
} else if len(a) > 0 {
683+
logger.Debug("found snapshot before target TXID", "snapshot", a[len(a)-1].MaxTXID)
684+
infos = append(infos, a[len(a)-1])
685+
}
686+
687+
// Starting from the highest compaction level, collect all paths after the
688+
// latest TXID for each level. Compactions are based on the previous level's
689+
// TXID granularity so the TXIDs should align between compaction levels.
690+
const maxLevel = SnapshotLevel - 1
691+
for level := maxLevel; level >= 0; level-- {
692+
a, err := FindLTXFiles(ctx, r.Client, level, func(info *ltx.FileInfo) (bool, error) {
693+
return info.MinTXID > infos.MaxTXID() && info.MaxTXID <= txID, nil
694+
})
695+
if err != nil {
696+
return nil, err
697+
}
698+
699+
// Append each storage path to the list
700+
for i := range a {
701+
// Ensure TXIDs are contiguous between each paths.
702+
if infos.MaxTXID()+1 != a[i].MinTXID {
703+
return nil, fmt.Errorf("non-contiguous transaction files: prev=%s filename=%s",
704+
infos.MaxTXID().String(), ltx.FormatFilename(a[i].MinTXID, a[i].MaxTXID))
705+
}
706+
707+
logger.Debug("matching LTX file for restore",
708+
"filename", ltx.FormatFilename(a[len(a)-1].MinTXID, a[len(a)-1].MaxTXID))
709+
infos = append(infos, a[i])
710+
}
711+
}
712+
713+
// Return an error if we are unable to find any set of LTX files before
714+
// target TXID. This shouldn't happen under normal circumstances. Only if
715+
// lower level LTX files are removed before a snapshot has occurred.
716+
if len(infos) == 0 {
717+
return nil, ErrTxNotAvailable
718+
}
719+
720+
return infos, nil
721+
}
722+
666723
// Replica metrics.
667724
var (
668725
replicaValidationTotalCounterVec = promauto.NewCounterVec(prometheus.CounterOpts{

replica_client.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,14 @@ package litestream
22

33
import (
44
"context"
5+
"errors"
56
"io"
67

78
"github.com/superfly/ltx"
89
)
910

11+
var ErrStopIter = errors.New("stop iterator")
12+
1013
// ReplicaClient represents client to connect to a Replica.
1114
type ReplicaClient interface {
1215
// Type returns the type of client.
@@ -30,3 +33,33 @@ type ReplicaClient interface {
3033
// DeleteAll deletes all files.
3134
DeleteAll(ctx context.Context) error
3235
}
36+
37+
// FindLTXFiles returns a list of files that match filter.
38+
func FindLTXFiles(ctx context.Context, client ReplicaClient, level int, filter func(*ltx.FileInfo) (bool, error)) ([]*ltx.FileInfo, error) {
39+
itr, err := client.LTXFiles(ctx, level, 0)
40+
if err != nil {
41+
return nil, err
42+
}
43+
defer func() { _ = itr.Close() }()
44+
45+
var a []*ltx.FileInfo
46+
for itr.Next() {
47+
item := itr.Item()
48+
49+
match, err := filter(item)
50+
if match {
51+
a = append(a, item)
52+
}
53+
54+
if err == ErrStopIter {
55+
break
56+
} else if err != nil {
57+
return a, err
58+
}
59+
}
60+
61+
if err := itr.Close(); err != nil {
62+
return nil, err
63+
}
64+
return a, nil
65+
}

replica_test.go

Lines changed: 143 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package litestream_test
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/benbjohnson/litestream"
89
"github.com/benbjohnson/litestream/file"
10+
"github.com/benbjohnson/litestream/mock"
911
"github.com/superfly/ltx"
1012
)
1113

@@ -29,8 +31,7 @@ func TestReplica_Sync(t *testing.T) {
2931
t.Logf("position after sync: %s", dpos.String())
3032

3133
c := file.NewReplicaClient(t.TempDir())
32-
r := litestream.NewReplica(db)
33-
c.Replica, r.Client = r, c
34+
r := litestream.NewReplicaWithClient(db, c)
3435

3536
if err := r.Sync(context.Background()); err != nil {
3637
t.Fatal(err)
@@ -82,3 +83,143 @@ func TestReplica_Sync(t *testing.T) {
8283

8384
// TODO(ltx): Restore snapshot and verify
8485
}
86+
87+
func TestReplica_CalcRestorePlan(t *testing.T) {
88+
db, sqldb := MustOpenDBs(t)
89+
defer MustCloseDBs(t, db, sqldb)
90+
91+
t.Run("SnapshotOnly", func(t *testing.T) {
92+
var c mock.ReplicaClient
93+
r := litestream.NewReplicaWithClient(db, &c)
94+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
95+
if level == litestream.SnapshotLevel {
96+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{{
97+
Level: litestream.SnapshotLevel,
98+
MinTXID: 1,
99+
MaxTXID: 10,
100+
Size: 1024,
101+
CreatedAt: time.Now(),
102+
}}), nil
103+
}
104+
return ltx.NewFileInfoSliceIterator(nil), nil
105+
}
106+
107+
plan, err := r.CalcRestorePlan(context.Background(), 10)
108+
if err != nil {
109+
t.Fatalf("unexpected error: %v", err)
110+
}
111+
if got, want := len(plan), 1; got != want {
112+
t.Fatalf("n=%d, want %d", got, want)
113+
}
114+
if plan[0].MaxTXID != 10 {
115+
t.Fatalf("expected MaxTXID 10, got %d", plan[0].MaxTXID)
116+
}
117+
})
118+
119+
t.Run("SnapshotAndIncremental", func(t *testing.T) {
120+
var c mock.ReplicaClient
121+
r := litestream.NewReplicaWithClient(db, &c)
122+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
123+
switch level {
124+
case litestream.SnapshotLevel:
125+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
126+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5},
127+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 15},
128+
}), nil
129+
case 1:
130+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
131+
{Level: 1, MinTXID: 6, MaxTXID: 7},
132+
{Level: 1, MinTXID: 8, MaxTXID: 9},
133+
{Level: 1, MinTXID: 10, MaxTXID: 12},
134+
}), nil
135+
case 0:
136+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
137+
{Level: 0, MinTXID: 7, MaxTXID: 7},
138+
{Level: 0, MinTXID: 8, MaxTXID: 8},
139+
{Level: 0, MinTXID: 9, MaxTXID: 9},
140+
{Level: 0, MinTXID: 10, MaxTXID: 10},
141+
{Level: 0, MinTXID: 11, MaxTXID: 11},
142+
}), nil
143+
default:
144+
return ltx.NewFileInfoSliceIterator(nil), nil
145+
}
146+
}
147+
148+
plan, err := r.CalcRestorePlan(context.Background(), 10)
149+
if err != nil {
150+
t.Fatalf("unexpected error: %v", err)
151+
}
152+
if got, want := len(plan), 4; got != want {
153+
t.Fatalf("n=%v, want %v", got, want)
154+
}
155+
if got, want := *plan[0], (ltx.FileInfo{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5}); got != want {
156+
t.Fatalf("plan[0]=%#v, want %#v", got, want)
157+
}
158+
if got, want := *plan[1], (ltx.FileInfo{Level: 1, MinTXID: 6, MaxTXID: 7}); got != want {
159+
t.Fatalf("plan[1]=%#v, want %#v", got, want)
160+
}
161+
if got, want := *plan[2], (ltx.FileInfo{Level: 1, MinTXID: 8, MaxTXID: 9}); got != want {
162+
t.Fatalf("plan[2]=%#v, want %#v", got, want)
163+
}
164+
if got, want := *plan[3], (ltx.FileInfo{Level: 0, MinTXID: 10, MaxTXID: 10}); got != want {
165+
t.Fatalf("plan[2]=%#v, want %#v", got, want)
166+
}
167+
})
168+
169+
t.Run("ErrNonContiguousFiles", func(t *testing.T) {
170+
var c mock.ReplicaClient
171+
r := litestream.NewReplicaWithClient(db, &c)
172+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
173+
switch level {
174+
case litestream.SnapshotLevel:
175+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
176+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 5},
177+
}), nil
178+
case 1:
179+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
180+
{Level: 1, MinTXID: 8, MaxTXID: 9},
181+
}), nil
182+
default:
183+
return ltx.NewFileInfoSliceIterator(nil), nil
184+
}
185+
}
186+
187+
_, err := r.CalcRestorePlan(context.Background(), 10)
188+
if err == nil || err.Error() != `non-contiguous transaction files: prev=0000000000000005 filename=0000000000000008-0000000000000009.ltx` {
189+
t.Fatalf("unexpected error: %q", err)
190+
}
191+
})
192+
193+
t.Run("ErrTxNotAvailable", func(t *testing.T) {
194+
var c mock.ReplicaClient
195+
r := litestream.NewReplicaWithClient(db, &c)
196+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
197+
switch level {
198+
case litestream.SnapshotLevel:
199+
return ltx.NewFileInfoSliceIterator([]*ltx.FileInfo{
200+
{Level: litestream.SnapshotLevel, MinTXID: 1, MaxTXID: 10},
201+
}), nil
202+
default:
203+
return ltx.NewFileInfoSliceIterator(nil), nil
204+
}
205+
}
206+
207+
_, err := r.CalcRestorePlan(context.Background(), 5)
208+
if err != litestream.ErrTxNotAvailable {
209+
t.Fatalf("expected ErrTxNotAvailable, got %v", err)
210+
}
211+
})
212+
213+
t.Run("ErrNoFiles", func(t *testing.T) {
214+
var c mock.ReplicaClient
215+
c.LTXFilesFunc = func(ctx context.Context, level int, seek ltx.TXID) (ltx.FileIterator, error) {
216+
return ltx.NewFileInfoSliceIterator(nil), nil
217+
}
218+
r := litestream.NewReplicaWithClient(db, &c)
219+
220+
_, err := r.CalcRestorePlan(context.Background(), 5)
221+
if err != litestream.ErrTxNotAvailable {
222+
t.Fatalf("expected ErrTxNotAvailable, got %v", err)
223+
}
224+
})
225+
}

store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ var (
2020
// since the last compaction time. This is used to prevent frequent
2121
// re-compaction when restarting the process.
2222
ErrCompactionTooEarly = errors.New("compaction too early")
23+
24+
// ErrTxNotAvailable is returned when a transaction does not exist.
25+
ErrTxNotAvailable = errors.New("transaction not available")
2326
)
2427

2528
// Store defaults
@@ -81,12 +84,14 @@ func (s *Store) Open(ctx context.Context) error {
8184
if lvl.Level == 0 {
8285
continue
8386
}
87+
s.wg.Add(1)
8488
go func() {
8589
defer s.wg.Done()
8690
s.monitorCompactionLevel(s.ctx, lvl)
8791
}()
8892
}
8993

94+
s.wg.Add(1)
9095
go func() {
9196
defer s.wg.Done()
9297
s.monitorCompactionLevel(s.ctx, s.SnapshotLevel())

0 commit comments

Comments
 (0)