Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
307 changes: 238 additions & 69 deletions server/consumer.go

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12475,7 +12475,7 @@ func (o *consumerFileStore) HasState() bool {
}

// UpdateDelivered is called whenever a new message has been delivered.
func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) error {
func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64, subj string) error {
o.mu.Lock()
defer o.mu.Unlock()

Expand Down Expand Up @@ -12503,7 +12503,7 @@ func (o *consumerFileStore) UpdateDelivered(dseq, sseq, dc uint64, ts int64) err
}
} else {
// Add to pending.
o.state.Pending[sseq] = &Pending{dseq, ts}
o.state.Pending[sseq] = &Pending{dseq, ts, subj}
Comment on lines 12505 to +12506
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve original pending sequence on file-store redelivery

This path now always rewrites state.Pending[sseq] even when sseq was already delivered, replacing the original consumer delivery sequence with the redelivery sequence. UpdateAcks() depends on p.Sequence staying original to advance ack floors correctly, so file-backed consumers can end up with incorrect ack-floor progression and persisted pending state after redeliveries.

Useful? React with 👍 / 👎.

}
// Update delivered as needed.
if dseq > o.state.Delivered.Consumer {
Expand Down Expand Up @@ -12665,7 +12665,7 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
if len(state.Pending) > 0 {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
pending[seq] = &Pending{p.Sequence, p.Timestamp, p.Subject}
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
Expand Down Expand Up @@ -12714,7 +12714,7 @@ func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
if len(state.Pending) > 0 {
pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
pending[seq] = &Pending{p.Sequence, p.Timestamp, p.Subject}
if seq <= state.AckFloor.Stream || seq > state.Delivered.Stream {
return fmt.Errorf("bad pending entry, sequence [%d] out of range", seq)
}
Expand Down Expand Up @@ -12896,7 +12896,7 @@ func checkConsumerHeader(hdr []byte) (uint8, error) {
func (o *consumerFileStore) copyPending() map[uint64]*Pending {
pending := make(map[uint64]*Pending, len(o.state.Pending))
for seq, p := range o.state.Pending {
pending[seq] = &Pending{p.Sequence, p.Timestamp}
pending[seq] = &Pending{p.Sequence, p.Timestamp, p.Subject}
}
return pending
}
Expand Down Expand Up @@ -12993,7 +12993,7 @@ func (o *consumerFileStore) stateWithCopyLocked(doCopy bool) (*ConsumerState, er
if doCopy {
o.state.Pending = make(map[uint64]*Pending, len(state.Pending))
for seq, p := range state.Pending {
o.state.Pending[seq] = &Pending{p.Sequence, p.Timestamp}
o.state.Pending[seq] = &Pending{p.Sequence, p.Timestamp, p.Subject}
}
} else {
o.state.Pending = state.Pending
Expand Down Expand Up @@ -13119,7 +13119,7 @@ func decodeConsumerState(buf []byte) (*ConsumerState, error) {
ts = (mints - ts) * int64(time.Second)
}
// Store in pending.
state.Pending[sseq] = &Pending{dseq, ts}
state.Pending[sseq] = &Pending{dseq, ts, _EMPTY_}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Persist pending subjects in recovered consumer state

Recovered pending entries are forced to Subject: _EMPTY_, so restart/snapshot restore drops subject identity for all in-flight acks. Since per-subject throttling relies on pending subjects, recovered consumers cannot accurately reapply MaxAckPendingPerSubject for already-pending messages and may over-deliver until those entries drain.

Useful? React with 👍 / 👎.

}
}

Expand Down
38 changes: 18 additions & 20 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2063,11 +2063,11 @@ func TestFileStoreConsumer(t *testing.T) {

// We should sanity check pending here as well, so will check if a pending value is below
// ack floor or above delivered.
state.Pending = map[uint64]*Pending{70: {70, tn}}
state.Pending = map[uint64]*Pending{70: {70, tn, _EMPTY_}}
shouldFail()
state.Pending = map[uint64]*Pending{140: {140, tn}}
state.Pending = map[uint64]*Pending{140: {140, tn, _EMPTY_}}
shouldFail()
state.Pending = map[uint64]*Pending{72: {72, tn}} // exact on floor should fail
state.Pending = map[uint64]*Pending{72: {72, tn, _EMPTY_}} // exact on floor should fail
shouldFail()

// Put timestamps a second apart.
Expand All @@ -2076,7 +2076,7 @@ func TestFileStoreConsumer(t *testing.T) {
ago := time.Now().Add(-30 * time.Second).Truncate(time.Second)
nt := func() *Pending {
ago = ago.Add(time.Second)
return &Pending{0, ago.UnixNano()}
return &Pending{0, ago.UnixNano(), _EMPTY_}
}
// Should succeed.
state.Pending = map[uint64]*Pending{75: nt(), 80: nt(), 83: nt(), 90: nt(), 111: nt()}
Expand Down Expand Up @@ -2143,9 +2143,9 @@ func TestFileStoreConsumerEncodeDecodePendingBelowStreamAckFloor(t *testing.T) {

now := time.Now().Round(time.Second).Add(-10 * time.Second).UnixNano()
state.Pending = map[uint64]*Pending{
10782: {1190, now},
10810: {1191, now + int64(time.Second)},
10815: {1192, now + int64(2*time.Second)},
10782: {1190, now, _EMPTY_},
10810: {1191, now + int64(time.Second), _EMPTY_},
10815: {1192, now + int64(2*time.Second), _EMPTY_},
}
buf := encodeConsumerState(state)

Expand Down Expand Up @@ -2562,16 +2562,14 @@ func TestFileStoreConsumerRedeliveredLost(t *testing.T) {
}

ts := time.Now().UnixNano()
o.UpdateDelivered(1, 1, 1, ts)
o.UpdateDelivered(2, 1, 2, ts)
o.UpdateDelivered(3, 1, 3, ts)
o.UpdateDelivered(4, 1, 4, ts)
o.UpdateDelivered(5, 2, 1, ts)
o.UpdateDelivered(1, 1, 1, ts, _EMPTY_)
o.UpdateDelivered(2, 1, 2, ts, _EMPTY_)
o.UpdateDelivered(3, 1, 3, ts, _EMPTY_)
o.UpdateDelivered(4, 1, 4, ts, _EMPTY_)
o.UpdateDelivered(5, 2, 1, ts, _EMPTY_)

restartConsumer()

o.UpdateDelivered(6, 2, 2, ts)
o.UpdateDelivered(7, 3, 1, ts)
o.UpdateDelivered(6, 2, 2, ts, _EMPTY_)
o.UpdateDelivered(7, 3, 1, ts, _EMPTY_)

restartConsumer()
if state, _ := o.State(); len(state.Pending) != 3 {
Expand Down Expand Up @@ -2641,7 +2639,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) {
testDelivered := func(dseq, sseq uint64) {
t.Helper()
ts := time.Now().UnixNano()
if err := o.UpdateDelivered(dseq, sseq, 1, ts); err != nil {
if err := o.UpdateDelivered(dseq, sseq, 1, ts, _EMPTY_); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
state, err := o.State()
Expand Down Expand Up @@ -2673,7 +2671,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) {
}
// Also if we do an update with a delivery count of anything but 1 here should also give same error.
ts := time.Now().UnixNano()
if err := o.UpdateDelivered(5, 130, 2, ts); err != ErrNoAckPolicy {
if err := o.UpdateDelivered(5, 130, 2, ts, _EMPTY_); err != ErrNoAckPolicy {
t.Fatalf("Expected a no ack policy error on update delivered with dc > 1, got %v", err)
}
})
Expand All @@ -2698,7 +2696,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
testDelivered := func(dseq, sseq uint64) {
t.Helper()
ts := time.Now().Round(time.Second).UnixNano()
if err := o.UpdateDelivered(dseq, sseq, 1, ts); err != nil {
if err := o.UpdateDelivered(dseq, sseq, 1, ts, _EMPTY_); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
pending++
Expand Down Expand Up @@ -2913,7 +2911,7 @@ func TestFileStoreConsumerPerf(t *testing.T) {
ts := start.UnixNano()

for i := uint64(1); i <= toStore; i++ {
if err := o.UpdateDelivered(i, i, 1, ts); err != nil {
if err := o.UpdateDelivered(i, i, 1, ts, _EMPTY_); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
Expand Down
Loading