Skip to content

Commit cfd1531

Browse files
committed
NRG: Don't drain response or proposal queues when switching to leader
The `runAsCandidate` and `runAsFollower` goroutines both try to drain the append entry response and proposal queues if they are still populated from the previous leadership. However, while switching to leader we might still be blocked in `runAsCandidate` or `runAsFollower`, waiting for the state change to complete, and could accidentally drain incoming proposals. Signed-off-by: Neil Twigg <[email protected]>
1 parent 1723f25 commit cfd1531

File tree

2 files changed

+37
-4
lines changed

2 files changed

+37
-4
lines changed

server/ipqueue.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,21 @@ func (q *ipQueue[T]) recycle(elts *[]T) {
230230
q.pool.Put(elts)
231231
}
232232

233+
// Renotify will re-fill the notification channel if there are pending
234+
// reads. Useful if you have read the notification channel but do not
235+
// intend to pop/popOne/drain etc.
236+
func (q *ipQueue[T]) renotify() {
237+
if q == nil {
238+
return
239+
}
240+
q.Lock()
241+
defer q.Unlock()
242+
if len(q.elts)-q.pos == 0 {
243+
return
244+
}
245+
q.ch <- struct{}{}
246+
}
247+
233248
// Returns the current length of the queue.
234249
func (q *ipQueue[T]) len() int {
235250
q.Lock()

server/raft.go

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2261,10 +2261,10 @@ func (n *raft) runAsFollower() {
22612261
n.votes.popOne()
22622262
case <-n.resp.ch:
22632263
// Ignore append entry responses received from before the state change.
2264-
n.resp.drain()
2264+
n.drainOrRenotify(n.resp)
22652265
case <-n.prop.ch:
22662266
// Ignore proposals received from before the state change.
2267-
n.prop.drain()
2267+
n.drainOrRenotify(n.prop)
22682268
case <-n.reqs.ch:
22692269
// We've just received a vote request from the network.
22702270
// Because of drain() it is possible that we get nil from popOne().
@@ -2275,6 +2275,24 @@ func (n *raft) runAsFollower() {
22752275
}
22762276
}
22772277

2278+
// For runAsFollower and runAsCandidate, to ensure that we don't hang onto
2279+
// things like append entry responses or proposals from a previous leadership
2280+
// if we are staying in this state, but that we don't nuke them if we're about
2281+
// to become a leader.
2282+
func (n *raft) drainOrRenotify(ipq interface {
2283+
drain() int
2284+
renotify()
2285+
}) {
2286+
if n.State() == Leader {
2287+
// We're in the process of switching to leader but haven't switched
2288+
// to runAsLeader yet, put the notification back so runAsLeader will
2289+
// fetch it on the next select{}.
2290+
ipq.renotify()
2291+
} else {
2292+
ipq.drain()
2293+
}
2294+
}
2295+
22782296
// Pool for CommittedEntry re-use.
22792297
var cePool = sync.Pool{
22802298
New: func() any {
@@ -3412,10 +3430,10 @@ func (n *raft) runAsCandidate() {
34123430
n.processAppendEntries()
34133431
case <-n.resp.ch:
34143432
// Ignore append entry responses received from before the state change.
3415-
n.resp.drain()
3433+
n.drainOrRenotify(n.resp)
34163434
case <-n.prop.ch:
34173435
// Ignore proposals received from before the state change.
3418-
n.prop.drain()
3436+
n.drainOrRenotify(n.prop)
34193437
case <-n.s.quitCh:
34203438
return
34213439
case <-n.quit:

0 commit comments

Comments
 (0)