Commit f878288

NRG: Periodic upper layer healthcheck
Signed-off-by: Neil Twigg <[email protected]>
1 parent 1723f25 commit f878288

4 files changed, +163 -3 lines changed

server/jetstream_cluster.go

Lines changed: 26 additions & 3 deletions
@@ -955,6 +955,18 @@ func (js *jetStream) setupMetaGroup() error {
         return err
     }

+    // Quick healthcheck function to prove the metalayer *looks* functional.
+    n.SetUpperLayerHealthcheck(func() error {
+        // Check that we can acquire dios in a timely fashion.
+        select {
+        case <-dios:
+            dios <- struct{}{}
+        case <-time.After(3 * time.Second):
+            return fmt.Errorf("extreme dios contention")
+        }
+        return nil
+    })
+
     // If we are bootstrapped with no state, start campaign early.
     if bootstrap {
         n.Campaign()
@@ -4354,11 +4366,15 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
         mset.startClusterSubs()
         mset.mu.Unlock()

-        js.createRaftGroup(acc.GetName(), rg, recovering, storage, pprofLabels{
+        // TODO(nat): Check error condition here, send SA result if err.
+        rn, _ := js.createRaftGroup(acc.GetName(), rg, recovering, storage, pprofLabels{
             "type":    "stream",
             "account": mset.accName(),
             "stream":  mset.name(),
         })
+        if rn != nil {
+            rn.SetUpperLayerHealthcheck(mset.upperLayerHealthcheck)
+        }
     }
     mset.monitorWg.Add(1)
     // Start monitoring..
@@ -4545,11 +4561,14 @@ func (js *jetStream) processClusterCreateStream(acc *Account, sa *streamAssignme
         s.Warnf("JetStream cluster error updating stream %q for account %q: %v", sa.Config.Name, acc.Name, err)
         if osa != nil {
             // Process the raft group and make sure it's running if needed.
-            js.createRaftGroup(acc.GetName(), osa.Group, osa.recovering, storage, pprofLabels{
+            rn, _ := js.createRaftGroup(acc.GetName(), osa.Group, osa.recovering, storage, pprofLabels{
                 "type":    "stream",
                 "account": mset.accName(),
                 "stream":  mset.name(),
             })
+            if rn != nil {
+                rn.SetUpperLayerHealthcheck(mset.upperLayerHealthcheck)
+            }
             mset.setStreamAssignment(osa)
         }
         if rg.node != nil {
@@ -5160,12 +5179,16 @@ func (js *jetStream) processClusterCreateConsumer(ca *consumerAssignment, state
         storage = MemoryStorage
     }
     // No-op if R1.
-    js.createRaftGroup(accName, rg, recovering, storage, pprofLabels{
+    rn, _ := js.createRaftGroup(accName, rg, recovering, storage, pprofLabels{
         "type":     "consumer",
         "account":  mset.accName(),
         "stream":   ca.Stream,
         "consumer": ca.Name,
     })
+    if rn != nil {
+        // For now consider a consumer to be healthy if the parent stream is.
+        rn.SetUpperLayerHealthcheck(mset.upperLayerHealthcheck)
+    }

     // Check if we already have this consumer running.
     var didCreate, isConfigUpdate, needsLocalResponse bool
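
Not part of the commit: the metalayer healthcheck registered above only tries to take, and immediately return, a slot on the dios I/O semaphore within a deadline. Below is a minimal standalone sketch of that timed-acquire pattern, with an illustrative ioSem in place of the server's dios and made-up sizes and timeouts.

// Standalone sketch: a semaphore modelled as a buffered channel, and a health
// probe that fails if a slot cannot be taken within a deadline. The names and
// sizes here are illustrative only.
package main

import (
    "errors"
    "fmt"
    "time"
)

var ioSem = make(chan struct{}, 4)

func init() {
    // Pre-fill the channel so that receiving takes a free slot and sending
    // it back returns it, mirroring how the dios check above behaves.
    for i := 0; i < cap(ioSem); i++ {
        ioSem <- struct{}{}
    }
}

func probeIOSem(timeout time.Duration) error {
    select {
    case <-ioSem:
        ioSem <- struct{}{} // Give the slot straight back.
        return nil
    case <-time.After(timeout):
        return errors.New("semaphore contention: could not acquire a slot in time")
    }
}

func main() {
    if err := probeIOSem(3 * time.Second); err != nil {
        fmt.Println("healthcheck failed:", err)
        return
    }
    fmt.Println("healthcheck ok")
}

Returning the slot immediately keeps the probe side-effect free; it only measures whether a slot could be obtained at all.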

server/raft.go

Lines changed: 69 additions & 0 deletions
@@ -87,6 +87,7 @@ type RaftNode interface {
     RecreateInternalSubs() error
     IsSystemAccount() bool
     GetTrafficAccountName() string
+    SetUpperLayerHealthcheck(f func() error)
 }

 type WAL interface {
@@ -233,6 +234,9 @@ type raft struct {
     scaleUp      bool // The node is part of a scale up, puts us in observer mode until the log contains data.
     membChanging bool // There is a membership change proposal in progress
     deleted      bool // If the node was deleted.
+
+    hcf      func() error // Healthcheck function from the upper layer, optional
+    hcfailed bool         // Healthcheck function from the upper layer last reported failed
 }

 type proposedEntry struct {
@@ -625,6 +629,15 @@ func (n *raft) GetTrafficAccountName() string {
     return n.acc.GetName()
 }

+// If provided, periodically check when leader if the upper layer is still responding.
+// If it does not return that it is happy then we will step down. The healthcheck
+// function can block, which is fine, because during that time runAsLeader can't send HBs.
+func (n *raft) SetUpperLayerHealthcheck(f func() error) {
+    n.Lock()
+    defer n.Unlock()
+    n.hcf = f
+}
+
 func (n *raft) RecreateInternalSubs() error {
     n.Lock()
     defer n.Unlock()
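
Not part of the commit: SetUpperLayerHealthcheck simply stores an optional callback under the node lock, and every call site guards against it being nil. A minimal sketch of that registration pattern, under hypothetical names (node, SetHealthcheck, runHealthcheck), not the server's API:

// Standalone sketch: an optional callback stored under a mutex and a
// nil-guarded call site, mirroring how hcf is set and consumed.
package main

import (
    "errors"
    "fmt"
    "sync"
)

type node struct {
    sync.Mutex
    hcf func() error // Optional healthcheck supplied by a higher layer.
}

func (n *node) SetHealthcheck(f func() error) {
    n.Lock()
    defer n.Unlock()
    n.hcf = f
}

func (n *node) runHealthcheck() error {
    n.Lock()
    f := n.hcf
    n.Unlock()
    if f == nil {
        return nil // Nothing registered, treat as healthy.
    }
    return f()
}

func main() {
    n := &node{}
    fmt.Println(n.runHealthcheck()) // <nil>: nothing registered yet.
    n.SetHealthcheck(func() error { return errors.New("upper layer unhappy") })
    fmt.Println(n.runHealthcheck()) // upper layer unhappy
}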
@@ -2219,6 +2232,15 @@ func (n *raft) processAppendEntries() {
 // runAsFollower is called by run and will block for as long as the node is
 // running in the follower state.
 func (n *raft) runAsFollower() {
+    // If we entered the follower state with a failed healthcheck then we
+    // will start a timer to keep checking it, as we might recover.
+    hct := make(<-chan time.Time)
+    if n.hcf != nil && n.hcfailed {
+        hcf := time.NewTicker(hbInterval * 30)
+        defer hcf.Stop()
+        hct = hcf.C
+    }
+
     for n.State() == Follower {
         elect := n.electTimer()

@@ -2254,6 +2276,12 @@
             n.switchToCandidate()
             return
         }
+        case <-hct:
+            if err := n.hcf(); err == nil {
+                n.warn("Upper layer healthcheck failure has cleared")
+                n.setObserver(false, extUndetermined)
+                return
+            }
         case <-n.votes.ch:
             // We're receiving votes from the network, probably because we have only
             // just stepped down and they were already in flight. Ignore them.
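
Not part of the commit: runAsFollower only arms its recheck ticker when the node entered the follower state with a failed healthcheck; otherwise the hct case simply never fires. A standalone sketch of that optional-timer idiom follows, using a nil channel (which also never becomes ready in a select) rather than make(<-chan time.Time); all names are illustrative.

// Standalone sketch: a select case that is effectively disabled until a
// ticker is armed. A nil <-chan time.Time never becomes ready in a select,
// so the "case <-hct" branch is only taken when recheck is true.
package main

import (
    "fmt"
    "time"
)

func run(recheck bool) {
    var hct <-chan time.Time // nil: this case never fires unless armed below.
    if recheck {
        t := time.NewTicker(50 * time.Millisecond)
        defer t.Stop()
        hct = t.C
    }

    deadline := time.After(200 * time.Millisecond)
    for {
        select {
        case <-hct:
            fmt.Println("periodic recheck fired")
        case <-deadline:
            fmt.Println("done")
            return
        }
    }
}

func main() {
    run(false) // Only "done" is printed.
    run(true)  // Several rechecks, then "done".
}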
@@ -2793,6 +2821,9 @@ func (n *raft) runAsLeader() {
     lq := time.NewTicker(lostQuorumCheck)
     defer lq.Stop()

+    hcf := time.NewTicker(hbInterval * 30)
+    defer hcf.Stop()
+
     for n.State() == Leader {
         select {
         case <-n.s.quitCh:
@@ -2850,6 +2881,18 @@
             n.stepdown(noLeader)
             return
         }
+        case <-hcf.C:
+            if n.hcf == nil {
+                continue
+            }
+            if err := n.hcf(); err != nil {
+                n.hcfailed = true
+                n.warn("Upper layer healthcheck failed: %v", err)
+                n.warn("Stepping down from leader and switching to observer mode")
+                n.stepdown(noLeader)
+                n.setObserver(true, extUndetermined)
+                return
+            }
         case <-n.votes.ch:
             // Because of drain() it is possible that we get nil from popOne().
             vresp, ok := n.votes.popOne()
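
Not part of the commit: the leader-side change boils down to a ticker in the leader's select loop that runs the registered healthcheck and gives up leadership when it fails. A self-contained sketch of that shape, under illustrative names (leader, stepDown, run), not the server's API:

// Standalone sketch: a leader loop that runs an optional healthcheck on a
// ticker and steps down when it reports an error.
package main

import (
    "errors"
    "fmt"
    "time"
)

type leader struct {
    hcf    func() error // Optional upper-layer healthcheck.
    quitCh chan struct{}
}

func (l *leader) stepDown(reason error) {
    fmt.Println("stepping down:", reason)
    close(l.quitCh)
}

func (l *leader) run(interval time.Duration) {
    hc := time.NewTicker(interval)
    defer hc.Stop()

    for {
        select {
        case <-l.quitCh:
            return
        case <-hc.C:
            if l.hcf == nil {
                continue // Nothing registered, stay leader.
            }
            if err := l.hcf(); err != nil {
                l.stepDown(err)
                return
            }
        }
    }
}

func main() {
    start := time.Now()
    l := &leader{
        quitCh: make(chan struct{}),
        hcf: func() error {
            if time.Since(start) > 150*time.Millisecond {
                return errors.New("upper layer stopped responding")
            }
            return nil
        },
    }
    l.run(50 * time.Millisecond)
}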
@@ -3405,6 +3448,9 @@ func (n *raft) runAsCandidate() {
     votes := map[string]struct{}{}
     emptyVotes := map[string]struct{}{}

+    hcf := time.NewTicker(hbInterval * 30)
+    defer hcf.Stop()
+
     for n.State() == Candidate {
         elect := n.electTimer()
         select {
@@ -3423,6 +3469,18 @@
         case <-elect.C:
             n.switchToCandidate()
             return
+        case <-hcf.C:
+            if n.hcf == nil {
+                continue
+            }
+            if err := n.hcf(); err != nil && !n.hcfailed {
+                n.hcfailed = true
+                n.warn("Upper layer healthcheck failed: %v", err)
+                n.warn("Stepping down from candidate and switching to observer mode")
+                n.stepdown(noLeader)
+                n.setObserver(true, extUndetermined)
+                return
+            }
         case <-n.votes.ch:
             // Because of drain() it is possible that we get nil from popOne().
             vresp, ok := n.votes.popOne()
@@ -4759,6 +4817,17 @@ func (n *raft) switchToCandidate() {
     n.Lock()
     defer n.Unlock()

+    if n.hcf != nil {
+        if err := n.hcf(); err != nil {
+            n.debug("Not switching to candidate due to healthcheck failure:", err)
+            return
+        } else if n.hcfailed {
+            n.hcfailed = false
+            n.warn("Upper layer healthcheck failure has cleared")
+            n.setObserver(false, extUndetermined)
+        }
+    }
+
     // If we are catching up or are in observer mode we can not switch.
     // Avoid petitioning to become leader if we're behind on applies.
     if n.observer || n.paused || n.processed < n.commit {

server/raft_test.go

Lines changed: 44 additions & 0 deletions
@@ -4652,3 +4652,47 @@ func TestNRGMustNotResetVoteOnStepDownOrLeaderTransfer(t *testing.T) {
     require_Equal(t, n.term, 1)
     require_Equal(t, n.vote, nats0)
 }
+
+func TestNRGUpperLayerHealthcheck(t *testing.T) {
+    c := createJetStreamClusterExplicit(t, "R3S", 3)
+    defer c.shutdown()
+
+    rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
+    leader := rg.waitOnLeader()
+    rn := leader.node()
+
+    start := time.Now()
+    rn.SetUpperLayerHealthcheck(func() error {
+        // Let's have normal operations for a bit before we signal
+        // an error.
+        if time.Since(start) > time.Second {
+            return fmt.Errorf("time has passed")
+        }
+        return nil
+    })
+
+    checkFor(t, 5*time.Second, 250*time.Millisecond, func() error {
+        rn := leader.node()
+        if rn.Leader() {
+            return fmt.Errorf("shouldn't still be leader")
+        }
+        if !rn.IsObserver() {
+            return fmt.Errorf("should be observer")
+        }
+        return nil
+    })
+
+    rn.SetUpperLayerHealthcheck(func() error {
+        // Now we'll set a healthcheck that always succeeds, so
+        // we should recover.
+        return nil
+    })
+
+    checkFor(t, 30*time.Second, 250*time.Millisecond, func() error {
+        rn := leader.node()
+        if rn.IsObserver() {
+            return fmt.Errorf("should not be observer")
+        }
+        return nil
+    })
+}
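
Not part of the commit: the test leans on the suite's checkFor helper to poll a condition until a deadline. A standalone sketch of a helper in the same spirit, with retryUntil as a hypothetical name rather than the helper used in the repo:

// Standalone sketch of a poll-until-deadline helper in the spirit of the
// checkFor calls above.
package main

import (
    "errors"
    "fmt"
    "time"
)

func retryUntil(timeout, interval time.Duration, check func() error) error {
    deadline := time.Now().Add(timeout)
    for {
        err := check()
        if err == nil {
            return nil
        }
        if time.Now().After(deadline) {
            return fmt.Errorf("condition not met within %v: %w", timeout, err)
        }
        time.Sleep(interval)
    }
}

func main() {
    start := time.Now()
    err := retryUntil(time.Second, 50*time.Millisecond, func() error {
        if time.Since(start) < 200*time.Millisecond {
            return errors.New("not ready yet")
        }
        return nil
    })
    fmt.Println(err) // <nil>
}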

server/stream.go

Lines changed: 24 additions & 0 deletions
@@ -1075,6 +1075,30 @@ func (mset *stream) setStreamAssignment(sa *streamAssignment) {
     }
 }

+func (mset *stream) upperLayerHealthcheck() error {
+    // Check that nothing looks to be holding the lock for a long time.
+    // TODO(nat): we could check mset.mu etc here, but I'm not convinced we
+    // have eliminated all of the expensive or linear scans that may genuinely
+    // hold the lock and aren't worthy of a leadership change.
+    start := time.Now()
+    mset.cfgMu.RLock()
+    memory := mset.cfg.Storage == MemoryStorage
+    mset.cfgMu.RUnlock()
+    if time.Since(start) > 3*time.Second {
+        return fmt.Errorf("extreme stream lock contention")
+    }
+    if !memory {
+        // Check that we can acquire dios in a timely fashion.
+        select {
+        case <-dios:
+            dios <- struct{}{}
+        case <-time.After(3 * time.Second):
+            return fmt.Errorf("extreme dios contention")
+        }
+    }
+    return nil
+}
+
 func (mset *stream) monitorQuitC() <-chan struct{} {
     if mset == nil {
         return nil
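
Not part of the commit: upperLayerHealthcheck above treats slow acquisition of the stream's config lock as a failure signal before probing dios. A standalone sketch of that lock-latency measurement, with an illustrative guardedResource type and made-up thresholds:

// Standalone sketch: treat slow acquisition of a mutex as a health signal,
// in the spirit of the stream lock check above.
package main

import (
    "fmt"
    "sync"
    "time"
)

type guardedResource struct {
    mu sync.RWMutex
}

// lockLatencyCheck fails if taking (and releasing) the read lock takes
// longer than the given threshold, suggesting something is hogging it.
func (g *guardedResource) lockLatencyCheck(threshold time.Duration) error {
    start := time.Now()
    g.mu.RLock()
    g.mu.RUnlock()
    if d := time.Since(start); d > threshold {
        return fmt.Errorf("lock contention: acquisition took %v", d)
    }
    return nil
}

func main() {
    g := &guardedResource{}

    // Simulate a long writer holding the lock.
    go func() {
        g.mu.Lock()
        defer g.mu.Unlock()
        time.Sleep(200 * time.Millisecond)
    }()

    time.Sleep(10 * time.Millisecond) // Let the writer grab the lock first.
    fmt.Println(g.lockLatencyCheck(50 * time.Millisecond))
}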
