Skip to content

Commit 3624b16

Browse files
NRG: Can't become leader based on majority when repairing truncated log
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 9751bab commit 3624b16

File tree

2 files changed

+97
-1
lines changed

2 files changed

+97
-1
lines changed

server/raft.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3324,12 +3324,16 @@ func (n *raft) runAsCandidate() {
33243324
n.RLock()
33253325
nterm := n.term
33263326
csz := n.csz
3327+
repairing, initializing := n.repairing, n.initializing
33273328
n.RUnlock()
33283329

33293330
if vresp.granted && nterm == vresp.term {
33303331
// only track peers that would be our followers
33313332
n.trackPeer(vresp.peer)
3332-
if !vresp.empty {
3333+
3334+
// A vote only counts toward a majority if it's a non-empty vote from an intact server,
3335+
// and we're not repairing ourselves either (a more up-to-date server could exist).
3336+
if !vresp.empty && (!repairing || initializing) {
33333337
votes[vresp.peer] = struct{}{}
33343338
} else {
33353339
emptyVotes[vresp.peer] = struct{}{}

server/raft_test.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3654,6 +3654,8 @@ func TestNRGLostQuorum(t *testing.T) {
36543654
n, cleanup := initSingleMemRaftNode(t)
36553655
defer cleanup()
36563656

3657+
require_True(t, n.repairing)
3658+
require_True(t, n.initializing)
36573659
require_Equal(t, n.State(), Follower)
36583660
require_False(t, n.Quorum())
36593661
require_True(t, n.lostQuorum())
@@ -4127,6 +4129,96 @@ func TestNRGRepairAfterMajorityCorruption(t *testing.T) {
41274129
rg.waitOnTotal(t, 5)
41284130
}
41294131

4132+
func TestNRGRepairAfterMinorityCorruption(t *testing.T) {
4133+
c := createJetStreamClusterExplicit(t, "R3S", 3)
4134+
defer c.shutdown()
4135+
4136+
rg := c.createRaftGroup("TEST", 3, newStateAdder)
4137+
l := rg.waitOnLeader()
4138+
rs := rg.nonLeader()
4139+
for i := range int64(5) {
4140+
l.(*stateAdder).proposeDelta(1)
4141+
// Custom rg.waitOnTotal to allow a server to be down.
4142+
checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
4143+
var err error
4144+
for _, sm := range rg {
4145+
if sm.node().State() == Closed {
4146+
continue
4147+
}
4148+
asm := sm.(*stateAdder)
4149+
if total := asm.total(); total != (i + 1) {
4150+
err = errors.Join(err, fmt.Errorf("Adder on %v has wrong total: %d vs %d", asm.server(), total, i+1))
4151+
}
4152+
}
4153+
return err
4154+
})
4155+
// Stop a random member so that its log will be outdated.
4156+
if i == 3 {
4157+
rs.stop()
4158+
}
4159+
}
4160+
4161+
// Stop all servers.
4162+
for _, sm := range rg {
4163+
sm.stop()
4164+
}
4165+
4166+
// Corrupt the log on one server and then bring it back together with the outdated server.
4167+
// The corrupted server should not be able to become leader.
4168+
var cs stateMachine
4169+
for _, sm := range rg {
4170+
if sm == l || sm == rs {
4171+
continue
4172+
}
4173+
cs = sm
4174+
rn := sm.node().(*raft)
4175+
rn.RLock()
4176+
blk := filepath.Join(rn.sd, msgDir, "1.blk")
4177+
rn.RUnlock()
4178+
4179+
stat, err := os.Stat(blk)
4180+
require_NoError(t, err)
4181+
require_LessThan(t, 0, stat.Size())
4182+
require_NoError(t, os.Truncate(blk, stat.Size()-1))
4183+
4184+
// Confirm the file was truncated after restart.
4185+
sm.restart()
4186+
nstat, err := os.Stat(blk)
4187+
require_NoError(t, err)
4188+
require_NotEqual(t, nstat.Size(), stat.Size())
4189+
}
4190+
rs.restart()
4191+
expires := time.Now().Add(2 * time.Second)
4192+
for time.Now().Before(expires) {
4193+
// Speed up the leader election on the corrupted server by purposefully having it campaign immediately.
4194+
// The outdated server will vote for the corrupted server, no questions asked. But the corrupted server
4195+
// should know it requires its log to be repaired so it can't become leader based on a majority alone.
4196+
n := cs.node().(*raft)
4197+
n.Lock()
4198+
n.campaign(time.Millisecond)
4199+
n.Unlock()
4200+
4201+
time.Sleep(100 * time.Millisecond)
4202+
for _, sm := range rg {
4203+
if sm == l {
4204+
continue
4205+
}
4206+
if n := sm.node(); n.State() == Leader || n.Leader() {
4207+
t.Fatal("Leader elected from corrupted logs")
4208+
}
4209+
}
4210+
}
4211+
4212+
// Restart the leader, and we expect it to become leader again.
4213+
l.restart()
4214+
nl := rg.waitOnLeader()
4215+
require_NotNil(t, nl)
4216+
require_Equal(t, nl, l)
4217+
4218+
// The logs should have been repaired, and the total should match.
4219+
rg.waitOnTotal(t, 5)
4220+
}
4221+
41304222
// This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
41314223
// proposing the next one.
41324224
// The test may fail if:

0 commit comments

Comments
 (0)