diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go index 1444a76892..965aa66511 100644 --- a/server/jetstream_cluster.go +++ b/server/jetstream_cluster.go @@ -3185,7 +3185,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps // If the error signals we timed out of a snapshot, we should try to replay the snapshot // instead of fully resetting the state. Resetting the clustered state may result in // race conditions and should only be used as a last effort attempt. - if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries { + if errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries || err == errAlreadyLeader { if node := mset.raftNode(); node != nil && node.DrainAndReplaySnapshot() { break } @@ -3997,10 +3997,8 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco } } - if isRecovering || !mset.IsLeader() { - if err := mset.processSnapshot(ss, ce.Index); err != nil && err != errAlreadyLeader { - return 0, err - } + if err := mset.processSnapshot(ss, ce.Index); err != nil { + return 0, err } } else if e.Type == EntryRemovePeer { js.mu.RLock() @@ -9925,7 +9923,14 @@ func (mset *stream) processSnapshot(snap *StreamReplicatedState, index uint64) ( // Pause the apply channel for our raft group while we catch up. if err := n.PauseApply(); err != nil { - return err + // The only reason PauseApply can fail is due to errAlreadyLeader. + // We step down to get someone else to become the leader that can catch us up. + // Ignore the error since we could have already stepped down before us doing so here. + _ = n.StepDown() + // Now try pausing again and continue to catchup. + if err = n.PauseApply(); err != nil { + return err + } } // Set our catchup state. diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 07c05949e2..f27c164de5 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -8949,3 +8949,43 @@ func TestJetStreamClusterInterestStreamMsgWithNoInterestStillAppliesRollup(t *te t.Run(fmt.Sprintf("R%d", replicas), func(t *testing.T) { test(t, replicas) }) } } + +func TestJetStreamClusterStreamLeaderStepsDownIfSnapshotCatchupRequired(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + Replicas: 3, + }) + require_NoError(t, err) + + // Publish a message and ensure everyone is synced up. + _, err = js.Publish("foo", nil) + require_NoError(t, err) + checkFor(t, 2*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) + + // Get the current stream leader. + sl := c.streamLeader(globalAccountName, "TEST") + require_NotNil(t, sl) + mset, err := sl.globalAccount().lookupStream("TEST") + require_NoError(t, err) + rn := mset.raftNode() + + // Grab the current state of the leader which contains the message we've published. + snap := mset.stateSnapshot() + // Truncate this leader's store to be empty, while remaining Raft leader. + require_NoError(t, mset.store.Truncate(0)) + // Send the snapshot containing the message. Even though we're Raft leader, we must + // still check we're up-to-date and if not: step down and catch up. + require_NoError(t, rn.SendSnapshot(snap)) + checkFor(t, 10*time.Second, 200*time.Millisecond, func() error { + return checkState(t, c, globalAccountName, "TEST") + }) +} diff --git a/server/raft.go b/server/raft.go index f968c91847..cd1613d281 100644 --- a/server/raft.go +++ b/server/raft.go @@ -1133,8 +1133,8 @@ func (n *raft) PauseApply() error { } func (n *raft) pauseApplyLocked() { - // If we are currently a candidate make sure we step down. - if n.State() == Candidate { + // If we are currently not a follower, make sure we step down. + if n.State() != Follower { n.stepdownLocked(noLeader) } diff --git a/server/store.go b/server/store.go index a46d5deebe..d88efe74ca 100644 --- a/server/store.go +++ b/server/store.go @@ -736,7 +736,7 @@ func isOutOfSpaceErr(err error) bool { var errFirstSequenceMismatch = errors.New("first sequence mismatch") func isClusterResetErr(err error) bool { - return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries + return err == errLastSeqMismatch || err == ErrStoreEOF || err == errFirstSequenceMismatch || errors.Is(err, errCatchupAbortedNoLeader) || err == errCatchupTooManyRetries || err == errAlreadyLeader } // Copy all fields.