
Commit 1e3d5d1

NRG: Disjoint majorities during membership changes and network partitions
This commit fixes the following bugs:

- Inconsistent Cluster Size: When a leader was partitioned from the cluster immediately after proposing an EntryAddPeer, the remaining nodes could end up with different views of the cluster size and quorum: a follower's cluster size would not match the number of peers in its peer set. A subsequent leader election, electing one of those followers, could break the quorum system.

- Incorrect Leader Election: It was possible for a new leader to be elected without a proper quorum. This could happen if a partition occurred after a new peer was proposed but before that change was committed. A follower could add the uncommitted peer to its peer set without updating its cluster size and quorum, leading to an invalid election.

Both issues are solved by making sure that when a peer is added to or removed from the membership, the cluster size and quorum are adjusted at the same time. Previously, followers would add a peer when receiving the EntryAddPeer and only adjust the cluster size after commit. This patch changes that behavior so that the cluster size and quorum are recomputed upon receiving the EntryAddPeer / EntryRemovePeer proposals. This is in line with the membership protocol proposed in Ongaro's dissertation, section 4.1.

This patch also removes the concept of a "known" peer from the Raft layer. Previously, a node would add a peer to its peer set when first receiving the corresponding appendEntry and only mark it as "known" on commit. That distinction no longer applies.

Signed-off-by: Daniele Sciascia <[email protected]>
1 parent 7351075 commit 1e3d5d1
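To make the invariant described above concrete, here is a minimal Go sketch. It is not the server's actual Raft code: the node struct, its field names, and onMembershipEntry are hypothetical stand-ins. It only illustrates the idea that the peer set, cluster size, and quorum are adjusted together the moment a membership entry is received, using the same majority rule (size/2 + 1) that appears in the diff below.

// Minimal sketch only: hypothetical types illustrating the invariant from the
// commit message, not the actual NATS raft implementation.
package main

import "fmt"

// node is a hypothetical stand-in for the per-node Raft state touched here.
type node struct {
	peers map[string]struct{} // peer set, including ourselves
	csz   int                 // cluster size
	qn    int                 // quorum needed for elections and commits
}

// onMembershipEntry runs as soon as an add/remove-peer entry is received
// (appended), not when it commits: the peer set, cluster size, and quorum
// are always adjusted in the same step.
func (n *node) onMembershipEntry(add bool, peer string) {
	if add {
		n.peers[peer] = struct{}{}
	} else {
		delete(n.peers, peer)
	}
	n.csz = len(n.peers) // cluster size always equals the tracked peer count
	n.qn = n.csz/2 + 1   // simple majority of the current cluster size
}

func main() {
	n := &node{peers: map[string]struct{}{"A": {}, "B": {}, "C": {}}, csz: 3, qn: 2}
	n.onMembershipEntry(true, "D")                 // an add-peer entry for "D" arrives
	fmt.Printf("size=%d quorum=%d\n", n.csz, n.qn) // size=4 quorum=3
}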

4 files changed: +227 -73 lines changed

server/monitor.go

Lines changed: 3 additions & 1 deletion

@@ -4190,8 +4190,10 @@ func (s *Server) Raftz(opts *RaftzOptions) *RaftzStatus {
 			}
 			peer := RaftzGroupPeer{
 				Name:                s.serverNameForNode(id),
-				Known:               p.kp,
 				LastReplicatedIndex: p.li,
+				// The Raft layer no longer distinguishes between
+				// 'known' and 'unknown' peers.
+				Known: true,
 			}
 			if !p.ts.IsZero() {
 				peer.LastSeen = time.Since(p.ts).String()

server/raft.go

Lines changed: 31 additions & 60 deletions

@@ -253,7 +253,6 @@ type catchupState struct {
 type lps struct {
 	ts time.Time // Last timestamp
 	li uint64    // Last index replicated
-	kp bool      // Known peer
 }

 const (
@@ -543,13 +542,13 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
 	}

 	// Make sure to track ourselves.
-	n.peers[n.id] = &lps{time.Now(), 0, true}
+	n.peers[n.id] = &lps{time.Now(), 0}

 	// Track known peers
 	for _, peer := range ps.knownPeers {
 		if peer != n.id {
 			// Set these to 0 to start but mark as known peer.
-			n.peers[peer] = &lps{time.Time{}, 0, true}
+			n.peers[peer] = &lps{time.Time{}, 0}
 		}
 	}

@@ -2624,13 +2623,10 @@ func (n *raft) addPeer(peer string) {
 		delete(n.removed, peer)
 	}

-	if lp, ok := n.peers[peer]; !ok {
+	if _, ok := n.peers[peer]; !ok {
 		// We are not tracking this one automatically so we need
 		// to bump cluster size.
-		n.peers[peer] = &lps{time.Time{}, 0, true}
-	} else {
-		// Mark as added.
-		lp.kp = true
+		n.peers[peer] = &lps{time.Time{}, 0}
 	}

 	// Adjust cluster size and quorum if needed.
@@ -2917,13 +2913,7 @@ func (n *raft) runCatchup(ar *appendEntryResponse, indexUpdatesQ *ipQueue[uint64
 		if len(n.progress) == 0 {
 			n.progress = nil
 		}
-		// Check if this is a new peer and if so go ahead and propose adding them.
-		_, exists := n.peers[peer]
 		n.Unlock()
-		if !exists {
-			n.debug("Catchup done for %q, will add into peers", peer)
-			n.ProposeAddPeer(peer)
-		}
 		indexUpdatesQ.unregister()
 	}()

@@ -3188,36 +3178,16 @@ func (n *raft) applyCommit(index uint64) error {
 					data: e.Data,
 				})
 			}
-		case EntryPeerState:
-			if n.State() != Leader {
-				if ps, err := decodePeerState(e.Data); err == nil {
-					n.processPeerState(ps)
-				}
-			}
 		case EntryAddPeer:
-			newPeer := string(e.Data)
-			n.debug("Added peer %q", newPeer)
-
-			// Store our peer in our global peer map for all peers.
-			peers.LoadOrStore(newPeer, newPeer)
-
-			n.addPeer(newPeer)
-
 			// We pass these up as well.
 			committed = append(committed, e)

 			// We are done with this membership change
 			n.membChanging = false
-
 		case EntryRemovePeer:
 			peer := string(e.Data)
 			n.debug("Removing peer %q", peer)

-			n.removePeer(peer)
-
-			// Remove from string intern map.
-			peers.Delete(peer)
-
 			// We pass these up as well.
 			committed = append(committed, e)

@@ -3305,25 +3275,20 @@ func (n *raft) trackResponse(ar *appendEntryResponse) bool {
 // Used to adjust cluster size and peer count based on added official peers.
 // lock should be held.
 func (n *raft) adjustClusterSizeAndQuorum() {
-	pcsz, ncsz := n.csz, 0
-	for _, peer := range n.peers {
-		if peer.kp {
-			ncsz++
-		}
-	}
-	n.csz = ncsz
+	pcsz := n.csz
+	n.csz = len(n.peers)
 	n.qn = n.csz/2 + 1

-	if ncsz > pcsz {
-		n.debug("Expanding our clustersize: %d -> %d", pcsz, ncsz)
+	if n.csz > pcsz {
+		n.debug("Expanding our clustersize: %d -> %d", pcsz, n.csz)
 		n.lsut = time.Now()
-	} else if ncsz < pcsz {
-		n.debug("Decreasing our clustersize: %d -> %d", pcsz, ncsz)
+	} else if n.csz < pcsz {
+		n.debug("Decreasing our clustersize: %d -> %d", pcsz, n.csz)
 		if n.State() == Leader {
 			go n.sendHeartbeat()
 		}
 	}
-	if ncsz != pcsz {
+	if n.csz != pcsz {
 		n.recreateInternalSubsLocked()
 	}
 }
@@ -3341,7 +3306,7 @@ func (n *raft) trackPeer(peer string) error {
 		}
 	}
 	if n.State() == Leader {
-		if lp, ok := n.peers[peer]; !ok || !lp.kp {
+		if _, ok := n.peers[peer]; !ok {
 			// Check if this peer had been removed previously.
 			needPeerAdd = !isRemoved
 		}
@@ -3936,15 +3901,24 @@ CONTINUE:
 					}
 				}
 			}
-		case EntryAddPeer:
-			if newPeer := string(e.Data); len(newPeer) == idLen {
-				// Track directly, but wait for commit to be official
-				if _, ok := n.peers[newPeer]; !ok {
-					n.peers[newPeer] = &lps{time.Time{}, 0, false}
+		case EntryPeerState:
+			if n.State() != Leader {
+				if ps, err := decodePeerState(e.Data); err == nil {
+					n.processPeerState(ps)
 				}
-				// Store our peer in our global peer map for all peers.
-				peers.LoadOrStore(newPeer, newPeer)
 			}
+		case EntryAddPeer:
+			peer := string(e.Data)
+			// Store our peer in our global peer map for all peers.
+			peers.LoadOrStore(peer, peer)
+			n.addPeer(peer)
+			n.debug("Added peer %q", peer)
+		case EntryRemovePeer:
+			peer := string(e.Data)
+			// Remove from string intern map.
+			peers.Delete(peer)
+			n.removePeer(peer)
+			n.debug("Removed peer %q", peer)
 		}
 	}

@@ -4006,10 +3980,9 @@ func (n *raft) processPeerState(ps *peerState) {
 	n.peers = make(map[string]*lps)
 	for _, peer := range ps.knownPeers {
 		if lp := old[peer]; lp != nil {
-			lp.kp = true
 			n.peers[peer] = lp
 		} else {
-			n.peers[peer] = &lps{time.Time{}, 0, true}
+			n.peers[peer] = &lps{time.Time{}, 0}
 		}
 	}
 	n.debug("Update peers from leader to %+v", n.peers)
@@ -4251,10 +4224,8 @@ func decodePeerState(buf []byte) (*peerState, error) {
 // Lock should be held.
 func (n *raft) peerNames() []string {
 	var peers []string
-	for name, peer := range n.peers {
-		if peer.kp {
-			peers = append(peers, name)
-		}
+	for name := range n.peers {
+		peers = append(peers, name)
 	}
 	return peers
 }

server/raft_helpers_test.go

Lines changed: 1 addition & 1 deletion

@@ -57,7 +57,7 @@ func (sg smGroup) leader() stateMachine {
 	return nil
 }

-func (sg smGroup) followers() []stateMachine {
+func (sg smGroup) followers() smGroup {
 	var f []stateMachine
 	for _, sm := range sg {
 		if sm.node().Leader() {
