@@ -10062,3 +10062,145 @@ func TestJetStreamClusterProposeFailureDoesNotDriftClseq(t *testing.T) {
1006210062 }
1006310063}
1006410064
10065+ // TestJetStreamClusterClseqStableAcrossRealElectionRoundTrip exercises the
10066+ // fix from a real-election angle rather than the synthetic state-swap used
10067+ // in TestJetStreamClusterProposeFailureDoesNotDriftClseq. It repeatedly
10068+ // cycles leadership via StepDown so that the stream leader actually moves
10069+ // between servers, interleaves inbound publishes (including failed Propose
10070+ // attempts during the race window) with the transitions, and then verifies:
10071+ //
10072+ // - All replicas agree on lseq / FirstSeq / LastSeq via checkState.
10073+ // - No replica reports errLastSeqMismatch (which would trigger
10074+ // resetClusteredState and can destroy the raft state).
10075+ // - No replica's Raft node ends up IsDeleted.
10076+ // - Publishing resumes correctly after every leader change, with the
10077+ // expected final message count.
10078+ //
10079+ // Regression test for nats-io/nats-server#8057.
10080+ func TestJetStreamClusterClseqStableAcrossRealElectionRoundTrip (t * testing.T ) {
10081+ c := createJetStreamClusterExplicit (t , "R3S" , 3 )
10082+ defer c .shutdown ()
10083+
10084+ nc , js := jsClientConnect (t , c .randomServer ())
10085+ defer nc .Close ()
10086+
10087+ _ , err := jsStreamCreate (t , nc , & StreamConfig {
10088+ Name : "TEST" ,
10089+ Subjects : []string {"foo" },
10090+ Replicas : 3 ,
10091+ Storage : FileStorage ,
10092+ AllowAtomicPublish : true ,
10093+ })
10094+ require_NoError (t , err )
10095+
10096+ // Baseline: 5 messages published cleanly.
10097+ for range 5 {
10098+ _ , err = js .Publish ("foo" , nil )
10099+ require_NoError (t , err )
10100+ }
10101+ checkFor (t , 5 * time .Second , 200 * time .Millisecond , func () error {
10102+ return checkState (t , c , globalAccountName , "TEST" )
10103+ })
10104+
10105+ // Round-trip through three elections. On each round we:
10106+ // 1. Force an errNotLeader on the current leader (reproducing the
10107+ // original race where Propose fails while the upper layer still
10108+ // thinks we are the leader).
10109+ // 2. StepDown to trigger an actual election.
10110+ // 3. Wait for a new leader and publish more messages through it.
10111+ expectedMsgs := uint64 (5 )
10112+ seenLeaders := map [string ]bool {}
10113+ for round := 0 ; round < 3 ; round ++ {
10114+ sl := c .streamLeader (globalAccountName , "TEST" )
10115+ require_NotNil (t , sl )
10116+ seenLeaders [sl .Name ()] = true
10117+
10118+ mset , err := sl .globalAccount ().lookupStream ("TEST" )
10119+ require_NoError (t , err )
10120+ rn := mset .raftNode ().(* raft )
10121+
10122+ // Force a Propose failure while still "leader" from the upper
10123+ // layer's perspective. clseq must not advance on failure.
10124+ mset .clMu .Lock ()
10125+ before := mset .clseq
10126+ mset .clMu .Unlock ()
10127+
10128+ prev := rn .state .Swap (int32 (Follower ))
10129+ err = mset .processClusteredInboundMsg ("foo" , _EMPTY_ , nil , nil , nil , false )
10130+ rn .state .Store (prev )
10131+ require_Error (t , err , errNotLeader )
10132+
10133+ mset .clMu .Lock ()
10134+ after := mset .clseq
10135+ mset .clMu .Unlock ()
10136+ require_Equal (t , after , before )
10137+
10138+ // Trigger a real election.
10139+ require_NoError (t , rn .StepDown ())
10140+ c .waitOnStreamLeader (globalAccountName , "TEST" )
10141+
10142+ newSL := c .streamLeader (globalAccountName , "TEST" )
10143+ require_NotNil (t , newSL )
10144+
10145+ // Publish a few messages through the new leader. This validates
10146+ // that recalculateClusteredSeq repopulates clseq correctly (since
10147+ // processStreamLeaderChange now clears it on every transition)
10148+ // and that the next proposal uses the right sequence.
10149+ for range 5 {
10150+ _ , err = js .Publish ("foo" , nil )
10151+ require_NoError (t , err )
10152+ expectedMsgs ++
10153+ }
10154+
10155+ // After each round, all replicas must agree. If clseq had drifted,
10156+ // followers would hit errLastSeqMismatch and this check would fail.
10157+ checkFor (t , 5 * time .Second , 200 * time .Millisecond , func () error {
10158+ state , err := checkStateAndErr (t , c , globalAccountName , "TEST" )
10159+ if err != nil {
10160+ return err
10161+ }
10162+ if state .Msgs != expectedMsgs {
10163+ return fmt .Errorf ("expected %d messages, got %d" , expectedMsgs , state .Msgs )
10164+ }
10165+ return nil
10166+ })
10167+
10168+ // Also exercise the ProposeMulti path across the transition.
10169+ _ , err = js .PublishMsg (& nats.Msg {
10170+ Subject : "foo" ,
10171+ Header : nats.Header {
10172+ "Nats-Batch-Id" : []string {fmt .Sprintf ("batch-%d" , round )},
10173+ "Nats-Batch-Sequence" : []string {"1" },
10174+ "Nats-Batch-Commit" : []string {"1" },
10175+ },
10176+ })
10177+ require_NoError (t , err )
10178+ expectedMsgs ++
10179+
10180+ checkFor (t , 5 * time .Second , 200 * time .Millisecond , func () error {
10181+ state , err := checkStateAndErr (t , c , globalAccountName , "TEST" )
10182+ if err != nil {
10183+ return err
10184+ }
10185+ if state .Msgs != expectedMsgs {
10186+ return fmt .Errorf ("expected %d messages, got %d" , expectedMsgs , state .Msgs )
10187+ }
10188+ return nil
10189+ })
10190+ }
10191+
10192+ // Leadership should have moved across nodes at least once.
10193+ require_True (t , len (seenLeaders ) >= 2 )
10194+
10195+ // Final invariants: no replica's Raft node is marked deleted. If the
10196+ // buggy clseq drift had caused a mismatch, resetClusteredState would
10197+ // have been invoked and the underlying raft state could have been
10198+ // torn down.
10199+ for _ , s := range c .servers {
10200+ mset , err := s .globalAccount ().lookupStream ("TEST" )
10201+ require_NoError (t , err )
10202+ rn := mset .raftNode ().(* raft )
10203+ require_NotNil (t , rn )
10204+ require_False (t , rn .IsDeleted ())
10205+ }
10206+ }