Skip to content

Commit b7baa0a

Browse files
MauriceVanVeenclaude
authored andcommitted
[IMPROVED] Defer consumer starting seq scan off meta goroutine
Signed-off-by: Maurice van Veen <github@mauricevanveen.com> Backport of nats-io#8051 (6b884f4) adapted for release/v2.12.7. The v2.12.7 signature of selectStartingSeqNo does not return an error, so the leader-elect path logs/stepdown handling was simplified accordingly.
1 parent 8e778d6 commit b7baa0a

File tree

3 files changed

+108
-15
lines changed

3 files changed

+108
-15
lines changed

server/consumer.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1228,13 +1228,15 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12281228
}
12291229
}
12301230

1231+
standalone := !s.JetStreamIsClustered() && s.standAloneMode()
12311232
if o.store != nil && o.store.HasState() {
12321233
// Restore our saved state.
12331234
o.mu.Lock()
12341235
o.readStoredState()
12351236
o.mu.Unlock()
1236-
} else {
1237-
// Select starting sequence number
1237+
} else if config.Direct || standalone {
1238+
// Clustered non-direct consumers defer this to setLeader so the
1239+
// expensive store scans don't block the meta apply goroutine.
12381240
o.selectStartingSeqNo()
12391241
}
12401242

@@ -1304,7 +1306,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
13041306
mset.setConsumer(o)
13051307
mset.mu.Unlock()
13061308

1307-
if config.Direct || (!s.JetStreamIsClustered() && s.standAloneMode()) {
1309+
if config.Direct || standalone {
13081310
o.setLeader(true)
13091311
}
13101312

@@ -1472,18 +1474,29 @@ func (o *consumer) isLeader() bool {
14721474
return o.leader.Load()
14731475
}
14741476

1475-
func (o *consumer) setLeader(isLeader bool) {
1477+
func (o *consumer) setLeader(isLeader bool) error {
14761478
o.mu.RLock()
14771479
mset, closed := o.mset, o.closed
14781480
movingToClustered := o.node != nil && o.pch == nil
14791481
movingToNonClustered := o.node == nil && o.pch != nil
14801482
wasLeader := o.leader.Swap(isLeader)
1483+
1484+
// For clustered new consumers, starting seq selection was deferred from
1485+
// addConsumerWithAssignment so the scan wouldn't block the meta apply
1486+
// goroutine, run it here on leader-elect instead.
1487+
needsSelect := isLeader && !wasLeader && o.dseq == 0 && (o.store == nil || !o.store.HasState())
14811488
o.mu.RUnlock()
14821489

14831490
// If we are here we have a change in leader status.
14841491
if isLeader {
14851492
if closed || mset == nil {
1486-
return
1493+
return nil
1494+
}
1495+
1496+
if needsSelect {
1497+
o.mu.Lock()
1498+
o.selectStartingSeqNo()
1499+
o.mu.Unlock()
14871500
}
14881501

14891502
if wasLeader {
@@ -1512,7 +1525,7 @@ func (o *consumer) setLeader(isLeader bool) {
15121525
}
15131526
o.mu.Unlock()
15141527
}
1515-
return
1528+
return nil
15161529
}
15171530

15181531
mset.mu.RLock()
@@ -1551,15 +1564,15 @@ func (o *consumer) setLeader(isLeader bool) {
15511564
if o.cfg.AckPolicy != AckNone {
15521565
if o.ackSub, err = o.subscribeInternal(o.ackSubj, o.pushAck); err != nil {
15531566
o.mu.Unlock()
1554-
return
1567+
return nil
15551568
}
15561569
}
15571570

15581571
// Setup the internal sub for next message requests regardless.
15591572
// Will error if wrong mode to provide feedback to users.
15601573
if o.reqSub, err = o.subscribeInternal(o.nextMsgSubj, o.processNextMsgReq); err != nil {
15611574
o.mu.Unlock()
1562-
return
1575+
return nil
15631576
}
15641577

15651578
// Check on flow control settings.
@@ -1568,7 +1581,7 @@ func (o *consumer) setLeader(isLeader bool) {
15681581
fcsubj := fmt.Sprintf(jsFlowControl, stream, o.name)
15691582
if o.fcSub, err = o.subscribeInternal(fcsubj, o.processFlowControl); err != nil {
15701583
o.mu.Unlock()
1571-
return
1584+
return nil
15721585
}
15731586
}
15741587

@@ -1704,6 +1717,7 @@ func (o *consumer) setLeader(isLeader bool) {
17041717
}
17051718
o.mu.Unlock()
17061719
}
1720+
return nil
17071721
}
17081722

17091723
// This is coming on the wire so do not block here.
@@ -3167,15 +3181,23 @@ func (o *consumer) infoWithSnapAndReply(snap bool, reply string) *ConsumerInfo {
31673181
return nil
31683182
}
31693183

3184+
dseq, sseq := o.dseq, o.sseq
3185+
if dseq <= 0 {
3186+
dseq = 1
3187+
}
3188+
if sseq <= 0 {
3189+
sseq = 1
3190+
}
3191+
31703192
cfg := o.cfg
31713193
info := &ConsumerInfo{
31723194
Stream: o.stream,
31733195
Name: o.name,
31743196
Created: o.created,
31753197
Config: &cfg,
31763198
Delivered: SequenceInfo{
3177-
Consumer: o.dseq - 1,
3178-
Stream: o.sseq - 1,
3199+
Consumer: dseq - 1,
3200+
Stream: sseq - 1,
31793201
},
31803202
AckFloor: SequenceInfo{
31813203
Consumer: o.adflr,

server/jetstream_cluster.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5806,10 +5806,15 @@ func (js *jetStream) processClusterCreateConsumer(oca, ca *consumerAssignment, s
58065806
func() {
58075807
defer s.grWG.Done()
58085808
defer o.clearMonitorRunning()
5809-
o.setLeader(true)
5809+
err := o.setLeader(true)
58105810
var resp = JSApiConsumerCreateResponse{ApiResponse: ApiResponse{Type: JSApiConsumerCreateResponseType}}
5811-
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
5812-
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
5811+
if err != nil {
5812+
resp.Error = NewJSConsumerCreateError(err, Unless(err))
5813+
s.sendAPIErrResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
5814+
} else {
5815+
resp.ConsumerInfo = setDynamicConsumerInfoMetadata(o.info())
5816+
s.sendAPIResponse(client, acc, subject, reply, _EMPTY_, s.jsonResponse(&resp))
5817+
}
58135818
},
58145819
pprofLabels{
58155820
"type": "consumer",
@@ -6506,6 +6511,9 @@ func (js *jetStream) applyConsumerEntries(o *consumer, ce *CommittedEntry, isLea
65066511
if !o.isLeader() && sseq > o.sseq {
65076512
o.sseq = sseq
65086513
}
6514+
if o.dseq == 0 {
6515+
o.dseq = 1
6516+
}
65096517
if o.store != nil {
65106518
o.store.UpdateStarting(sseq - 1)
65116519
}
@@ -6677,7 +6685,9 @@ func (js *jetStream) processConsumerLeaderChangeWithAssignment(o *consumer, ca *
66776685
}
66786686

66796687
// Tell consumer to switch leader status.
6680-
o.setLeader(isLeader)
6688+
if lerr := o.setLeader(isLeader); lerr != nil && err == nil {
6689+
err = lerr
6690+
}
66816691

66826692
if !isLeader || hasResponded {
66836693
if isLeader {

server/jetstream_cluster_3_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8233,3 +8233,64 @@ func TestJetStreamClusterStreamLeaderStepsDownIfSnapshotCatchupRequired(t *testi
82338233
return checkState(t, c, globalAccountName, "TEST")
82348234
})
82358235
}
8236+
8237+
func TestJetStreamClusterConsumerSelectStartingSeqDeferred(t *testing.T) {
8238+
c := createJetStreamClusterExplicit(t, "R3S", 3)
8239+
defer c.shutdown()
8240+
8241+
nc, js := jsClientConnect(t, c.randomServer())
8242+
defer nc.Close()
8243+
8244+
_, err := js.AddStream(&nats.StreamConfig{
8245+
Name: "TEST",
8246+
Subjects: []string{"foo"},
8247+
Replicas: 3,
8248+
})
8249+
require_NoError(t, err)
8250+
8251+
_, err = js.Publish("foo", nil)
8252+
require_NoError(t, err)
8253+
8254+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
8255+
Durable: "C",
8256+
AckPolicy: nats.AckExplicitPolicy,
8257+
Replicas: 3,
8258+
})
8259+
require_NoError(t, err)
8260+
8261+
leader := c.consumerLeader(globalAccountName, "TEST", "C")
8262+
require_NotNil(t, leader)
8263+
follower := c.randomNonConsumerLeader(globalAccountName, "TEST", "C")
8264+
require_NotNil(t, follower)
8265+
8266+
getConsumer := func(s *Server) *consumer {
8267+
t.Helper()
8268+
mset, err := s.globalAccount().lookupStream("TEST")
8269+
require_NoError(t, err)
8270+
o := mset.lookupConsumer("C")
8271+
require_NotNil(t, o)
8272+
return o
8273+
}
8274+
8275+
// On the leader, selectStartingSeqNo ran inside setLeader(true).
8276+
l := getConsumer(leader)
8277+
l.mu.RLock()
8278+
ldseq, lsseq := l.dseq, l.sseq
8279+
l.mu.RUnlock()
8280+
require_Equal(t, ldseq, 1)
8281+
require_Equal(t, lsseq, 1)
8282+
8283+
// On the follower, meta apply must not have run selectStartingSeqNo.
8284+
checkFor(t, 2*time.Second, 200*time.Millisecond, func() error {
8285+
f := getConsumer(follower)
8286+
f.mu.RLock()
8287+
defer f.mu.RUnlock()
8288+
if f.dseq != 1 {
8289+
return fmt.Errorf("expected follower dseq 1, got %d", f.dseq)
8290+
}
8291+
if f.sseq != 1 {
8292+
return fmt.Errorf("expected follower sseq 1, got %d", f.sseq)
8293+
}
8294+
return nil
8295+
})
8296+
}

0 commit comments

Comments
 (0)