Skip to content

Allow opening WAL on any index #19693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *migrateConfig) finalize() error {
if err != nil {
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
w, err := wal.OpenForRead(c.lg, walPath, wal.Position{Index: walSnap.Index, Term: walSnap.Term})
if err != nil {
return fmt.Errorf(`failed to open wal: %w`, err)
}
Expand Down
3 changes: 1 addition & 2 deletions server/etcdserver/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,15 +588,14 @@ func bootstrapWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot

// openWALFromSnapshot reads the WAL at the given snap and returns the wal, its latest HardState and cluster ID, and all entries that appear
// after the position of the given snap in the WAL.
// The snap must have been previously saved to the WAL, or this call will panic.
func openWALFromSnapshot(cfg config.ServerConfig, snapshot *raftpb.Snapshot) (*wal.WAL, *raftpb.HardState, []raftpb.Entry, *raftpb.Snapshot, *snapshotMetadata) {
var walsnap walpb.Snapshot
if snapshot != nil {
walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
}
repaired := false
for {
w, err := wal.Open(cfg.Logger, cfg.WALDir(), walsnap)
w, err := wal.Open(cfg.Logger, cfg.WALDir(), wal.Position{Index: snapshot.Metadata.Index, Term: snapshot.Metadata.Term})
if err != nil {
cfg.Logger.Fatal("failed to open WAL", zap.Error(err))
}
Expand Down
9 changes: 4 additions & 5 deletions server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,18 +110,17 @@ func (st *storage) Sync() error {
func (st *storage) MinimalEtcdVersion() *semver.Version {
st.mux.Lock()
defer st.mux.Unlock()
walsnap := walpb.Snapshot{}
pos := wal.Position{}

sn, err := st.s.Load()
if err != nil && !errors.Is(err, snap.ErrNoSnapshot) {
panic(err)
}
if sn != nil {
walsnap.Index = sn.Metadata.Index
walsnap.Term = sn.Metadata.Term
walsnap.ConfState = &sn.Metadata.ConfState
pos.Index = sn.Metadata.Index
pos.Term = sn.Metadata.Term
}
w, err := st.w.Reopen(st.lg, walsnap)
w, err := st.w.Reopen(st.lg, pos)
if err != nil {
panic(err)
}
Expand Down
61 changes: 28 additions & 33 deletions server/storage/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type WAL struct {
metadata []byte // metadata recorded at the head of each WAL
state raftpb.HardState // hardstate recorded at the head of WAL

start walpb.Snapshot // snapshot to start reading
start Position // Position to start reading
decoder Decoder // decoder to Decode records
readClose func() error // closer for Decode reader

Expand All @@ -94,6 +94,10 @@ type WAL struct {
fp *filePipeline
}

type Position struct {
Term, Index uint64
}

// Create creates a WAL ready for appending records. The given metadata is
// recorded at the head of each WAL file, and can be retrieved with ReadAll
// after the file is Open.
Expand Down Expand Up @@ -258,12 +262,12 @@ func createNewWALFile[T *os.File | *fileutil.LockedFile](path string, forceNew b
return any(file).(T), nil
}

func (w *WAL) Reopen(lg *zap.Logger, snap walpb.Snapshot) (*WAL, error) {
func (w *WAL) Reopen(lg *zap.Logger, pos Position) (*WAL, error) {
err := w.Close()
if err != nil {
lg.Panic("failed to close WAL during reopen", zap.Error(err))
}
return Open(lg, w.dir, snap)
return Open(lg, w.dir, pos)
}

func (w *WAL) SetUnsafeNoFsync() {
Expand Down Expand Up @@ -324,7 +328,7 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
}

// reopen and relock
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
newWAL, oerr := Open(w.lg, w.dir, Position{Index: 0, Term: 0})
if oerr != nil {
return nil, oerr
}
Expand All @@ -335,14 +339,12 @@ func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
return newWAL, nil
}

// Open opens the WAL at the given snap.
// The snap SHOULD have been previously saved to the WAL, or the following
// ReadAll will fail.
// Open opens the WAL at the given position.
// The returned WAL is ready to read and the first record will be the one after
// the given snap. The WAL cannot be appended to before reading out all of its
// the given position. The WAL cannot be appended to before reading out all of its
// previous records.
func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
w, err := openAtIndex(lg, dirpath, snap, true)
func Open(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) {
w, err := openAtPosition(lg, dirpath, pos, true)
if err != nil {
return nil, fmt.Errorf("openAtIndex failed: %w", err)
}
Expand All @@ -354,15 +356,15 @@ func Open(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {

// OpenForRead only opens the wal files for read.
// Write on a read only wal panics.
func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, error) {
return openAtIndex(lg, dirpath, snap, false)
func OpenForRead(lg *zap.Logger, dirpath string, pos Position) (*WAL, error) {
return openAtPosition(lg, dirpath, pos, false)
}

func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
func openAtPosition(lg *zap.Logger, dirpath string, pos Position, write bool) (*WAL, error) {
if lg == nil {
lg = zap.NewNop()
}
names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
names, nameIndex, err := selectWALFiles(lg, dirpath, pos)
if err != nil {
return nil, fmt.Errorf("[openAtIndex] selectWALFiles failed: %w", err)
}
Expand All @@ -376,7 +378,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
w := &WAL{
lg: lg,
dir: dirpath,
start: snap,
start: pos,
decoder: NewDecoder(rs...),
readClose: closer,
locks: ls,
Expand All @@ -396,15 +398,15 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
return w, nil
}

func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
func selectWALFiles(lg *zap.Logger, dirpath string, pos Position) ([]string, int, error) {
names, err := readWALNames(lg, dirpath)
if err != nil {
return nil, -1, fmt.Errorf("readWALNames failed: %w", err)
}

nameIndex, ok := searchIndex(lg, names, snap.Index)
nameIndex, ok := searchIndex(lg, names, pos.Index)
if !ok {
return nil, -1, fmt.Errorf("wal: file not found which matches the snapshot index '%d'", snap.Index)
return nil, -1, fmt.Errorf("wal: file not found which matches the index '%d'", pos.Index)
}

if !isValidSeq(lg, names[nameIndex:]) {
Expand Down Expand Up @@ -453,9 +455,8 @@ func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int,
// If opened in write mode, it must read out all records until EOF. Or an error
// will be returned.
// If opened in read mode, it will try to read all records if possible.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If loaded snap doesn't match with the expected one, it will return
// all the records and error ErrSnapshotMismatch.
// If the position matches snapshot and term doesn't match, it will
// return error ErrSnapshotMismatch.
// TODO: detect not-last-snap error.
// TODO: maybe loose the checking of match.
// After ReadAll, the WAL will be ready for appending new records.
Expand All @@ -477,7 +478,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}
decoder := w.decoder

var match bool
for err = decoder.Decode(rec); err == nil; err = decoder.Decode(rec) {
switch rec.Type {
case EntryType:
Expand Down Expand Up @@ -526,7 +526,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
state.Reset()
return nil, state, nil, ErrSnapshotMismatch
}
match = true
}

default:
Expand Down Expand Up @@ -565,16 +564,13 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
}

err = nil
if !match {
err = ErrSnapshotNotFound
}

// close decoder, disable reading
if w.readClose != nil {
w.readClose()
w.readClose = nil
}
w.start = walpb.Snapshot{}
w.start = Position{}

w.metadata = metadata

Expand Down Expand Up @@ -658,10 +654,9 @@ func ValidSnapshotEntries(lg *zap.Logger, walDir string) ([]walpb.Snapshot, erro
// It creates a new decoder to read through the records of the given WAL.
// It does not conflict with any open WAL, but it is recommended not to
// call this function after opening the WAL for writing.
// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
// If the loaded snap doesn't match with the expected one, it will
// If the position matches snapshot and term doesn't match, it will
// return error ErrSnapshotMismatch.
func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardState, error) {
func Verify(lg *zap.Logger, walDir string, pos Position) (*raftpb.HardState, error) {
var metadata []byte
var err error
var match bool
Expand All @@ -672,7 +667,7 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
if lg == nil {
lg = zap.NewNop()
}
names, nameIndex, err := selectWALFiles(lg, walDir, snap)
names, nameIndex, err := selectWALFiles(lg, walDir, pos)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -710,8 +705,8 @@ func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) (*raftpb.HardSta
case SnapshotType:
var loadedSnap walpb.Snapshot
pbutil.MustUnmarshal(&loadedSnap, rec.Data)
if loadedSnap.Index == snap.Index {
if loadedSnap.Term != snap.Term {
if loadedSnap.Index == pos.Index {
if loadedSnap.Term != pos.Term {
return nil, ErrSnapshotMismatch
}
match = true
Expand Down
2 changes: 1 addition & 1 deletion server/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func validateWAL(cfg Config) (*walpb.Snapshot, *raftpb.HardState, error) {
}

snapshot := walSnaps[len(walSnaps)-1]
hardstate, err := wal2.Verify(cfg.Logger, walDir, snapshot)
hardstate, err := wal2.Verify(cfg.Logger, walDir, wal2.Position{Index: snapshot.Index, Term: snapshot.Term})
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion tests/robustness/report/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func ReadWAL(lg *zap.Logger, dataDir string) (state raftpb.HardState, ents []raf
walDir := datadir.ToWALDir(dataDir)
repaired := false
for {
w, err := wal.OpenForRead(lg, walDir, walpb.Snapshot{Index: 0})
w, err := wal.OpenForRead(lg, walDir, wal.Position{Index: 0, Term: 0})
if err != nil {
return state, nil, fmt.Errorf("failed to open WAL, err: %w", err)
}
Expand Down