Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 171 additions & 67 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,25 +86,26 @@ type PriorityGroupState struct {
}

type ConsumerConfig struct {
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`
Durable string `json:"durable_name,omitempty"`
Name string `json:"name,omitempty"`
Description string `json:"description,omitempty"`
DeliverPolicy DeliverPolicy `json:"deliver_policy"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
AckPolicy AckPolicy `json:"ack_policy"`
AckWait time.Duration `json:"ack_wait,omitempty"`
MaxDeliver int `json:"max_deliver,omitempty"`
BackOff []time.Duration `json:"backoff,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
FilterSubjects []string `json:"filter_subjects,omitempty"`
ReplayPolicy ReplayPolicy `json:"replay_policy"`
RateLimit uint64 `json:"rate_limit_bps,omitempty"` // Bits per sec
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
MaxAckPendingPerSubject int `json:"max_ack_pending_per_subject,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
HeadersOnly bool `json:"headers_only,omitempty"`

// Pull based options.
MaxRequestBatch int `json:"max_batch,omitempty"`
Expand Down Expand Up @@ -444,6 +445,7 @@ type consumer struct {
nextMsgReqs *ipQueue[*nextMsgReq]
resetSubj string
maxp int
ptc map[string]int
pblimit int
maxpb int
pbytes int
Expand Down Expand Up @@ -2229,8 +2231,8 @@ func (o *consumer) hasMaxDeliveries(seq uint64) bool {
}
// Make sure to remove from pending.
if p, ok := o.pending[seq]; ok && p != nil {
delete(o.pending, seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
o.removeFromPending(seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp, p.Subject)
Comment on lines 2233 to +2237
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Signal after max-delivery removal frees subject capacity

hasMaxDeliveries() now removes pending entries through removeFromPending(), but it discards the boolean that indicates a subject limit transition and only signals on global MaxAckPending. If an expired message hits MaxDeliver and was the only pending message for a subject at MaxAckPendingPerSubject, the consumer is not woken up to retry blocked messages on that subject, so push consumers can remain stalled until unrelated activity triggers another wakeup.

Useful? React with 👍 / 👎.

}
// Ensure redelivered state is set, if not already.
if o.rdc == nil {
Expand Down Expand Up @@ -2652,7 +2654,7 @@ func (o *consumer) progressUpdate(seq uint64) {
if p, ok := o.pending[seq]; ok {
p.Timestamp = time.Now().UnixNano()
// Update store system.
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp)
o.updateDelivered(p.Sequence, seq, 1, p.Timestamp, p.Subject)
}
}

Expand Down Expand Up @@ -2819,7 +2821,7 @@ func (o *consumer) propose(entry []byte) {
}

// Lock should be held.
func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64, subj string) {
// Clustered mode and R>1.
if o.node != nil {
// Inline for now, use variable compression.
Expand All @@ -2830,9 +2832,14 @@ func (o *consumer) updateDelivered(dseq, sseq, dc uint64, ts int64) {
n += binary.PutUvarint(b[n:], sseq)
n += binary.PutUvarint(b[n:], dc)
n += binary.PutVarint(b[n:], ts)
o.propose(b[:n])
if len(subj) > 0 {
buf := append(b[:n], subj...)
o.propose(buf)
} else {
o.propose(b[:n])
}
} else if o.store != nil {
o.store.UpdateDelivered(dseq, sseq, dc, ts)
o.store.UpdateDelivered(dseq, sseq, dc, ts, subj)
}
// Update activity.
o.ldt = time.Now()
Expand Down Expand Up @@ -3046,7 +3053,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
// now - ackWait is expired now, so offset from there.
p.Timestamp = time.Now().Add(-o.cfg.AckWait).Add(d).UnixNano()
// Update store system which will update followers as well.
o.updateDelivered(p.Sequence, sseq, dc, p.Timestamp)
o.updateDelivered(p.Sequence, sseq, dc, p.Timestamp, p.Subject)
if o.ptmr != nil {
// Want checkPending to run and figure out the next timer ttl.
// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
Expand Down Expand Up @@ -3478,9 +3485,11 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
if o.maxp > 0 && len(o.pending) >= o.maxp {
needSignal = true
}
delete(o.pending, sseq)
// Use the original deliver sequence from our pending record.
dseq = p.Sequence
if o.removeFromPending(sseq) {
needSignal = true
}

// Only move floors if we matched an existing pending.
if len(o.pending) == 0 {
Expand Down Expand Up @@ -3515,7 +3524,9 @@ func (o *consumer) processAckMsg(sseq, dseq, dc uint64, reply string, doSample b
o.adflr, o.asflr = dseq, sseq

remove := func(seq uint64) {
delete(o.pending, seq)
if o.removeFromPending(seq) {
needSignal = true
}
delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
if seq < floor {
Expand Down Expand Up @@ -4602,8 +4613,8 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
}
// Make sure to remove from pending.
if p, ok := o.pending[seq]; ok && p != nil {
delete(o.pending, seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp)
o.removeFromPending(seq)
o.updateDelivered(p.Sequence, seq, dc, p.Timestamp, p.Subject)
}
continue
}
Expand All @@ -4615,6 +4626,16 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
// Adjust back deliver count.
o.decDeliveryCount(seq)
}
if sm != nil {
if sm != &pmsg.StoreMsg {
sm.copy(&pmsg.StoreMsg)
}
if _, alreadyPending := o.pending[seq]; !alreadyPending && o.pendingSubjectHitLimit(pmsg.subj) {
pmsg.returnToPool()
o.decDeliveryCount(seq)
continue
}
}
// Message was scheduled for redelivery but was removed in the meantime.
if err == ErrStoreMsgNotFound || err == errDeletedMsg {
// This is a race condition where the message is still in o.pending and
Expand All @@ -4636,7 +4657,7 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
return nil, 0, errMaxAckPending
}

if o.hasSkipListPending() {
for o.hasSkipListPending() {
seq := o.lss.seqs[0]
if len(o.lss.seqs) == 1 {
o.sseq = o.lss.resume
Expand All @@ -4652,41 +4673,80 @@ func (o *consumer) getNextMsg() (*jsPubMsg, uint64, error) {
pmsg.returnToPool()
pmsg = nil
}
if sm != nil {
if sm != &pmsg.StoreMsg {
sm.copy(&pmsg.StoreMsg)
}
if o.pendingSubjectHitLimit(pmsg.subj) {
pmsg.returnToPool()
o.sseq++
continue
}
}
o.sseq++
return pmsg, 1, err
}

var sseq uint64
var err error
var sm *StoreMsg
var pmsg = getJSPubMsgFromPool()
// Save our starting sequence. If all remaining messages are on blocked
// subjects, we restore sseq so we don't permanently skip them.
savedSseq := o.sseq

// Grab next message applicable to us.
filters, subjf, fseq := o.filters, o.subjf, o.sseq
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = o.mset.store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
} else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1.
filter, wc := subjf[0].subject, subjf[0].hasWildcard
sm, sseq, err = o.mset.store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg)
} else {
// No filter here.
sm, sseq, err = o.mset.store.LoadNextMsg(_EMPTY_, false, fseq, &pmsg.StoreMsg)
}
if sm == nil {
pmsg.returnToPool()
pmsg = nil
}
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
// If we have jumped we should update skipped for other replicas.
if sseq != o.sseq && err == ErrStoreEOF {
o.updateSkipped(sseq + 1)
for {
var sseq uint64
var err error
var sm *StoreMsg
var pmsg = getJSPubMsgFromPool()

// Grab next message applicable to us.
filters, subjf, fseq := o.filters, o.subjf, o.sseq
// Check if we are multi-filtered or not.
if filters != nil {
sm, sseq, err = o.mset.store.LoadNextMsgMulti(filters, fseq, &pmsg.StoreMsg)
} else if len(subjf) > 0 { // Means single filtered subject since o.filters means > 1.
filter, wc := subjf[0].subject, subjf[0].hasWildcard
sm, sseq, err = o.mset.store.LoadNextMsg(filter, wc, fseq, &pmsg.StoreMsg)
} else {
// No filter here.
sm, sseq, err = o.mset.store.LoadNextMsg(_EMPTY_, false, fseq, &pmsg.StoreMsg)
}
if sm == nil {
pmsg.returnToPool()
// No more messages available. If we skipped any blocked subjects,
// restore o.sseq so those messages aren't permanently lost.
if o.sseq != savedSseq {
o.sseq = savedSseq
}
return nil, 0, err
}
o.sseq = sseq + 1

if sm != &pmsg.StoreMsg {
sm.copy(&pmsg.StoreMsg)
}
if o.pendingSubjectHitLimit(pmsg.subj) {
// Skip this message and try the next one so we don't
// head-of-line block other subjects.
pmsg.returnToPool()
if sseq >= o.sseq {
o.sseq = sseq + 1
}
continue
}

// Found an unblocked message. Restore o.sseq to the saved position
// so blocked messages earlier in the stream aren't permanently skipped.
// Only the delivered message's sequence will be tracked via pending.
o.sseq = savedSseq
// Check if we should move our o.sseq.
if sseq >= o.sseq {
// If we are moving step by step then sseq == o.sseq.
// If we have jumped we should update skipped for other replicas.
if sseq != o.sseq && err == ErrStoreEOF {
o.updateSkipped(sseq + 1)
}
o.sseq = sseq + 1
}
Comment on lines +4775 to +4778
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve blocked sequences when advancing delivery cursor

When getNextMsg() skips a blocked subject and later finds an unblocked message, this branch still advances o.sseq to sseq+1, which moves the cursor past earlier blocked sequences that were never delivered. With a stream like A1, A2, B1 and MaxAckPendingPerSubject=1, delivering B1 sets o.sseq past A2; after A1 is acked, A2 is no longer revisited and can be permanently skipped.

Useful? React with 👍 / 👎.

return pmsg, 1, err
}
return pmsg, 1, err
}

// Will check for expiration and lack of interest on waiting requests.
Expand Down Expand Up @@ -5433,10 +5493,10 @@ func (o *consumer) deliverMsg(dsubj, ackReply string, pmsg *jsPubMsg, dc uint64,
seq, ts := pmsg.seq, pmsg.ts

// Update delivered first.
o.updateDelivered(dseq, seq, dc, ts)
o.updateDelivered(dseq, seq, dc, ts, pmsg.subj)

if ap == AckExplicit || ap == AckAll {
o.trackPending(seq, dseq)
o.trackPending(seq, dseq, pmsg.subj)
} else if ap == AckNone {
o.adflr = dseq
o.asflr = seq
Expand Down Expand Up @@ -5551,9 +5611,52 @@ func (o *consumer) sendFlowControl() {
o.outq.send(newJSPubMsg(subj, _EMPTY_, rply, hdr, nil, nil, 0))
}

// pendingSubjectHitLimit reports whether subj has already reached the
// configured MaxAckPendingPerSubject cap and therefore must not receive
// further deliveries until an ack frees capacity.
// Always false when the per-subject limit is disabled (<= 0) or when the
// subject is empty.
// Lock should be held.
func (o *consumer) pendingSubjectHitLimit(subj string) bool {
	limit := o.cfg.MaxAckPendingPerSubject
	if limit <= 0 || subj == _EMPTY_ {
		return false
	}
	return o.ptc[subj] >= limit
}

func (o *consumer) incPendingSubject(subj string) {
if o.cfg.MaxAckPendingPerSubject <= 0 || subj == _EMPTY_ {
return
Comment on lines +5653 to +5654
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Rebuild subject pending counts after enabling per-subject caps

incPendingSubject skips tracking whenever MaxAckPendingPerSubject is unset, but existing pending entries are still kept in o.pending. If a live consumer is later updated from 0/unset to a positive max_ack_pending_per_subject, those already-pending messages are never backfilled into o.ptc, so pendingSubjectHitLimit enforces the cap only for new deliveries and can over-deliver on subjects that were already in flight before the update.

Useful? React with 👍 / 👎.

}
if o.ptc == nil {
o.ptc = make(map[string]int)
}
o.ptc[subj]++
}

func (o *consumer) decPendingSubject(subj string) bool {
if o.cfg.MaxAckPendingPerSubject <= 0 || subj == _EMPTY_ || o.ptc == nil {
return false
Comment on lines +5663 to +5664
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Decrement subject counters when toggling per-subject limits

decPendingSubject exits early whenever MaxAckPendingPerSubject <= 0, so if this limit is disabled on a live consumer, pending entries are removed/acked, and then the limit is enabled again, o.ptc can retain stale counts from before disablement. pendingSubjectHitLimit will then treat those subjects as still saturated and block fresh deliveries even though no real pending entries remain.

Useful? React with 👍 / 👎.

}
var needSignal bool
if o.ptc[subj] >= o.cfg.MaxAckPendingPerSubject {
needSignal = true
}
if o.ptc[subj] > 0 {
o.ptc[subj]--
if o.ptc[subj] == 0 {
delete(o.ptc, subj)
}
}
return needSignal
}

// removeFromPending deletes seq from the pending map, keeping the
// per-subject counters in sync. It returns true when removing this entry
// just freed a subject that was saturated at MaxAckPendingPerSubject,
// meaning blocked deliveries for that subject should be signaled to resume.
// Lock should be held.
func (o *consumer) removeFromPending(seq uint64) bool {
	p, ok := o.pending[seq]
	if !ok || p == nil {
		return false
	}
	needSignal := o.decPendingSubject(p.Subject)
	delete(o.pending, seq)
	return needSignal
}

// Tracks our outstanding pending acks. Only applicable to AckExplicit mode.
// Lock should be held.
func (o *consumer) trackPending(sseq, dseq uint64) {
func (o *consumer) trackPending(sseq, dseq uint64, subj string) {
if o.pending == nil {
o.pending = make(map[uint64]*Pending)
}
Expand All @@ -5564,7 +5667,8 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
// So do not update p.Sequence.
p.Timestamp = now.UnixNano()
} else {
o.pending[sseq] = &Pending{dseq, now.UnixNano()}
o.pending[sseq] = &Pending{dseq, now.UnixNano(), subj}
o.incPendingSubject(subj)
}

// We could have a backoff that set a timer higher than what we need for this message.
Expand Down Expand Up @@ -5733,7 +5837,7 @@ func (o *consumer) checkPending() {
}
// Check if these are no longer valid.
if seq < fseq || seq <= o.asflr {
delete(o.pending, seq)
o.removeFromPending(seq)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Signal after checkPending removes subject-limited pending entries

This path removes invalid pending entries with removeFromPending(seq) but drops the returned transition flag, so when that removal is what frees a subject from MaxAckPendingPerSubject, no wakeup is sent to resume blocked deliveries. For push consumers already waiting after a subject-limit stall, this can leave delivery paused until unrelated traffic triggers another signal.

Useful? React with 👍 / 👎.

delete(o.rdc, seq)
o.removeFromRedeliverQueue(seq)
shouldUpdateState = true
Expand Down Expand Up @@ -5929,7 +6033,7 @@ func (o *consumer) reconcileStateWithStream(streamLastSeq uint64) {
if len(o.pending) > 0 {
for seq := range o.pending {
if seq > streamLastSeq {
delete(o.pending, seq)
o.removeFromPending(seq)
}
}
}
Expand Down Expand Up @@ -6192,7 +6296,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
o.removeFromPending(seq)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Signal after purge removes a subject-limited pending entry

removeFromPending() returns whether a subject just transitioned from blocked to available, but this purge path ignores that signal. When MaxAckPendingPerSubject is hit and a pending message is removed by stream purge (or wider purge reconciliation), the consumer may not be woken to retry delivery for that subject, so push/pull delivery can stay stalled until unrelated traffic triggers another wakeup.

Useful? React with 👍 / 👎.

delete(o.rdc, seq)
// rdq handled below.
}
Expand All @@ -6207,7 +6311,7 @@ func (o *consumer) purge(sseq uint64, slseq uint64, isWider bool) {
o.dseq = o.adflr
}
}
delete(o.pending, seq)
o.removeFromPending(seq)
delete(o.rdc, seq)
}
}
Expand Down
Loading