Skip to content

Commit 0286d45

Browse files
[FIXED] Nil stream assignment during processClusterUpdateStream scale-up (nats-io#8049)
When `processClusterUpdateStream` takes the scale-up branch it calls `mset.sa.Sync` in `startClusterSubs`. If the stream has just been recovered and meta catchup hasn't yet assigned the stream (`mset.sa == nil`), this panics with a nil pointer deref. Resolves nats-io#7229 Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
2 parents d0df0ae + 3dbf9c0 commit 0286d45

File tree

2 files changed

+69
-0
lines changed

2 files changed

+69
-0
lines changed

server/jetstream_cluster.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4933,6 +4933,9 @@ func (js *jetStream) processClusterUpdateStream(acc *Account, osa, sa *streamAss
49334933

49344934
if !alreadyRunning && numReplicas > 1 {
49354935
if needsNode {
4936+
// Must run before startClusterSubs reads mset.sa.Sync.
4937+
mset.setStreamAssignment(sa)
4938+
49364939
// Since we are scaling up we want to make sure our sync subject
49374940
// is registered before we start our raft node.
49384941
mset.mu.Lock()

server/jetstream_cluster_1_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6785,6 +6785,72 @@ func TestJetStreamClusterMetaRecoveryAddAndUpdateStream(t *testing.T) {
67856785
require_Len(t, len(sa.Config.Subjects), 1)
67866786
}
67876787

6788+
// https://github.com/nats-io/nats-server/issues/7229
6789+
func TestJetStreamClusterProcessClusterUpdateStreamNilStreamAssignment(t *testing.T) {
6790+
c := createJetStreamClusterExplicit(t, "R3S", 3)
6791+
defer c.shutdown()
6792+
6793+
nc, js := jsClientConnect(t, c.randomServer())
6794+
defer nc.Close()
6795+
6796+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}, Replicas: 3})
6797+
require_NoError(t, err)
6798+
6799+
s := c.randomServer()
6800+
acc, err := s.lookupAccount(globalAccountName)
6801+
require_NoError(t, err)
6802+
mset, err := acc.lookupStream("TEST")
6803+
require_NoError(t, err)
6804+
6805+
// Capture osa and build a sa that reflects the post-restart meta-replay
6806+
// conditions: a scale-up path where both the old and new raftGroup nodes
6807+
// are nil on this server (alreadyRunning=false, needsNode=true).
6808+
sjs := s.getJetStream()
6809+
sjs.mu.Lock()
6810+
realOsa := sjs.streamAssignment(globalAccountName, "TEST")
6811+
if realOsa == nil {
6812+
sjs.mu.Unlock()
6813+
t.Fatal("stream assignment not found")
6814+
}
6815+
6816+
// Intentionally omit Group.node so that in processClusterUpdateStream
6817+
// alreadyRunning (osa.Group.node != nil) is false and needsNode
6818+
// (sa.Group.node == nil) is true, taking the scale-up branch.
6819+
cloneGroup := func() *raftGroup {
6820+
return &raftGroup{
6821+
Name: realOsa.Group.Name,
6822+
Peers: append([]string{}, realOsa.Group.Peers...),
6823+
Storage: realOsa.Group.Storage,
6824+
}
6825+
}
6826+
osa := *realOsa
6827+
osa.Group = cloneGroup()
6828+
sa := osa
6829+
cfg := *realOsa.Config
6830+
cfg.MaxMsgs = 1_000
6831+
sa.Config = &cfg
6832+
sa.Group = cloneGroup()
6833+
sjs.mu.Unlock()
6834+
6835+
// Simulate the state right after recoverStream() before meta catchup:
6836+
// no stream assignment, no raft node, no cluster subs registered yet.
6837+
mset.mu.Lock()
6838+
mset.sa = nil
6839+
mset.node = nil
6840+
if mset.syncSub != nil {
6841+
mset.srv.sysUnsubscribe(mset.syncSub)
6842+
mset.syncSub = nil
6843+
}
6844+
mset.mu.Unlock()
6845+
6846+
sjs.processClusterUpdateStream(acc, &osa, &sa)
6847+
6848+
mset.mu.RLock()
6849+
got := mset.sa
6850+
mset.mu.RUnlock()
6851+
require_NotNil(t, got)
6852+
}
6853+
67886854
// Make sure if we received acks that are out of bounds, meaning past our
67896855
// last sequence or before our first that they are ignored and errored if applicable.
67906856
func TestJetStreamClusterConsumerAckOutOfBounds(t *testing.T) {

0 commit comments

Comments (0)