Skip to content

Commit 16335ad

Browse files
committed
reset consumer state to stream state
1 parent 4a7566f commit 16335ad

File tree

5 files changed

+161
-1
lines changed

5 files changed

+161
-1
lines changed

server/consumer.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
963963
}
964964

965965
mset.mu.RLock()
966-
s, js, jsa, cfg, acc := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc
966+
s, js, jsa, cfg, acc, lseq := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc, mset.lseq
967967
mset.mu.RUnlock()
968968

969969
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -1228,6 +1228,31 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12281228
o.mu.Lock()
12291229
o.readStoredState()
12301230
o.mu.Unlock()
1231+
1232+
// only for streams with r1; if we are recovering and the consumer sequence is greater than the last sequence,
1233+
// we need to select a new starting sequence
1234+
// run only for single replica streams
1235+
if mset.cfg.Replicas == 1 && o.sseq > 2 && lseq < o.sseq && isRecovering {
1236+
o.srv.Warnf("Consumer %q recovering with sseq %d greater than stream last seq %d, selecting new start", o.name, o.sseq, lseq)
1237+
1238+
// set starting sequences to in-memory state
1239+
state := &ConsumerState{}
1240+
o.mu.Lock()
1241+
o.selectStartingSeqNo()
1242+
1243+
state.Delivered.Stream = o.sseq
1244+
state.Delivered.Consumer = o.dseq
1245+
state.AckFloor.Stream = o.asflr
1246+
state.AckFloor.Consumer = o.adflr
1247+
o.mu.Unlock()
1248+
err := o.store.ForceUpdate(state)
1249+
if err != nil {
1250+
s.Warnf("Consumer %q error updating state: %v", o.name, err)
1251+
return nil, NewJSConsumerStoreFailedError(err)
1252+
}
1253+
1254+
o.srv.Warnf("Consumer %q adjusted starting sequence to %d", o.name, o.sseq)
1255+
}
12311256
} else {
12321257
// Select starting sequence number
12331258
o.selectStartingSeqNo()

server/filestore.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11593,6 +11593,31 @@ func (o *consumerFileStore) Update(state *ConsumerState) error {
1159311593
return nil
1159411594
}
1159511595

11596+
// ForceUpdate updates the consumer state without the backwards check.
11597+
// This is used during recovery when we need to reset the consumer to an earlier sequence.
11598+
func (o *consumerFileStore) ForceUpdate(state *ConsumerState) error {
11599+
o.fs.warn("Consumer %q force updating state: %+v", o.name, state)
11600+
// Sanity checks.
11601+
if state.AckFloor.Consumer > state.Delivered.Consumer {
11602+
return fmt.Errorf("bad ack floor for consumer")
11603+
}
11604+
if state.AckFloor.Stream > state.Delivered.Stream {
11605+
return fmt.Errorf("bad ack floor for stream")
11606+
}
11607+
11608+
o.mu.Lock()
11609+
defer o.mu.Unlock()
11610+
11611+
o.state.Delivered = state.Delivered
11612+
o.state.AckFloor = state.AckFloor
11613+
o.state.Pending = make(map[uint64]*Pending)
11614+
o.state.Redelivered = make(map[uint64]uint64)
11615+
11616+
o.kickFlusher()
11617+
11618+
return nil
11619+
}
11620+
1159611621
// Will encrypt the state with our asset key. Will be a no-op if encryption not enabled.
1159711622
// Lock should be held.
1159811623
func (o *consumerFileStore) encryptState(buf []byte) ([]byte, error) {

server/filestore_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8518,6 +8518,94 @@ func TestFileStoreRecoverFullStateDetectCorruptState(t *testing.T) {
85188518
require_Error(t, err, errCorruptState)
85198519
}
85208520

8521+
func TestFileStoreResetConsumerToStreamState(t *testing.T) {
8522+
fs, err := newFileStore(
8523+
FileStoreConfig{StoreDir: t.TempDir()},
8524+
StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage})
8525+
require_NoError(t, err)
8526+
defer fs.Stop()
8527+
8528+
msg := []byte("abc")
8529+
for i := 1; i <= 30; i++ {
8530+
_, _, err = fs.StoreMsg(fmt.Sprintf("foo.%d", i), nil, msg, 0)
8531+
require_NoError(t, err)
8532+
}
8533+
8534+
err = fs.writeFullState()
8535+
require_NoError(t, err)
8536+
8537+
obs, err := fs.ConsumerStore("c1", time.Now(), &ConsumerConfig{
8538+
Durable: "c1",
8539+
FilterSubject: "foo.*",
8540+
AckPolicy: AckNone,
8541+
DeliverPolicy: DeliverAll,
8542+
})
8543+
8544+
require_NoError(t, err)
8545+
defer obs.Stop()
8546+
8547+
state := &ConsumerState{}
8548+
state.Delivered = SequencePair{
8549+
Consumer: 5,
8550+
Stream: 5,
8551+
}
8552+
8553+
state.AckFloor = SequencePair{
8554+
Consumer: 5,
8555+
Stream: 5,
8556+
}
8557+
8558+
// set to 5
8559+
err = obs.Update(state)
8560+
require_NoError(t, err)
8561+
8562+
currState, err := obs.State()
8563+
require_NoError(t, err)
8564+
8565+
require_Equal(t, fs.state.LastSeq, uint64(30))
8566+
require_Equal(t, fs.state.FirstSeq, uint64(1))
8567+
require_Equal(t, currState.AckFloor, state.AckFloor)
8568+
require_Equal(t, currState.Delivered, state.Delivered)
8569+
require_Equal(t, len(currState.Redelivered), len(state.Redelivered))
8570+
require_Equal(t, len(currState.Pending), len(state.Pending))
8571+
8572+
// reset our state to 0.
8573+
fs.state.FirstSeq = 0
8574+
fs.state.LastSeq = 0
8575+
8576+
// set back to lower values
8577+
newState := &ConsumerState{}
8578+
newState.Delivered = SequencePair{
8579+
Consumer: 1,
8580+
Stream: 4,
8581+
}
8582+
8583+
newState.AckFloor = SequencePair{
8584+
Consumer: 1,
8585+
Stream: 3,
8586+
}
8587+
8588+
// update should fail but force update should pass
8589+
err = obs.Update(newState)
8590+
require_Error(t, err, fmt.Errorf("old update ignored"))
8591+
8592+
err = obs.ForceUpdate(newState)
8593+
require_NoError(t, err)
8594+
8595+
fullState, err := obs.State()
8596+
require_NoError(t, err)
8597+
8598+
borrowedState, err := obs.BorrowState()
8599+
require_NoError(t, err)
8600+
8601+
require_Equal(t, fullState.Delivered, borrowedState.Delivered)
8602+
require_Equal(t, fullState.AckFloor, borrowedState.AckFloor)
8603+
require_Equal(t, newState.Delivered, borrowedState.Delivered)
8604+
require_Equal(t, newState.AckFloor, borrowedState.AckFloor)
8605+
require_Equal(t, len(newState.Redelivered), len(borrowedState.Redelivered))
8606+
require_Equal(t, len(newState.Pending), len(borrowedState.Pending))
8607+
}
8608+
85218609
func TestFileStoreNumPendingMulti(t *testing.T) {
85228610
fs, err := newFileStore(
85238611
FileStoreConfig{StoreDir: t.TempDir()},

server/memstore.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2311,6 +2311,27 @@ func (o *consumerMemStore) Update(state *ConsumerState) error {
23112311
return nil
23122312
}
23132313

2314+
func (o *consumerMemStore) ForceUpdate(state *ConsumerState) error {
2315+
// Sanity checks.
2316+
if state.AckFloor.Consumer > state.Delivered.Consumer {
2317+
return fmt.Errorf("bad ack floor for consumer")
2318+
}
2319+
if state.AckFloor.Stream > state.Delivered.Stream {
2320+
return fmt.Errorf("bad ack floor for stream")
2321+
}
2322+
2323+
// Replace our state.
2324+
o.mu.Lock()
2325+
defer o.mu.Unlock()
2326+
2327+
o.state.Delivered = state.Delivered
2328+
o.state.AckFloor = state.AckFloor
2329+
o.state.Pending = make(map[uint64]*Pending)
2330+
o.state.Redelivered = make(map[uint64]uint64)
2331+
2332+
return nil
2333+
}
2334+
23142335
// SetStarting sets our starting stream sequence.
23152336
func (o *consumerMemStore) SetStarting(sseq uint64) error {
23162337
o.mu.Lock()

server/store.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,7 @@ type ConsumerStore interface {
363363
UpdateAcks(dseq, sseq uint64) error
364364
UpdateConfig(cfg *ConsumerConfig) error
365365
Update(*ConsumerState) error
366+
ForceUpdate(*ConsumerState) error
366367
State() (*ConsumerState, error)
367368
BorrowState() (*ConsumerState, error)
368369
EncodedState() ([]byte, error)

0 commit comments

Comments
 (0)