From 16b3867db33e859e7567dc37f7c6d6f9921074eb Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Sun, 30 Mar 2025 12:41:26 +0200 Subject: [PATCH] Allow opening WAL on any index Signed-off-by: Marek Siarkowicz --- etcdutl/etcdutl/migrate_command.go | 2 +- server/etcdserver/bootstrap.go | 3 +- server/storage/storage.go | 9 ++--- server/storage/wal/wal.go | 61 ++++++++++++++---------------- server/verify/verify.go | 2 +- tests/robustness/report/wal.go | 2 +- 6 files changed, 36 insertions(+), 43 deletions(-) diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go index a7f3d849f16..1a0f4a6fc3a 100644 --- a/etcdutl/etcdutl/migrate_command.go +++ b/etcdutl/etcdutl/migrate_command.go @@ -108,7 +108,7 @@ func (c *migrateConfig) finalize() error { if err != nil { return fmt.Errorf("failed to get the lastest snapshot: %w", err) } - w, err := wal.OpenForRead(c.lg, walPath, walSnap) + w, err := wal.OpenForRead(c.lg, walPath, wal.Position{Index: walSnap.Index, Term: walSnap.Term}) if err != nil { return fmt.Errorf(`failed to open wal: %w`, err) } diff --git a/server/etcdserver/bootstrap.go b/server/etcdserver/bootstrap.go index b25b7f6e1db..497b590128b 100644 --- a/server/etcdserver/bootstrap.go +++ b/server/etcdserver/bootstrap.go @@ -588,7 +588,6 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot // openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear // after the position of the given snap in the WAL. -// The snap must have been previously saved to the WAL, or this call will panic. func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) { var walsnap walpb.Snapshot if snapshot != nil { @@ -596,7 +595,7 @@ func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*w } repaired := false for { - w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap) + w, err := wal.Open(cfg.Logger, cfg.WALDir(), wal.Position{Index: snapshot.Metadata.Index, Term: snapshot.Metadata.Term}) if err != nil { cfg.Logger.Fatal("failed to open WAL", zap.Error(err)) } diff --git a/server/storage/storage.go b/server/storage/storage.go index 99a37a23d07..7aee0879c63 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -110,18 +110,17 @@ func (st *storage) Sync() error { func (st *storage) MinimalEtcdVersion() *semver.Version { st.mux.Lock() defer st.mux.Unlock() - walsnap := walpb.Snapshot{} + pos := wal.Position{} sn, err := st.s.Load() if err != nil && !errors.Is(err, snap.ErrNoSnapshot) { panic(err) } if sn != nil { - walsnap.Index = sn.Metadata.Index - walsnap.Term = sn.Metadata.Term - walsnap.ConfState = &sn.Metadata.ConfState + pos.Index = sn.Metadata.Index + pos.Term = sn.Metadata.Term } - w, err := st.w.Reopen(st.lg, walsnap) + w, err := st.w.Reopen(st.lg, pos) if err != nil { panic(err) } diff --git a/server/storage/wal/wal.go b/server/storage/wal/wal.go index f3d7bc5f4d4..c9fbe030309 100644 --- a/server/storage/wal/wal.go +++ b/server/storage/wal/wal.go @@ -80,7 +80,7 @@ type WAL struct { metadata []byte // metadata recorded at the head of each WAL state raftpb.HardState // hardstate recorded at the head of WAL - start walpb.Snapshot // snapshot to start reading + start Position // Position to start reading decoder Decoder // decoder to Decode records readClose func() error // closer for Decode reader @@ -94,6 +94,10 @@ type WAL struct { fp *filePipeline } +type Position struct { + Term, Index uint64 +} + // Create creates a WAL ready for appending records. The given metadata is // recorded at the head of each WAL file, and can be retrieved with ReadAll // after the file is Open. @@ -258,12 +262,12 @@ func createNewWALFile[T *os.File | *fileutil.LockedFile](path string, forceNew b return any(file).(T), nil } -func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) { +func (w *WAL) Reopen(lg *zap.Logger, pos Position) (*WAL, error) { err := w.Close() if err != nil { lg.Panic("failed to close WAL during reopen", zap.Error(err)) } - return Open(lg, w.dir, snap) + return Open(lg, w.dir, pos) } func (w *WAL) SetUnsafeNoFsync() { @@ -324,7 +328,7 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) { } // reopen and relock - newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{}) + newWAL, oerr := Open(w.lg, w.dir, Position{Index: 0, Term: 0}) if oerr != nil { return nil, oerr } @@ -335,14 +339,12 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) { return newWAL, nil } -// Open opens the WAL at the given snap. -// The snap SHOULD have been previously saved to the WAL, or the following -// ReadAll will fail. +// Open opens the WAL at the given position. // The returned WAL is ready to read and the first record will be the one after -// the given snap. The WAL cannot be appended to before reading out all of its +// the given position. The WAL cannot be appended to before reading out all of its // previous records. -func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) { - w, err := openAtIndex(lg, dirpath, snap, true) +func Open(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) { + w, err := openAtPosition(lg, dirpath, pos, true) if err != nil { return nil, fmt.Errorf("openAtIndex failed: %w", err) } @@ -354,15 +356,15 @@ func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) { // OpenForRead only opens the wal files for read. // Write on a read only wal panics. -func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) { - return openAtIndex(lg, dirpath, snap, false) +func OpenForRead(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) { + return openAtPosition(lg, dirpath, pos, false) } -func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) { +func openAtPosition(lg *zap.Logger, dirpath string, pos Position, write bool) (*WAL, error) { if lg == nil { lg = zap.NewNop() } - names, nameIndex, err := selectWALFiles(lg, dirpath, snap) + names, nameIndex, err := selectWALFiles(lg, dirpath, pos) if err != nil { return nil, fmt.Errorf("[openAtIndex] selectWALFiles failed: %w", err) } @@ -376,7 +378,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool w := &WAL{ lg: lg, dir: dirpath, - start: snap, + start: pos, decoder: NewDecoder(rs...), readClose: closer, locks: ls, @@ -396,15 +398,15 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool return w, nil } -func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) { +func selectWALFiles(lg *zap.Logger, dirpath string, pos Position) ([]string, int, error) { names, err := readWALNames(lg, dirpath) if err != nil { return nil, -1, fmt.Errorf("readWALNames failed: %w", err) } - nameIndex, ok := searchIndex(lg, names, snap.Index) + nameIndex, ok := searchIndex(lg, names, pos.Index) if !ok { - return nil, -1, fmt.Errorf("wal: file not found which matches the snapshot index '%d'", snap.Index) + return nil, -1, fmt.Errorf("wal: file not found which matches the index '%d'", pos.Index) } if !isValidSeq(lg, names[nameIndex:]) { @@ -453,9 +455,8 @@ func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, // If opened in write mode, it must read out all records until EOF. Or an error // will be returned. // If opened in read mode, it will try to read all records if possible. -// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. -// If loaded snap doesn't match with the expected one, it will return -// all the records and error ErrSnapshotMismatch. +// If the position matches snapshot and term doesn't match, it will +// return error ErrSnapshotMismatch. // TODO: detect not-last-snap error. // TODO: maybe loose the checking of match. // After ReadAll, the WAL will be ready for appending new records. @@ -477,7 +478,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } decoder := w.decoder - var match bool for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) { switch rec.Type { case EntryType: @@ -526,7 +526,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. state.Reset() return nil, state, nil, ErrSnapshotMismatch } - match = true } default: @@ -565,16 +564,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb. } err = nil - if !match { - err = ErrSnapshotNotFound - } // close decoder, disable reading if w.readClose != nil { w.readClose() w.readClose = nil } - w.start = walpb.Snapshot{} + w.start = Position{} w.metadata = metadata @@ -658,10 +654,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro // It creates a new decoder to read through the records of the given WAL. // It does not conflict with any open WAL, but it is recommended not to // call this function after opening the WAL for writing. -// If it cannot read out the expected snap, it will return ErrSnapshotNotFound. -// If the loaded snap doesn't match with the expected one, it will +// If the position matches snapshot and term doesn't match, it will // return error ErrSnapshotMismatch. -func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardState, error) { +func Verify(lg *zap.Logger, walDir string, pos Position) (*raftpb.HardState, error) { var metadata []byte var err error var match bool @@ -672,7 +667,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta if lg == nil { lg = zap.NewNop() } - names, nameIndex, err := selectWALFiles(lg, walDir, snap) + names, nameIndex, err := selectWALFiles(lg, walDir, pos) if err != nil { return nil, err } @@ -710,8 +705,8 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta case SnapshotType: var loadedSnap walpb.Snapshot pbutil.MustUnmarshal(&loadedSnap, rec.Data) - if loadedSnap.Index == snap.Index { - if loadedSnap.Term != snap.Term { + if loadedSnap.Index == pos.Index { + if loadedSnap.Term != pos.Term { return nil, ErrSnapshotMismatch } match = true diff --git a/server/verify/verify.go b/server/verify/verify.go index 0dc99afc819..7ea74c37150 100644 --- a/server/verify/verify.go +++ b/server/verify/verify.go @@ -140,7 +140,7 @@ func validateWAL(cfg Config) (*walpb.Snapshot, *raftpb.HardState, error) { } snapshot := walSnaps[len(walSnaps)-1] - hardstate, err := wal2.Verify(cfg.Logger, walDir, snapshot) + hardstate, err := wal2.Verify(cfg.Logger, walDir, wal2.Position{Index: snapshot.Index, Term: snapshot.Term}) if err != nil { return nil, nil, err } diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index 077ddc8896a..17d93c34c4d 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -107,7 +107,7 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf walDir := datadir.ToWALDir(dataDir) repaired := false for { - w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0}) + w, err := wal.OpenForRead(lg, walDir, wal.Position{Index: 0, Term: 0}) if err != nil { return state, nil, fmt.Errorf("failed to open WAL, err: %w", err) }