Skip to content

Commit e9ee2a0

Browse files
De-flake tests (#5765)
This PR tries to deflake some unit tests. Signed-off-by: Neil Twigg <[email protected]>
2 parents d471c10 + 6651c2e commit e9ee2a0

5 files changed

+17
-40
lines changed

server/jetstream_cluster_1_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -6080,6 +6080,7 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {
60806080

60816081
// Now ack the skipped message
60826082
skipped.AckSync()
6083+
c.waitOnAllCurrent()
60836084
for _, s := range c.servers {
60846085
jsz, err := s.Jsz(opts)
60856086
require_NoError(t, err)
@@ -6090,6 +6091,7 @@ func TestJetStreamClusterConsumerAckSyncReporting(t *testing.T) {
60906091

60916092
// Now ack the last message
60926093
last.AckSync()
6094+
c.waitOnAllCurrent()
60936095
for _, s := range c.servers {
60946096
jsz, err := s.Jsz(opts)
60956097
require_NoError(t, err)

server/jetstream_cluster_2_test.go

+4-11
Original file line numberDiff line numberDiff line change
@@ -2660,22 +2660,15 @@ func TestJetStreamClusterStreamCatchupNoState(t *testing.T) {
26602660
nc.Close()
26612661
c.stopAll()
26622662
// Remove all state by truncating for the non-leader.
2663-
for _, fn := range []string{"1.blk", "1.idx", "1.fss"} {
2664-
fname := filepath.Join(config.StoreDir, "$G", "streams", "TEST", "msgs", fn)
2665-
fd, err := os.OpenFile(fname, os.O_RDWR, defaultFilePerms)
2666-
if err != nil {
2667-
continue
2668-
}
2669-
fd.Truncate(0)
2670-
fd.Close()
2671-
}
26722663
// For both make sure we have no raft snapshots.
26732664
snapDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "snapshots")
2674-
os.RemoveAll(snapDir)
2665+
require_NoError(t, os.RemoveAll(snapDir))
2666+
msgsDir := filepath.Join(lconfig.StoreDir, "$SYS", "_js_", gname, "msgs")
2667+
require_NoError(t, os.RemoveAll(msgsDir))
26752668
// Remove all our raft state, we do not want to hold onto our term and index which
26762669
// results in a coin toss for who becomes the leader.
26772670
raftDir := filepath.Join(config.StoreDir, "$SYS", "_js_", gname)
2678-
os.RemoveAll(raftDir)
2671+
require_NoError(t, os.RemoveAll(raftDir))
26792672

26802673
// Now restart.
26812674
c.restartAll()

server/jetstream_helpers_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -1484,6 +1484,7 @@ func (c *cluster) waitOnAccount(account string) {
14841484
func (c *cluster) waitOnClusterReady() {
14851485
c.t.Helper()
14861486
c.waitOnClusterReadyWithNumPeers(len(c.servers))
1487+
c.waitOnLeader()
14871488
}
14881489

14891490
func (c *cluster) waitOnClusterReadyWithNumPeers(numPeersExpected int) {

server/norace_test.go

+4-14
Original file line numberDiff line numberDiff line change
@@ -10620,13 +10620,8 @@ func TestNoRaceJetStreamClusterMemoryStreamLastSequenceResetAfterRestart(t *test
1062010620
s.Shutdown()
1062110621
s.WaitForShutdown()
1062210622
s = c.restartServer(s)
10623-
checkFor(t, 30*time.Second, time.Second, func() error {
10624-
hs := s.healthz(nil)
10625-
if hs.Error != _EMPTY_ {
10626-
return errors.New(hs.Error)
10627-
}
10628-
return nil
10629-
})
10623+
c.waitOnServerHealthz(s)
10624+
c.waitOnAllCurrent()
1063010625
// Make sure all streams are current after healthz returns ok.
1063110626
for i := 1; i <= numStreams; i++ {
1063210627
stream := fmt.Sprintf("TEST:%d", i)
@@ -10693,13 +10688,8 @@ func TestNoRaceJetStreamClusterMemoryWorkQueueLastSequenceResetAfterRestart(t *t
1069310688
s.Shutdown()
1069410689
s.WaitForShutdown()
1069510690
s = c.restartServer(s)
10696-
checkFor(t, 30*time.Second, time.Second, func() error {
10697-
hs := s.healthz(nil)
10698-
if hs.Error != _EMPTY_ {
10699-
return errors.New(hs.Error)
10700-
}
10701-
return nil
10702-
})
10691+
c.waitOnServerHealthz(s)
10692+
c.waitOnAllCurrent()
1070310693
// Make sure all streams are current after healthz returns ok.
1070410694
for i := 1; i <= numStreams; i++ {
1070510695
stream := fmt.Sprintf("TEST:%d", i)

server/raft_test.go

+6-15
Original file line numberDiff line numberDiff line change
@@ -360,23 +360,14 @@ func TestNRGSimpleElection(t *testing.T) {
360360
func TestNRGSwitchStateClearsQueues(t *testing.T) {
361361
c := createJetStreamClusterExplicit(t, "R3S", 3)
362362
defer c.shutdown()
363+
s := c.servers[0] // RunBasicJetStreamServer not available
363364

364-
rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
365-
rg.waitOnLeader()
366-
367-
// Ensure there are no other nodes running that could
368-
// send something into our IP queues or it may break the
369-
// below assertions.
370-
for _, n := range rg {
371-
if !n.node().Leader() {
372-
n.stop()
373-
}
365+
n := &raft{
366+
prop: newIPQueue[*Entry](s, "prop"),
367+
resp: newIPQueue[*appendEntryResponse](s, "resp"),
368+
leadc: make(chan bool, 1), // for switchState
374369
}
375-
376-
rg.lockAll()
377-
defer rg.unlockAll()
378-
379-
n := rg.leader().node().(*raft)
370+
n.state.Store(int32(Leader))
380371
require_Equal(t, n.prop.len(), 0)
381372
require_Equal(t, n.resp.len(), 0)
382373

0 commit comments

Comments
 (0)