Skip to content

Commit d34d983

Browse files
(2.14) [IMPROVED] Filestore error handling
Signed-off-by: Maurice van Veen <github@mauricevanveen.com>
1 parent 66e9bbc commit d34d983

17 files changed

+1538
-505
lines changed

server/avl/norace_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ func TestNoRaceSeqSetEncodeLarge(t *testing.T) {
9898
expected := time.Millisecond
9999

100100
start := time.Now()
101-
b, err := ss.Encode(nil)
102-
require_NoError(t, err)
101+
b := ss.Encode(nil)
103102

104103
if elapsed := time.Since(start); elapsed > expected {
105104
t.Fatalf("Expected encode of %d items with encoded size %v to take less than %v, got %v",

server/avl/seqset.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (ss SequenceSet) EncodeLen() int {
239239
return minLen + (ss.Nodes() * ((numBuckets+1)*8 + 2))
240240
}
241241

242-
func (ss SequenceSet) Encode(buf []byte) ([]byte, error) {
242+
func (ss SequenceSet) Encode(buf []byte) []byte {
243243
nn, encLen := ss.Nodes(), ss.EncodeLen()
244244

245245
if cap(buf) < encLen {
@@ -268,7 +268,7 @@ func (ss SequenceSet) Encode(buf []byte) ([]byte, error) {
268268
le.PutUint16(buf[i:], uint16(n.h))
269269
i += 2
270270
})
271-
return buf[:i], nil
271+
return buf[:i]
272272
}
273273

274274
// ErrBadEncoding is returned when we can not decode properly.

server/consumer.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1237,7 +1237,9 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
12371237
o.mu.Unlock()
12381238
} else {
12391239
// Select starting sequence number
1240-
o.selectStartingSeqNo()
1240+
if err := o.selectStartingSeqNo(); err != nil {
1241+
return nil, err
1242+
}
12411243
}
12421244

12431245
// Now register with mset and create the ack subscription.
@@ -5858,7 +5860,7 @@ func (o *consumer) hasSkipListPending() bool {
58585860
}
58595861

58605862
// Will select the starting sequence.
5861-
func (o *consumer) selectStartingSeqNo() {
5863+
func (o *consumer) selectStartingSeqNo() error {
58625864
if o.mset == nil || o.mset.store == nil {
58635865
o.sseq = 1
58645866
} else {
@@ -5873,7 +5875,10 @@ func (o *consumer) selectStartingSeqNo() {
58735875
} else {
58745876
// If we are partitioned here this will be properly set when we become leader.
58755877
for _, filter := range o.subjf {
5876-
ss := o.mset.store.FilteredState(1, filter.subject)
5878+
ss, err := o.mset.store.FilteredState(1, filter.subject)
5879+
if err != nil {
5880+
return err
5881+
}
58775882
if ss.Last > o.sseq {
58785883
o.sseq = ss.Last
58795884
}
@@ -5919,7 +5924,10 @@ func (o *consumer) selectStartingSeqNo() {
59195924
nseq := state.LastSeq
59205925
for _, filter := range o.subjf {
59215926
// Use first sequence since this is more optimized atm.
5922-
ss := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
5927+
ss, err := o.mset.store.FilteredState(state.FirstSeq, filter.subject)
5928+
if err != nil {
5929+
return err
5930+
}
59235931
if ss.First >= o.sseq && ss.First < nseq {
59245932
nseq = ss.First
59255933
}
@@ -5961,8 +5969,11 @@ func (o *consumer) selectStartingSeqNo() {
59615969
// Set our starting sequence state.
59625970
// But only if we're not clustered, if clustered we propose upon becoming leader.
59635971
if o.store != nil && o.sseq > 0 && o.cfg.replicas(&o.mset.cfg) == 1 {
5964-
o.store.SetStarting(o.sseq - 1)
5972+
if err := o.store.SetStarting(o.sseq - 1); err != nil {
5973+
return err
5974+
}
59655975
}
5976+
return nil
59665977
}
59675978

59685979
// Test whether a config represents a durable subscriber.

0 commit comments

Comments
 (0)