
Commit 2d9baba

NRG: Decouple Raft transport layer
The Raft implementation was tightly coupled to the server's internal client and send queue for its RPC communication, which made it difficult to test scenarios such as network partitions in a deterministic manner. The primary benefit of this change is improved testability: a new mockTransport is introduced for testing, which allows simulating network partitions and injecting behavior after a message is sent.

Signed-off-by: Daniele Sciascia <[email protected]>
1 parent a11b5b9 commit 2d9baba

5 files changed: +383 −65 lines
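The raftTransport interface and its factory type are defined in one of the five changed files, which this excerpt does not include. Inferred purely from the call sites in the diffs below, the interface plausibly looks something like the following sketch; the method names match the calls that appear in raft.go, but the exact signatures and doc comments are assumptions:

// Sketch only: inferred from call sites (n.t.Publish, n.t.Subscribe, etc.)
// in the diffs below, not copied from the actual definition.
type raftTransport interface {
    // Publish sends an outbound Raft RPC, optionally with a reply subject.
    Publish(subject, reply string, msg []byte)
    // Subscribe registers a handler for inbound Raft messages on a subject.
    Subscribe(subject string, cb msgHandler) (*subscription, error)
    // Unsubscribe tears down a subscription created by Subscribe.
    Unsubscribe(sub *subscription)
    // Reset rebinds the transport to the given account, replacing any
    // previously held internal client and subscriptions.
    Reset(acc *Account)
    // Close releases the transport and anything it owns.
    Close()
}

// raftTransportFactory builds a transport for a given server and Raft node,
// matching the n.t = factory(s, n) call in initRaftNode below.
type raftTransportFactory func(s *Server, n *raft) raftTransport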

server/raft.go

Lines changed: 15 additions & 51 deletions
@@ -184,7 +184,6 @@ type raft struct {
     vote string // Our current vote state
 
     s  *Server    // Reference to top-level server
-    c  *client    // Internal client for subscriptions
     js *jetStream // JetStream, if running, to see if we are out of resources
 
     hasleader atomic.Bool // Is there a group leader right now?
@@ -203,7 +202,7 @@ type raft struct {
     asubj  string // Append entries subject
     areply string // Append entries responses subject
 
-    sq    *sendq        // Send queue for outbound RPC messages
+    t     raftTransport // Transport that handles Raft messaging
     aesub *subscription // Subscription for handleAppendEntry callbacks
 
     wtv []byte // Term and vote to be written
@@ -296,6 +295,8 @@ type RaftConfig struct {
     // We need to protect against losing state due to the new peers starting with an empty log.
     // Therefore, these empty servers can't try to become leader until they at least have _some_ state.
     ScaleUp bool
+
+    Transport raftTransportFactory
 }
 
 var (
@@ -437,6 +438,12 @@ func (s *Server) initRaftNode(accName string, cfg *RaftConfig, labels pprofLabel
         extSt: ps.domainExt,
     }
 
+    factory := cfg.Transport
+    if factory == nil {
+        factory = defaultRaftTransport
+    }
+    n.t = factory(s, n)
+
     // Setup our internal subscriptions for proposals, votes and append entries.
     // If we fail to do this for some reason then this is fatal — we cannot
     // continue setting up or the Raft node may be partially/totally isolated.
@@ -681,33 +688,11 @@ func (n *raft) recreateInternalSubsLocked() error {
     // the next step...
     n.cancelCatchup()
 
-    // If we have an existing client then tear down any existing
-    // subscriptions and close the internal client.
-    if c := n.c; c != nil {
-        c.mu.Lock()
-        subs := make([]*subscription, 0, len(c.subs))
-        for _, sub := range c.subs {
-            subs = append(subs, sub)
-        }
-        c.mu.Unlock()
-        for _, sub := range subs {
-            n.unsubscribe(sub)
-        }
-        c.closeConnection(InternalClient)
-    }
-
     if n.acc != nrgAcc {
         n.debug("Subscribing in '%s'", nrgAcc.GetName())
     }
 
-    c := n.s.createInternalSystemClient()
-    c.registerWithAccount(nrgAcc)
-    if nrgAcc.sq == nil {
-        nrgAcc.sq = n.s.newSendQ(nrgAcc)
-    }
-    n.c = c
-    n.sq = nrgAcc.sq
-    n.acc = nrgAcc
+    n.t.Reset(nrgAcc)
 
     // Recreate any internal subscriptions for voting, append
     // entries etc in the new account.
@@ -1968,17 +1953,12 @@ func (n *raft) newInbox() string {
 // Our internal subscribe.
 // Lock should be held.
 func (n *raft) subscribe(subject string, cb msgHandler) (*subscription, error) {
-    if n.c == nil {
-        return nil, errNoInternalClient
-    }
-    return n.s.systemSubscribe(subject, _EMPTY_, false, n.c, cb)
+    return n.t.Subscribe(subject, cb)
 }
 
 // Lock should be held.
 func (n *raft) unsubscribe(sub *subscription) {
-    if n.c != nil && sub != nil {
-        n.c.processUnsub(sub.sid)
-    }
+    n.t.Unsubscribe(sub)
 }
 
 // Lock should be held.
@@ -2103,19 +2083,7 @@ runner:
     n.Lock()
     defer n.Unlock()
 
-    if c := n.c; c != nil {
-        var subs []*subscription
-        c.mu.Lock()
-        for _, sub := range c.subs {
-            subs = append(subs, sub)
-        }
-        c.mu.Unlock()
-        for _, sub := range subs {
-            n.unsubscribe(sub)
-        }
-        c.closeConnection(InternalClient)
-        n.c = nil
-    }
+    n.t.Close()
 
     // Unregistering ipQueues do not prevent them from push/pop
     // just will remove them from the central monitoring map
@@ -4640,15 +4608,11 @@ func (n *raft) requestVote() {
 }
 
 func (n *raft) sendRPC(subject, reply string, msg []byte) {
-    if n.sq != nil {
-        n.sq.send(subject, reply, nil, msg)
-    }
+    n.t.Publish(subject, reply, msg)
 }
 
 func (n *raft) sendReply(subject string, msg []byte) {
-    if n.sq != nil {
-        n.sq.send(subject, _EMPTY_, nil, msg)
-    }
+    n.t.Publish(subject, _EMPTY_, msg)
 }
 
 func (n *raft) wonElection(votes int) bool {
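The diff above delegates all Raft messaging to n.t, but the defaultRaftTransport implementation itself is not part of this excerpt. Judging from the code removed above, it presumably wraps the same internal system client and per-account send queue that raft.go previously managed directly. A rough, hypothetical sketch of that shape (the natsTransport name and structure are assumed; the method bodies mirror the removed logic):

// Hypothetical sketch only: the real defaultRaftTransport ships with this
// commit in a file not shown here. The bodies mirror the code removed above.
type natsTransport struct {
    s  *Server
    c  *client // Internal client for subscriptions
    sq *sendq  // Send queue for outbound RPC messages
}

func defaultRaftTransport(s *Server, n *raft) raftTransport {
    return &natsTransport{s: s}
}

func (t *natsTransport) Publish(subject, reply string, msg []byte) {
    if t.sq != nil {
        t.sq.send(subject, reply, nil, msg)
    }
}

func (t *natsTransport) Subscribe(subject string, cb msgHandler) (*subscription, error) {
    if t.c == nil {
        return nil, errNoInternalClient
    }
    return t.s.systemSubscribe(subject, _EMPTY_, false, t.c, cb)
}

func (t *natsTransport) Unsubscribe(sub *subscription) {
    if t.c != nil && sub != nil {
        t.c.processUnsub(sub.sid)
    }
}

func (t *natsTransport) Reset(acc *Account) {
    // Tear down any previous internal client, then bind a fresh one to the
    // given account, as recreateInternalSubsLocked used to do inline.
    t.Close()
    c := t.s.createInternalSystemClient()
    c.registerWithAccount(acc)
    if acc.sq == nil {
        acc.sq = t.s.newSendQ(acc)
    }
    t.c, t.sq = c, acc.sq
}

func (t *natsTransport) Close() {
    // The removed code also unsubscribed each subscription explicitly
    // before closing the internal client.
    if t.c != nil {
        t.c.closeConnection(InternalClient)
        t.c = nil
    }
}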

server/raft_helpers_test.go

Lines changed: 25 additions & 14 deletions
@@ -126,21 +126,26 @@ func (sg smGroup) lockFollowers() []stateMachine {
 // Create a raft group and place on numMembers servers at random.
 // Filestore based.
 func (c *cluster) createRaftGroup(name string, numMembers int, smf smFactory) smGroup {
-    return c.createRaftGroupEx(name, numMembers, smf, FileStorage)
+    return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, FileStorage)
 }
 
 func (c *cluster) createMemRaftGroup(name string, numMembers int, smf smFactory) smGroup {
-    return c.createRaftGroupEx(name, numMembers, smf, MemoryStorage)
+    return c.createRaftGroupEx(name, numMembers, smf, defaultRaftTransport, MemoryStorage)
 }
 
-func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, st StorageType) smGroup {
+func (c *cluster) createMockMemRaftGroup(name string, members int, smf smFactory) (*raftTransportHub, raftTransportFactory, smGroup) {
+    hub, rtf := mockTransportFactory()
+    return hub, rtf, c.createRaftGroupEx(name, members, smf, rtf, MemoryStorage)
+}
+
+func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
     c.t.Helper()
     if numMembers > len(c.servers) {
         c.t.Fatalf("Members > Peers: %d vs %d", numMembers, len(c.servers))
     }
     servers := append([]*Server{}, c.servers...)
     rand.Shuffle(len(servers), func(i, j int) { servers[i], servers[j] = servers[j], servers[i] })
-    return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
+    return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, rtf, st)
 }
 
 func (c *cluster) createWAL(name string, st StorageType) WAL {
@@ -189,42 +194,48 @@ func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string,
     return sm
 }
 
-func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
+func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, rtf raftTransportFactory, st StorageType) smGroup {
     c.t.Helper()
 
     var sg smGroup
     peers := serverPeerNames(servers)
 
     for _, s := range servers {
         cfg := &RaftConfig{
-            Name:  name,
-            Store: c.t.TempDir(),
-            Log:   c.createWAL(name, st)}
+            Name:      name,
+            Store:     c.t.TempDir(),
+            Log:       c.createWAL(name, st),
+            Transport: rtf}
         sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
     }
     return sg
 }
 
-func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
+func (c *cluster) addNodeEx(name string, smf smFactory, rtf raftTransportFactory, st StorageType) stateMachine {
     c.t.Helper()
 
     server := c.addInNewServer()
 
     cfg := &RaftConfig{
-        Name:  name,
-        Store: c.t.TempDir(),
-        Log:   c.createWAL(name, st)}
+        Name:      name,
+        Store:     c.t.TempDir(),
+        Log:       c.createWAL(name, st),
+        Transport: rtf}
 
     peers := serverPeerNames(c.servers)
     return c.createStateMachine(server, cfg, peers, smf)
 }
 
 func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
-    return c.addNodeEx(name, smf, FileStorage)
+    return c.addNodeEx(name, smf, defaultRaftTransport, FileStorage)
 }
 
 func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
-    return c.addNodeEx(name, smf, MemoryStorage)
+    return c.addNodeEx(name, smf, defaultRaftTransport, MemoryStorage)
+}
+
+func (c *cluster) addMockMemRaftNode(name string, rtf raftTransportFactory, smf smFactory) stateMachine {
+    return c.addNodeEx(name, smf, rtf, MemoryStorage)
 }
 
 // Driver program for the state machine.
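The mock transport used by these helpers (mockTransportFactory, raftTransportHub) also lives in a file not included in this excerpt. Going only by how the tests below use it, the hub behaves roughly like the following self-contained sketch: an in-memory bus that delivers messages between nodes in the same partition, drops traffic across partitions, and invokes an optional hook after each delivery. All names, fields, and behavioral details here are assumptions for illustration, not the actual implementation.

// Illustrative sketch only; not the mock transport from this commit.
import "sync"

type msgHandlerFn func(subject, reply string, msg []byte)

type mockHub struct {
    mu       sync.Mutex
    parts    map[string]int                     // node ID -> partition (0 = connected)
    subs     map[string]map[string]msgHandlerFn // subject -> node ID -> handler
    afterMsg msgHandlerFn
}

func newMockHub() *mockHub {
    return &mockHub{
        parts: make(map[string]int),
        subs:  make(map[string]map[string]msgHandlerFn),
    }
}

// subscribe registers a handler for a subject on behalf of a node.
func (h *mockHub) subscribe(node, subject string, cb msgHandlerFn) {
    h.mu.Lock()
    defer h.mu.Unlock()
    if h.subs[subject] == nil {
        h.subs[subject] = make(map[string]msgHandlerFn)
    }
    h.subs[subject][node] = cb
}

// publish delivers msg to every subscriber of subject that shares a partition
// with the sending node, then runs the after-message hook, if one is set.
func (h *mockHub) publish(from, subject, reply string, msg []byte) {
    h.mu.Lock()
    var targets []msgHandlerFn
    for node, cb := range h.subs[subject] {
        if h.parts[node] == h.parts[from] {
            targets = append(targets, cb)
        }
    }
    after := h.afterMsg
    h.mu.Unlock()

    for _, cb := range targets {
        cb(subject, reply, msg)
    }
    if after != nil {
        after(subject, reply, msg)
    }
}

// partition places a node into a numbered partition; nodes only see traffic
// from nodes in the same partition.
func (h *mockHub) partition(node string, p int) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.parts[node] = p
}

// heal returns a single node to the default (connected) partition.
func (h *mockHub) heal(node string) { h.partition(node, 0) }

// healPartitions reconnects every node.
func (h *mockHub) healPartitions() {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.parts = make(map[string]int)
}

// setAfterMsgHook installs a callback run after each delivery, which lets a
// test inject behavior (e.g. create a partition) at a precise protocol step.
func (h *mockHub) setAfterMsgHook(f msgHandlerFn) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.afterMsg = f
}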

server/raft_test.go

Lines changed: 107 additions & 0 deletions
@@ -4627,3 +4627,110 @@ func TestNRGSingleNodeElection(t *testing.T) {
     require_Equal(t, newLeader.node().ClusterSize(), 3)
     require_False(t, newLeader.node().MembershipChangeInProgress())
 }
+
+func TestNRGPartitionedPeerRemove(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R2S", 2)
+    defer c.shutdown()
+
+    hub, _, rg := c.createMockMemRaftGroup("MOCK", 2, newStateAdder)
+    defer hub.healPartitions()
+
+    leader := rg.waitOnLeader()
+    followers := rg.followers()
+    require_Equal(t, len(followers), 1)
+    require_Equal(t, leader.node().ClusterSize(), 2)
+
+    // Remove the follower while the leader is partitioned away.
+    hub.partition(leader.node().ID(), 1)
+    leader.node().ProposeRemovePeer(followers[0].node().ID())
+
+    // The follower shouldn't be able to get elected, but let's try anyway.
+    followers[0].node().CampaignImmediately()
+
+    // Expect progress on the leader side.
+    checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+        if leader.node().ClusterSize() != 1 {
+            return errors.New("node removal still in progress")
+        }
+        return nil
+    })
+
+    checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+        if leader.node().MembershipChangeInProgress() {
+            return errors.New("membership still in progress")
+        }
+        return nil
+    })
+
+    require_Equal(t, leader.node().ClusterSize(), 1)
+    require_False(t, leader.node().MembershipChangeInProgress())
+
+    // The follower has not changed.
+    require_Equal(t, followers[0].node().State(), Follower)
+    require_Equal(t, followers[0].node().ClusterSize(), 2)
+    require_False(t, followers[0].node().MembershipChangeInProgress())
+
+    // Heal the partition, and expect the follower to get the bad news...
+    hub.heal(leader.node().ID())
+
+    checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
+        if followers[0].node().ClusterSize() != 1 {
+            return errors.New("node removal still in progress")
+        }
+        return nil
+    })
+}
+
+func TestNRGLeaderWithoutQuorumAfterPeerAdd(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    hub, rtf, rg := c.createMockMemRaftGroup("MOCK", 3, newStateAdder)
+    defer hub.healPartitions()
+
+    leader := rg.waitOnLeader()
+    followers := rg.followers()
+
+    // Set up an after-message hook to create a partition as soon as
+    // the leader publishes an EntryAddPeer. The partition will
+    // prevent the entry from being committed.
+    hub.setAfterMsgHook(func(subject, reply string, msg []byte) {
+        if subject != "$NRG.AE.MOCK" {
+            return
+        }
+        ae, _ := decodeAppendEntry(msg, nil, reply)
+        if ae == nil || len(ae.entries) != 1 {
+            return
+        }
+        if ae.leader != leader.node().ID() {
+            return
+        }
+        require_Equal(t, ae.entries[0].Type, EntryAddPeer)
+
+        // After EntryAddPeer is published, partition the
+        // leader and one of the followers. The resulting
+        // partition can't commit the entry.
+        hub.partition(leader.node().ID(), 1)
+        hub.partition(followers[0].node().ID(), 1)
+    })
+
+    newNode := c.addMockMemRaftNode("MOCK", rtf, newStateAdder)
+
+    // At some point here the cluster gets partitioned into
+    // {leader, followers[0]} and {newNode, followers[1]};
+    // neither side should be able to make progress.
+    newGroup := smGroup{newNode, followers[1]}
+    newLeader := newGroup.waitOnLeader()
+
+    // If the bug is present, we manage to elect a new leader
+    // in a 4-node cluster with only two nodes in the partition!
+    // This is because of the following sequence of events:
+    // 1) the follower has received the EntryAddPeer,
+    // 2) the leader and the other follower have partitioned away,
+    // 3) the entry is uncommitted, however the follower has added
+    //    the new peer to its peer set, but won't adjust cluster
+    //    size and quorum until after the entry is committed,
+    // 4) the follower becomes a candidate and will become leader
+    //    with a single vote from the new node.
+    require_Equal(t, newLeader, nil)
+}
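For context on the arithmetic in the second test (assuming the usual clusterSize/2 + 1 quorum rule): once the new peer is added, the cluster has 4 nodes and a real quorum of 3, so the {newNode, followers[1]} side should never be able to elect a leader. But because the EntryAddPeer is still uncommitted, followers[1] keeps computing quorum from a cluster size of 3, i.e. 3/2 + 1 = 2 votes, while already counting the new peer as a voter; its own vote plus the new node's vote meets that stale quorum, which is exactly the bogus election the test asserts against.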
