Skip to content

Commit 522ad2f

Browse files
committed
Allow opening WAL on any index
1 parent 839985d commit 522ad2f

File tree

6 files changed

+36
-43
lines changed

6 files changed

+36
-43
lines changed

etcdutl/etcdutl/migrate_command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func (c *migrateConfig) finalize() error {
108108
if err != nil {
109109
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
110110
}
111-
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
111+
w, err := wal.OpenForRead(c.lg, walPath, wal.Position{Index: walSnap.Index, Term: walSnap.Term})
112112
if err != nil {
113113
return fmt.Errorf(`failed to open wal: %w`, err)
114114
}

server/etcdserver/bootstrap.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -588,15 +588,14 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot
588588

589589
// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
590590
// after the position of the given snap in the WAL.
591-
// The snap must have been previously saved to the WAL, or this call will panic.
592591
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
593592
var walsnap walpb.Snapshot
594593
if snapshot != nil {
595594
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
596595
}
597596
repaired := false
598597
for {
599-
w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
598+
w, err := wal.Open(cfg.Logger, cfg.WALDir(), wal.Position{Index: walsnap.Index, Term: walsnap.Term})
600599
if err != nil {
601600
cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
602601
}

server/storage/storage.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -110,18 +110,17 @@ func (st *storage) Sync() error {
110110
func (st *storage) MinimalEtcdVersion() *semver.Version {
111111
st.mux.Lock()
112112
defer st.mux.Unlock()
113-
walsnap := walpb.Snapshot{}
113+
pos := wal.Position{}
114114

115115
sn, err := st.s.Load()
116116
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
117117
panic(err)
118118
}
119119
if sn != nil {
120-
walsnap.Index = sn.Metadata.Index
121-
walsnap.Term = sn.Metadata.Term
122-
walsnap.ConfState = &sn.Metadata.ConfState
120+
pos.Index = sn.Metadata.Index
121+
pos.Term = sn.Metadata.Term
123122
}
124-
w, err := st.w.Reopen(st.lg, walsnap)
123+
w, err := st.w.Reopen(st.lg, pos)
125124
if err != nil {
126125
panic(err)
127126
}

server/storage/wal/wal.go

+28-33
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type WAL struct {
8080
metadata []byte // metadata recorded at the head of each WAL
8181
state raftpb.HardState // hardstate recorded at the head of WAL
8282

83-
start walpb.Snapshot // snapshot to start reading
83+
start Position // Position to start reading
8484
decoder Decoder // decoder to Decode records
8585
readClose func() error // closer for Decode reader
8686

@@ -94,6 +94,10 @@ type WAL struct {
9494
fp *filePipeline
9595
}
9696

97+
// Position identifies a location in the WAL by term and index. Open,
// OpenForRead, Reopen and Verify take a Position as the point from which WAL
// files are selected (by Index) and records are read. The zero value is used
// by callers in this package to read from the beginning.
type Position struct {
98+
Term, Index uint64
99+
}
100+
97101
// Create creates a WAL ready for appending records. The given metadata is
98102
// recorded at the head of each WAL file, and can be retrieved with ReadAll
99103
// after the file is Open.
@@ -258,12 +262,12 @@ func createNewWALFile[T *os.File | *fileutil.LockedFile](path string, forceNew b
258262
return any(file).(T), nil
259263
}
260264

261-
func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
265+
func (w *WAL) Reopen(lg *zap.Logger, pos Position) (*WAL, error) {
262266
err := w.Close()
263267
if err != nil {
264268
lg.Panic("failed to close WAL during reopen", zap.Error(err))
265269
}
266-
return Open(lg, w.dir, snap)
270+
return Open(lg, w.dir, pos)
267271
}
268272

269273
func (w *WAL) SetUnsafeNoFsync() {
@@ -324,7 +328,7 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
324328
}
325329

326330
// reopen and relock
327-
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
331+
newWAL, oerr := Open(w.lg, w.dir, Position{Index: 0, Term: 0})
328332
if oerr != nil {
329333
return nil, oerr
330334
}
@@ -335,14 +339,12 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
335339
return newWAL, nil
336340
}
337341

338-
// Open opens the WAL at the given snap.
339-
// The snap SHOULD have been previously saved to the WAL, or the following
340-
// ReadAll will fail.
342+
// Open opens the WAL at the given position.
341343
// The returned WAL is ready to read and the first record will be the one after
342-
// the given snap. The WAL cannot be appended to before reading out all of its
344+
// the given position. The WAL cannot be appended to before reading out all of its
343345
// previous records.
344-
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
345-
w, err := openAtIndex(lg, dirpath, snap, true)
346+
func Open(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) {
347+
w, err := openAtPosition(lg, dirpath, pos, true)
346348
if err != nil {
347349
return nil, fmt.Errorf("openAtIndex failed: %w", err)
348350
}
@@ -354,15 +356,15 @@ func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
354356

355357
// OpenForRead only opens the wal files for read.
356358
// Write on a read only wal panics.
357-
func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
358-
return openAtIndex(lg, dirpath, snap, false)
359+
func OpenForRead(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) {
360+
return openAtPosition(lg, dirpath, pos, false)
359361
}
360362

361-
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
363+
func openAtPosition(lg *zap.Logger, dirpath string, pos Position, write bool) (*WAL, error) {
362364
if lg == nil {
363365
lg = zap.NewNop()
364366
}
365-
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
367+
names, nameIndex, err := selectWALFiles(lg, dirpath, pos)
366368
if err != nil {
367369
return nil, fmt.Errorf("[openAtIndex] selectWALFiles failed: %w", err)
368370
}
@@ -376,7 +378,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
376378
w := &WAL{
377379
lg: lg,
378380
dir: dirpath,
379-
start: snap,
381+
start: pos,
380382
decoder: NewDecoder(rs...),
381383
readClose: closer,
382384
locks: ls,
@@ -396,15 +398,15 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
396398
return w, nil
397399
}
398400

399-
func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
401+
func selectWALFiles(lg *zap.Logger, dirpath string, pos Position) ([]string, int, error) {
400402
names, err := readWALNames(lg, dirpath)
401403
if err != nil {
402404
return nil, -1, fmt.Errorf("readWALNames failed: %w", err)
403405
}
404406

405-
nameIndex, ok := searchIndex(lg, names, snap.Index)
407+
nameIndex, ok := searchIndex(lg, names, pos.Index)
406408
if !ok {
407-
return nil, -1, fmt.Errorf("wal: file not found which matches the snapshot index '%d'", snap.Index)
409+
return nil, -1, fmt.Errorf("wal: file not found which matches the index '%d'", pos.Index)
408410
}
409411

410412
if !isValidSeq(lg, names[nameIndex:]) {
@@ -453,9 +455,8 @@ func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int,
453455
// If opened in write mode, it must read out all records until EOF. Or an error
454456
// will be returned.
455457
// If opened in read mode, it will try to read all records if possible.
456-
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
457-
// If loaded snap doesn't match with the expected one, it will return
458-
// all the records and error ErrSnapshotMismatch.
458+
// If a snapshot record in the WAL has the position's index but a different
459+
// term, it will return error ErrSnapshotMismatch.
459460
// TODO: detect not-last-snap error.
460461
// TODO: maybe loosen the checking of match.
461462
// After ReadAll, the WAL will be ready for appending new records.
@@ -477,7 +478,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
477478
}
478479
decoder := w.decoder
479480

480-
var match bool
481481
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
482482
switch rec.Type {
483483
case EntryType:
@@ -526,7 +526,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
526526
state.Reset()
527527
return nil, state, nil, ErrSnapshotMismatch
528528
}
529-
match = true
530529
}
531530

532531
default:
@@ -565,16 +564,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
565564
}
566565

567566
err = nil
568-
if !match {
569-
err = ErrSnapshotNotFound
570-
}
571567

572568
// close decoder, disable reading
573569
if w.readClose != nil {
574570
w.readClose()
575571
w.readClose = nil
576572
}
577-
w.start = walpb.Snapshot{}
573+
w.start = Position{}
578574

579575
w.metadata = metadata
580576

@@ -658,10 +654,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
658654
// It creates a new decoder to read through the records of the given WAL.
659655
// It does not conflict with any open WAL, but it is recommended not to
660656
// call this function after opening the WAL for writing.
661-
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
662-
// If the loaded snap doesn't match with the expected one, it will
657+
// If a snapshot record in the WAL has the position's index but a different
663658
// term, it will return error ErrSnapshotMismatch.
664-
func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardState, error) {
659+
func Verify(lg *zap.Logger, walDir string, pos Position) (*raftpb.HardState, error) {
665660
var metadata []byte
666661
var err error
667662
var match bool
@@ -672,7 +667,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
672667
if lg == nil {
673668
lg = zap.NewNop()
674669
}
675-
names, nameIndex, err := selectWALFiles(lg, walDir, snap)
670+
names, nameIndex, err := selectWALFiles(lg, walDir, pos)
676671
if err != nil {
677672
return nil, err
678673
}
@@ -710,8 +705,8 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
710705
case SnapshotType:
711706
var loadedSnap walpb.Snapshot
712707
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
713-
if loadedSnap.Index == snap.Index {
714-
if loadedSnap.Term != snap.Term {
708+
if loadedSnap.Index == pos.Index {
709+
if loadedSnap.Term != pos.Term {
715710
return nil, ErrSnapshotMismatch
716711
}
717712
match = true

server/verify/verify.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func validateWAL(cfg Config) (*walpb.Snapshot, *raftpb.HardState, error) {
140140
}
141141

142142
snapshot := walSnaps[len(walSnaps)-1]
143-
hardstate, err := wal2.Verify(cfg.Logger, walDir, snapshot)
143+
hardstate, err := wal2.Verify(cfg.Logger, walDir, wal2.Position{Index: snapshot.Index, Term: snapshot.Term})
144144
if err != nil {
145145
return nil, nil, err
146146
}

tests/robustness/report/wal.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf
107107
walDir := datadir.ToWALDir(dataDir)
108108
repaired := false
109109
for {
110-
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
110+
w, err := wal.OpenForRead(lg, walDir, wal.Position{Index: 0, Term: 0})
111111
if err != nil {
112112
return state, nil, fmt.Errorf("failed to open WAL, err: %w", err)
113113
}

0 commit comments

Comments
 (0)