Skip to content

Commit b2c2b0a

Browse files
NRG: WAL must align with snapshot
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 05e659c commit b2c2b0a

File tree

2 files changed

+163
-6
lines changed

2 files changed

+163
-6
lines changed

server/raft.go

Lines changed: 19 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -495,12 +495,6 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
495495

496496
if state.Msgs > 0 {
497497
n.debug("Replaying state of %d entries", state.Msgs)
498-
if first, err := n.loadFirstEntry(); err == nil {
499-
n.pterm, n.pindex = first.pterm, first.pindex
500-
if first.commit > 0 && first.commit > n.commit {
501-
n.commit = first.commit
502-
}
503-
}
504498

505499
// This process will queue up entries on our applied queue but prior to the upper
506500
// state machine running. So we will monitor how much we have queued and if we
@@ -511,6 +505,25 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
511505
// yet. Replay them.
512506
for index, qsz := state.FirstSeq, 0; index <= state.LastSeq; index++ {
513507
ae, err := n.loadEntry(index)
508+
// The first entry in our WAL initializes state but must align with our snapshot if we had one.
509+
// Importantly, check this first, as we might need to truncate the WAL further than the index.
510+
if index == state.FirstSeq {
511+
// If the entry is missing, corrupt, or doesn't align with the snapshot, truncate the WAL.
512+
if err != nil || ae == nil || ae.pindex != index-1 || n.pindex != ae.pindex {
513+
if err != nil {
514+
n.warn("Could not load %d from WAL [%+v]: %v", index, state, err)
515+
} else {
516+
n.warn("Misaligned WAL, will truncate")
517+
}
518+
// Truncate to the snapshot or beginning if there is none.
519+
truncateAndErr(n.pindex)
520+
break
521+
}
522+
n.pterm, n.pindex = ae.pterm, ae.pindex
523+
if ae.commit > 0 && ae.commit > n.commit {
524+
n.commit = ae.commit
525+
}
526+
}
514527
if err != nil {
515528
n.warn("Could not load %d from WAL [%+v]: %v", index, state, err)
516529
// Truncate to the previous correct entry.

server/raft_test.go

Lines changed: 144 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -3911,6 +3911,150 @@ func TestNRGNoLogResetOnCorruptedSendToFollower(t *testing.T) {
39113911
require_NotEqual(t, ss.LastSeq, 0)
39123912
}
39133913

3914+
func TestNRGTruncateLogWithMisalignedSnapshotGap(t *testing.T) {
3915+
c := createJetStreamClusterExplicit(t, "R3S", 3)
3916+
s := c.servers[0] // RunBasicJetStreamServer not available
3917+
defer c.shutdown()
3918+
3919+
storeDir := t.TempDir()
3920+
fcfg := FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, srv: s}
3921+
scfg := StreamConfig{Name: "RAFT", Storage: FileStorage}
3922+
fs, err := newFileStore(fcfg, scfg)
3923+
require_NoError(t, err)
3924+
3925+
cfg := &RaftConfig{Name: "TEST", Store: storeDir, Log: fs}
3926+
3927+
err = s.bootstrapRaftNode(cfg, nil, false)
3928+
require_NoError(t, err)
3929+
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
3930+
require_NoError(t, err)
3931+
3932+
// Create a sample entry, the content doesn't matter, just that it's stored.
3933+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
3934+
entries := []*Entry{newEntry(EntryNormal, esm)}
3935+
3936+
nats0 := "S1Nunr6R" // "nats-0"
3937+
3938+
// Timeline
3939+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
3940+
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
3941+
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries})
3942+
3943+
for i, ae := range []*appendEntry{aeMsg1, aeMsg2, aeMsg3} {
3944+
n.processAppendEntry(ae, n.aesub)
3945+
require_Equal(t, n.pindex, uint64(i+1))
3946+
}
3947+
3948+
// Manually call back down to applied, and then snapshot.
3949+
n.Applied(1)
3950+
require_NoError(t, n.InstallSnapshot(nil))
3951+
3952+
state := n.wal.State()
3953+
require_Equal(t, state.FirstSeq, 2)
3954+
require_Equal(t, state.LastSeq, 3)
3955+
3956+
// Manually compact the WAL further so it doesn't align anymore with the snapshot.
3957+
_, err = n.wal.Compact(3)
3958+
require_NoError(t, err)
3959+
state = n.wal.State()
3960+
require_Equal(t, state.FirstSeq, 3)
3961+
require_Equal(t, state.LastSeq, 3)
3962+
3963+
// Stop current node and restart it.
3964+
n.Stop()
3965+
n.WaitForStop()
3966+
// Restart.
3967+
n.Stop()
3968+
n.WaitForStop()
3969+
require_NoError(t, fs.Stop())
3970+
fs, err = newFileStore(fcfg, scfg)
3971+
require_NoError(t, err)
3972+
cfg = &RaftConfig{Name: "TEST", Store: storeDir, Log: fs}
3973+
n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
3974+
require_NoError(t, err)
3975+
3976+
// The gap between the snapshot and the WAL should be detected, and the WAL should truncate.
3977+
// Can't continue normally with missing entries.
3978+
state = n.wal.State()
3979+
require_Equal(t, state.Msgs, 0)
3980+
require_Equal(t, state.FirstSeq, 2)
3981+
require_Equal(t, state.LastSeq, 1)
3982+
require_Equal(t, n.pindex, 1)
3983+
3984+
// Should be able to re-populate the missing messages.
3985+
// Need to re-encode them, though, since they would have been returned to the pool before.
3986+
aeMsg2 = encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
3987+
aeMsg3 = encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries})
3988+
for i, ae := range []*appendEntry{aeMsg2, aeMsg3} {
3989+
n.processAppendEntry(ae, n.aesub)
3990+
require_Equal(t, n.pindex, uint64(i+2))
3991+
}
3992+
}
3993+
3994+
func TestNRGTruncateLogWithMisingSnapshot(t *testing.T) {
3995+
c := createJetStreamClusterExplicit(t, "R3S", 3)
3996+
s := c.servers[0] // RunBasicJetStreamServer not available
3997+
defer c.shutdown()
3998+
3999+
storeDir := t.TempDir()
4000+
fcfg := FileStoreConfig{StoreDir: storeDir, BlockSize: defaultMediumBlockSize, AsyncFlush: false, srv: s}
4001+
scfg := StreamConfig{Name: "RAFT", Storage: FileStorage}
4002+
fs, err := newFileStore(fcfg, scfg)
4003+
require_NoError(t, err)
4004+
4005+
cfg := &RaftConfig{Name: "TEST", Store: storeDir, Log: fs}
4006+
4007+
err = s.bootstrapRaftNode(cfg, nil, false)
4008+
require_NoError(t, err)
4009+
n, err := s.initRaftNode(globalAccountName, cfg, pprofLabels{})
4010+
require_NoError(t, err)
4011+
4012+
// Create a sample entry, the content doesn't matter, just that it's stored.
4013+
esm := encodeStreamMsgAllowCompress("foo", "_INBOX.foo", nil, nil, 0, 0, true)
4014+
entries := []*Entry{newEntry(EntryNormal, esm)}
4015+
4016+
nats0 := "S1Nunr6R" // "nats-0"
4017+
4018+
// Timeline
4019+
aeMsg1 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 0, pindex: 0, entries: entries})
4020+
aeMsg2 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 0, pterm: 1, pindex: 1, entries: entries})
4021+
aeMsg3 := encode(t, &appendEntry{leader: nats0, term: 1, commit: 2, pterm: 1, pindex: 2, entries: entries})
4022+
4023+
for i, ae := range []*appendEntry{aeMsg1, aeMsg2, aeMsg3} {
4024+
n.processAppendEntry(ae, n.aesub)
4025+
require_Equal(t, n.pindex, uint64(i+1))
4026+
}
4027+
4028+
// Manually simulate a snapshot being made, only compacting but not actually installing a snapshot.
4029+
_, err = n.wal.Compact(2)
4030+
require_NoError(t, err)
4031+
4032+
state := n.wal.State()
4033+
require_Equal(t, state.FirstSeq, 2)
4034+
require_Equal(t, state.LastSeq, 3)
4035+
4036+
// Stop current node and restart it.
4037+
n.Stop()
4038+
n.WaitForStop()
4039+
// Restart.
4040+
n.Stop()
4041+
n.WaitForStop()
4042+
require_NoError(t, fs.Stop())
4043+
fs, err = newFileStore(fcfg, scfg)
4044+
require_NoError(t, err)
4045+
cfg = &RaftConfig{Name: "TEST", Store: storeDir, Log: fs}
4046+
n, err = s.initRaftNode(globalAccountName, cfg, pprofLabels{})
4047+
require_NoError(t, err)
4048+
4049+
// There is no snapshot, so the WAL must start at the beginning. It starts at sequence 2 instead,
4050+
// so the misalignment should be detected and the WAL truncated entirely.
4051+
state = n.wal.State()
4052+
require_Equal(t, state.Msgs, 0)
4053+
require_Equal(t, state.FirstSeq, 0)
4054+
require_Equal(t, state.LastSeq, 0)
4055+
require_Equal(t, n.pindex, 0)
4056+
}
4057+
39144058
// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
39154059
// proposing the next one.
39164060
// The test may fail if:

0 commit comments

Comments
 (0)