Skip to content

Commit ab74cf7

Browse files
committed
Remove multiple replica support
1 parent e3ff383 commit ab74cf7

File tree

6 files changed

+76
-139
lines changed

6 files changed

+76
-139
lines changed

cmd/litestream/databases.go

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"flag"
66
"fmt"
77
"os"
8-
"strings"
98
"text/tabwriter"
109
)
1110

@@ -43,15 +42,9 @@ func (c *DatabasesCommand) Run(ctx context.Context, args []string) (err error) {
4342
return err
4443
}
4544

46-
var replicaNames []string
47-
for _, r := range db.Replicas {
48-
replicaNames = append(replicaNames, r.Name())
49-
}
50-
5145
fmt.Fprintf(w, "%s\t%s\n",
5246
db.Path(),
53-
strings.Join(replicaNames, ","),
54-
)
47+
db.Replica.Name())
5548
}
5649

5750
return nil

cmd/litestream/ltx.go

Lines changed: 27 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ type LTXCommand struct{}
1818
func (c *LTXCommand) Run(ctx context.Context, args []string) (err error) {
1919
fs := flag.NewFlagSet("litestream-ltx", flag.ContinueOnError)
2020
configPath, noExpandEnv := registerConfigFlag(fs)
21-
replicaName := fs.String("replica", "", "replica name")
2221
fs.Usage = c.Usage
2322
if err := fs.Parse(args); err != nil {
2423
return err
@@ -28,7 +27,6 @@ func (c *LTXCommand) Run(ctx context.Context, args []string) (err error) {
2827
return fmt.Errorf("too many arguments")
2928
}
3029

31-
var db *litestream.DB
3230
var r *litestream.Replica
3331
if isURL(fs.Arg(0)) {
3432
if *configPath != "" {
@@ -49,62 +47,46 @@ func (c *LTXCommand) Run(ctx context.Context, args []string) (err error) {
4947
}
5048

5149
// Lookup database from configuration file by path.
52-
if path, err := expand(fs.Arg(0)); err != nil {
50+
path, err := expand(fs.Arg(0))
51+
if err != nil {
5352
return err
54-
} else if dbc := config.DBConfig(path); dbc == nil {
53+
}
54+
dbc := config.DBConfig(path)
55+
if dbc == nil {
5556
return fmt.Errorf("database not found in config: %s", path)
56-
} else if db, err = NewDBFromConfig(dbc); err != nil {
57-
return err
5857
}
5958

60-
// Filter by replica, if specified.
61-
if *replicaName != "" {
62-
if r = db.Replica(*replicaName); r == nil {
63-
return fmt.Errorf("replica %q not found for database %q", *replicaName, db.Path())
64-
}
59+
db, err := NewDBFromConfig(dbc)
60+
if err != nil {
61+
return err
62+
} else if db.Replica == nil {
63+
return fmt.Errorf("database has no replica")
6564
}
66-
}
67-
68-
// Find WAL files by db or replica.
69-
var replicas []*litestream.Replica
70-
if r != nil {
71-
replicas = []*litestream.Replica{r}
72-
} else {
73-
replicas = db.Replicas
65+
r = db.Replica
7466
}
7567

7668
// List all WAL files.
7769
w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0)
7870
defer w.Flush()
7971

80-
fmt.Fprintln(w, "replica\tmin_txid\tmax_txid\tsize\tcreated")
81-
for _, r := range replicas {
82-
if err := func() error {
83-
itr, err := r.Client.LTXFiles(ctx, 0)
84-
if err != nil {
85-
return err
86-
}
87-
defer itr.Close()
88-
89-
for itr.Next() {
90-
info := itr.Item()
91-
92-
fmt.Fprintf(w, "%s\t%x\t%d\t%d\t%s\n",
93-
r.Name(),
94-
info.MinTXID,
95-
info.MaxTXID,
96-
info.Size,
97-
info.CreatedAt.Format(time.RFC3339),
98-
)
99-
}
100-
return itr.Close()
101-
}(); err != nil {
102-
r.Logger().Error("cannot fetch ltx files", "error", err)
103-
continue
104-
}
72+
fmt.Fprintln(w, "min_txid\tmax_txid\tsize\tcreated")
73+
itr, err := r.Client.LTXFiles(ctx, 0)
74+
if err != nil {
75+
return err
10576
}
77+
defer itr.Close()
78+
79+
for itr.Next() {
80+
info := itr.Item()
10681

107-
return nil
82+
fmt.Fprintf(w, "%x\t%d\t%d\t%s\n",
83+
info.MinTXID,
84+
info.MaxTXID,
85+
info.Size,
86+
info.CreatedAt.Format(time.RFC3339),
87+
)
88+
}
89+
return itr.Close()
10890
}
10991

11092
// Usage prints the help screen to STDOUT.

cmd/litestream/main.go

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,8 @@ type DBConfig struct {
285285
MinCheckpointPageN *int `yaml:"min-checkpoint-page-count"`
286286
MaxCheckpointPageN *int `yaml:"max-checkpoint-page-count"`
287287

288-
Replicas []*ReplicaConfig `yaml:"replicas"`
288+
Replica *ReplicaConfig `yaml:"replica"`
289+
Replicas []*ReplicaConfig `yaml:"replicas"` // Deprecated
289290
}
290291

291292
// NewDBFromConfig instantiates a DB based on a configuration.
@@ -318,22 +319,37 @@ func NewDBFromConfig(dbc *DBConfig) (*litestream.DB, error) {
318319
db.MaxCheckpointPageN = *dbc.MaxCheckpointPageN
319320
}
320321

321-
// Instantiate and attach replicas.
322-
for _, rc := range dbc.Replicas {
323-
r, err := NewReplicaFromConfig(rc, db)
324-
if err != nil {
325-
return nil, err
326-
}
327-
db.Replicas = append(db.Replicas, r)
322+
// Instantiate and attach replica.
323+
// v0.3.x and before supported multiple replicas but that was dropped to
324+
// ensure there's a single remote data authority.
325+
if dbc.Replica == nil && len(dbc.Replicas) == 0 {
326+
return nil, fmt.Errorf("must specify replica for database")
327+
} else if dbc.Replica != nil && len(dbc.Replicas) > 0 {
328+
return nil, fmt.Errorf("cannot specify 'replica' and 'replicas' on a database")
329+
} else if len(dbc.Replicas) > 1 {
330+
return nil, fmt.Errorf("multiple replicas on a single database are no longer supported")
331+
}
332+
333+
var rc *ReplicaConfig
334+
if dbc.Replica != nil {
335+
rc = dbc.Replica
336+
} else {
337+
rc = dbc.Replicas[0]
338+
}
339+
340+
r, err := NewReplicaFromConfig(rc, db)
341+
if err != nil {
342+
return nil, err
328343
}
344+
db.Replica = r
329345

330346
return db, nil
331347
}
332348

333349
// ReplicaConfig represents the configuration for a single replica in a database.
334350
type ReplicaConfig struct {
335351
Type string `yaml:"type"` // "file", "s3"
336-
Name string `yaml:"name"` // name of replica, optional.
352+
Name string `yaml:"name"` // Deprecated
337353
Path string `yaml:"path"`
338354
URL string `yaml:"url"`
339355
Retention *time.Duration `yaml:"retention"`

cmd/litestream/replicate.go

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -107,23 +107,22 @@ func (c *ReplicateCommand) Run() (err error) {
107107

108108
// Notify user that initialization is done.
109109
for _, db := range c.DBs {
110+
r := db.Replica
110111
slog.Info("initialized db", "path", db.Path())
111-
for _, r := range db.Replicas {
112-
slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval)
113-
switch client := r.Client.(type) {
114-
case *file.ReplicaClient:
115-
slog.Info("replicating to", "path", client.Path())
116-
case *s3.ReplicaClient:
117-
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint)
118-
case *gcs.ReplicaClient:
119-
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
120-
case *abs.ReplicaClient:
121-
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
122-
case *sftp.ReplicaClient:
123-
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
124-
default:
125-
slog.Info("replicating to")
126-
}
112+
slog := slog.With("name", r.Name(), "type", r.Client.Type(), "sync-interval", r.SyncInterval)
113+
switch client := r.Client.(type) {
114+
case *file.ReplicaClient:
115+
slog.Info("replicating to", "path", client.Path())
116+
case *s3.ReplicaClient:
117+
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "region", client.Region, "endpoint", client.Endpoint)
118+
case *gcs.ReplicaClient:
119+
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path)
120+
case *abs.ReplicaClient:
121+
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
122+
case *sftp.ReplicaClient:
123+
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
124+
default:
125+
slog.Info("replicating to")
127126
}
128127
}
129128

cmd/litestream/restore.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -137,13 +137,7 @@ func (c *RestoreCommand) loadFromConfig(ctx context.Context, dbPath, configPath
137137
return nil, errSkipDBExists
138138
}
139139

140-
// Determine the appropriate replica to restore from,
141-
r, err := db.CalcRestoreTarget(ctx, *opt)
142-
if err != nil {
143-
return nil, err
144-
}
145-
146-
return r, nil
140+
return db.Replica, nil
147141
}
148142

149143
// Usage prints the help screen to STDOUT.

db.go

Lines changed: 7 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ type DB struct {
9898
// The timeout to wait for EBUSY from SQLite.
9999
BusyTimeout time.Duration
100100

101-
// List of replicas for the database.
101+
// Remote replica for the database.
102102
// Must be set before calling Open().
103-
Replicas []*Replica
103+
Replica *Replica
104104

105105
// Where to send log messages, defaults to global slog with database epath.
106106
Logger *slog.Logger
@@ -213,16 +213,6 @@ func (db *DB) DirInfo() os.FileInfo {
213213
return db.dirInfo
214214
}
215215

216-
// Replica returns a replica by name.
217-
func (db *DB) Replica(name string) *Replica {
218-
for _, r := range db.Replicas {
219-
if r.Name() == name {
220-
return r
221-
}
222-
}
223-
return nil
224-
}
225-
226216
// Pos returns the current replication position of the database.
227217
func (db *DB) Pos() (ltx.Pos, error) {
228218
minTXID, maxTXID, err := db.MaxLTX()
@@ -267,15 +257,6 @@ func (db *DB) Open() (err error) {
267257
return fmt.Errorf("minimum checkpoint page count required")
268258
}
269259

270-
// Validate that all replica names are unique.
271-
m := make(map[string]struct{})
272-
for _, r := range db.Replicas {
273-
if _, ok := m[r.Name()]; ok {
274-
return fmt.Errorf("duplicate replica name: %q", r.Name())
275-
}
276-
m[r.Name()] = struct{}{}
277-
}
278-
279260
// Clear old temporary files that my have been left from a crash.
280261
if err := removeTmpFiles(db.metaPath); err != nil {
281262
return fmt.Errorf("cannot remove tmp files: %w", err)
@@ -304,13 +285,13 @@ func (db *DB) Close(ctx context.Context) (err error) {
304285
}
305286

306287
// Ensure replicas perform a final sync and stop replicating.
307-
for _, r := range db.Replicas {
288+
if db.Replica != nil {
308289
if db.db != nil {
309-
if e := r.Sync(ctx); e != nil && err == nil {
290+
if e := db.Replica.Sync(ctx); e != nil && err == nil {
310291
err = e
311292
}
312293
}
313-
r.Stop(true)
294+
db.Replica.Stop(true)
314295
}
315296

316297
// Release the read lock to allow other applications to handle checkpointing.
@@ -440,8 +421,8 @@ func (db *DB) init() (err error) {
440421
// TODO(gen): Generate diff of current LTX snapshot and save as next LTX file.
441422

442423
// Start replication.
443-
for _, r := range db.Replicas {
444-
r.Start(db.ctx)
424+
if db.Replica != nil {
425+
db.Replica.Start(db.ctx)
445426
}
446427

447428
return nil
@@ -1096,34 +1077,6 @@ func (db *DB) monitor() {
10961077
}
10971078
}
10981079

1099-
// CalcRestoreTarget returns a replica to restore from based on opt criteria.
1100-
func (db *DB) CalcRestoreTarget(ctx context.Context, opt RestoreOptions) (*Replica, error) {
1101-
var target struct {
1102-
replica *Replica
1103-
updatedAt time.Time
1104-
}
1105-
1106-
for _, r := range db.Replicas {
1107-
// Skip replica if it does not match filter.
1108-
if opt.ReplicaName != "" && r.Name() != opt.ReplicaName {
1109-
continue
1110-
}
1111-
1112-
updatedAt, err := r.CalcRestoreTarget(ctx, opt)
1113-
if err != nil {
1114-
return nil, err
1115-
}
1116-
1117-
// Use the latest replica if we have multiple candidates.
1118-
if !updatedAt.After(target.updatedAt) {
1119-
continue
1120-
}
1121-
1122-
target.replica, target.updatedAt = r, updatedAt
1123-
}
1124-
return target.replica, nil
1125-
}
1126-
11271080
// CRC64 returns a CRC-64 ISO checksum of the database and its current position.
11281081
//
11291082
// This function obtains a read lock so it prevents syncs from occurring until

0 commit comments

Comments
 (0)