Commit 4a7566f

[FIXED] Deleted raft node's stream is revived (#7668)
Related to #7025. If the Raft node of the stream was deleted while the stream was stalled on upper-layer catchup, it would wrongly revive the stream. We now mark the Raft node as deleted so we can ensure the stream monitor goroutine quits and the stream doesn't get revived.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents a11b5b9 + d0eb0b1 commit 4a7566f
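
In short, `resetClusteredState` now refuses to revive a stream whose Raft node is gone. A minimal sketch of the new guard, simplified from the diff below (`mset`, `node`, `acc`, and `s` are the stream, its Raft node, the account, and the server as they appear in `server/jetstream_cluster.go`):

```go
// Sketch of the new guard (simplified from resetClusteredState below):
// if the stream is already closed, or its Raft node has been deleted,
// don't revive the stream. Returning true tells the monitor goroutine
// to break out of its loop instead of rebuilding state.
if mset.closed.Load() || (node != nil && node.IsDeleted()) {
	s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
	return true
}
```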

File tree

3 files changed: +88 −7 lines

server/jetstream_cluster.go

Lines changed: 7 additions & 7 deletions
```diff
@@ -3142,20 +3142,20 @@ func (mset *stream) resetClusteredState(err error) bool {
 	stype, tierName, replicas := mset.cfg.Storage, mset.tier, mset.cfg.Replicas
 	mset.mu.RUnlock()
 
-	assert.Unreachable("Reset clustered state", map[string]any{
-		"stream":  name,
-		"account": acc.Name,
-		"err":     err,
-	})
-
 	// The stream might already be deleted and not assigned to us anymore.
 	// In any case, don't revive the stream if it's already closed.
-	if mset.closed.Load() {
+	if mset.closed.Load() || (node != nil && node.IsDeleted()) {
 		s.Warnf("Will not reset stream '%s > %s', stream is closed", acc, mset.name())
 		// Explicitly returning true here, we want the outside to break out of the monitoring loop as well.
 		return true
 	}
 
+	assert.Unreachable("Reset clustered state", map[string]any{
+		"stream":  name,
+		"account": acc.Name,
+		"err":     err,
+	})
+
 	// Stepdown regardless if we are the leader here.
 	if node != nil {
 		node.StepDown()
```
server/jetstream_cluster_3_test.go

Lines changed: 72 additions & 0 deletions
```diff
@@ -6630,3 +6630,75 @@ func TestJetStreamClusterStreamDesyncDuringSnapshot(t *testing.T) {
 	t.Run("Reset", func(t *testing.T) { test(t, KindReset) })
 	t.Run("Truncate", func(t *testing.T) { test(t, KindTruncate) })
 }
+
+func TestJetStreamClusterDeletedNodeDoesNotReviveStreamAfterCatchup(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{
+		Name:     "TEST",
+		Subjects: []string{"foo"},
+		Replicas: 3,
+		Storage:  nats.FileStorage,
+	})
+	require_NoError(t, err)
+
+	_, err = js.Publish("foo", nil)
+	require_NoError(t, err)
+	checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
+		return checkState(t, c, globalAccountName, "TEST")
+	})
+
+	rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+	for _, s := range c.servers {
+		if s == rs {
+			continue
+		}
+		s.Shutdown()
+		s.WaitForShutdown()
+	}
+
+	mset, err := rs.globalAccount().lookupStream("TEST")
+	require_NoError(t, err)
+	snap := mset.stateSnapshot()
+
+	// Reset the entire store so we can catchup based on the above snapshot.
+	fs := mset.store.(*fileStore)
+	require_NoError(t, fs.reset())
+
+	// Mark the node as leaderless, and get the upper-layer to start a catchup from a snapshot.
+	node := mset.raftNode()
+	node.(*raft).hasleader.Store(false)
+	node.ApplyQ().push(newCommittedEntry(10, []*Entry{{EntrySnapshot, snap}}))
+
+	// Since the node is leaderless, it will retry after some time. We wait a little here to ensure
+	// it's waiting there as well, and then we delete the node outright.
+	time.Sleep(time.Second)
+	node.Delete()
+
+	// The stream's goroutine should eventually be stopped. This will fail if the stream is revived.
+	var retries int
+	checkFor(t, 10*time.Second, 200*time.Millisecond, func() error {
+		mset, err = rs.globalAccount().lookupStream("TEST")
+		if err != nil {
+			retries = 0
+			return err
+		}
+		if mset.isMonitorRunning() {
+			retries = 0
+			return errors.New("monitor still running")
+		}
+		if state := mset.raftNode().State(); state != Closed {
+			retries = 0
+			return errors.New("node not closed")
+		}
+		retries++
+		if retries < 3 {
+			return errors.New("still confirming stable state")
+		}
+		return nil
+	})
+}
```
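
The closing `checkFor` loop is deliberately strict: `retries` is reset to zero whenever the stream lookup fails, the monitor is still running, or the node is not yet closed, and three consecutive clean iterations are required before the test passes. That way a stream that briefly stops but is then wrongly revived still fails the check.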

server/raft.go

Lines changed: 9 additions & 0 deletions
```diff
@@ -83,6 +83,7 @@ type RaftNode interface {
 	Stop()
 	WaitForStop()
 	Delete()
+	IsDeleted() bool
 	RecreateInternalSubs() error
 	IsSystemAccount() bool
 	GetTrafficAccountName() string
@@ -231,6 +232,7 @@ type raft struct {
 	initializing bool // The node is new, and "empty log" checks can be temporarily relaxed.
 	scaleUp      bool // The node is part of a scale up, puts us in observer mode until the log contains data.
 	membChanging bool // There is a membership change proposal in progress
+	deleted      bool // If the node was deleted.
 }
 
 type proposedEntry struct {
@@ -1917,13 +1919,20 @@ func (n *raft) Delete() {
 	n.Lock()
 	defer n.Unlock()
 
+	n.deleted = true
 	if wal := n.wal; wal != nil {
 		wal.Delete(false)
 	}
 	os.RemoveAll(n.sd)
 	n.debug("Deleted")
 }
 
+func (n *raft) IsDeleted() bool {
+	n.RLock()
+	defer n.RUnlock()
+	return n.deleted
+}
+
 func (n *raft) shutdown() {
 	// First call to Stop or Delete should close the quit chan
 	// to notify the runAs goroutines to stop what they're doing.
```
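
The flag is written under the node's write lock in `Delete()` and read under the read lock in `IsDeleted()`, so a caller racing with deletion gets a consistent answer. A standalone sketch of the same pattern, using a hypothetical `node` type rather than the real `raft` struct:

```go
package main

import (
	"fmt"
	"sync"
)

// Hypothetical type illustrating the Delete/IsDeleted pattern above;
// the real implementation lives on *raft in server/raft.go.
type node struct {
	sync.RWMutex
	deleted bool
}

func (n *node) Delete() {
	n.Lock()
	defer n.Unlock()
	n.deleted = true
	// The real Delete also removes the WAL and the storage directory here.
}

func (n *node) IsDeleted() bool {
	n.RLock()
	defer n.RUnlock()
	return n.deleted
}

func main() {
	n := &node{}
	n.Delete()
	fmt.Println(n.IsDeleted()) // prints: true
}
```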
