Skip to content

Commit 61fbafc

Browse files
committed
Back up R1 replicated consumers in a clustered system
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent f802c99 commit 61fbafc

File tree

5 files changed

+213
-46
lines changed

5 files changed

+213
-46
lines changed

server/filestore.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13267,8 +13267,8 @@ func (fs *fileStore) RemoveConsumer(o ConsumerStore) error {
1326713267

1326813268
func (fs *fileStore) Consumers() iter.Seq[ConsumerStore] {
1326913269
return func(yield func(ConsumerStore) bool) {
13270-
fs.mu.RLock()
13271-
defer fs.mu.RUnlock()
13270+
fs.cmu.RLock()
13271+
defer fs.cmu.RUnlock()
1327213272

1327313273
for _, v := range fs.cfs {
1327413274
if !yield(v) {

server/jetstream_cluster_4_test.go

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7347,7 +7347,9 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
73477347

73487348
require_NoError(t, js.DeleteStream("test_stream"))
73497349
require_True(t, performStreamRestore(t, nc, cfg, state, archive))
7350+
73507351
c.waitOnAllCurrent()
7352+
c.waitOnStreamLeader(globalAccountName, "test_stream")
73517353

73527354
nsi, err := js.StreamInfo("test_stream")
73537355
require_NoError(t, err)
@@ -7379,12 +7381,12 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
73797381
Storage: storetype,
73807382
Replicas: 3,
73817383
})
7382-
for range 300 {
7384+
for range 5 * 10 {
73837385
_, err := js.Publish("foo.bar", nil)
73847386
require_NoError(t, err)
73857387
}
73867388
},
7387-
"LimitsWithConsumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
7389+
"LimitsWithR3Consumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
73887390
jsStreamCreate(t, nc, &StreamConfig{
73897391
Name: "test_stream",
73907392
Subjects: []string{"foo.>"},
@@ -7393,13 +7395,14 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
73937395
})
73947396
for n := range 5 {
73957397
_, err := js.AddConsumer("test_stream", &nats.ConsumerConfig{
7396-
Name: fmt.Sprintf("consumer_%d", n),
7397-
AckPolicy: nats.AckExplicitPolicy,
7398-
Replicas: 3,
7398+
Name: fmt.Sprintf("consumer_%d", n),
7399+
AckPolicy: nats.AckExplicitPolicy,
7400+
MemoryStorage: storetype == MemoryStorage,
7401+
Replicas: 3,
73997402
})
74007403
require_NoError(t, err)
74017404
}
7402-
for range 300 {
7405+
for range 5 * 10 {
74037406
_, err := js.Publish("foo.bar", nil)
74047407
require_NoError(t, err)
74057408
}
@@ -7415,6 +7418,38 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
74157418
}
74167419
}
74177420
},
7421+
"LimitsWithR1Consumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
7422+
jsStreamCreate(t, nc, &StreamConfig{
7423+
Name: "test_stream",
7424+
Subjects: []string{"foo.>"},
7425+
Storage: storetype,
7426+
Replicas: 3,
7427+
})
7428+
for n := range 20 {
7429+
_, err := js.AddConsumer("test_stream", &nats.ConsumerConfig{
7430+
Name: fmt.Sprintf("consumer_%d", n),
7431+
AckPolicy: nats.AckExplicitPolicy,
7432+
MemoryStorage: storetype == MemoryStorage,
7433+
Replicas: 1,
7434+
})
7435+
require_NoError(t, err)
7436+
}
7437+
for range 20 * 10 {
7438+
_, err := js.Publish("foo.bar", nil)
7439+
require_NoError(t, err)
7440+
}
7441+
for n := range 20 {
7442+
cn := fmt.Sprintf("consumer_%d", n)
7443+
sub, err := js.PullSubscribe(_EMPTY_, _EMPTY_, nats.Bind("test_stream", cn))
7444+
require_NoError(t, err)
7445+
for range n * 10 {
7446+
msgs, err := sub.Fetch(1)
7447+
require_NoError(t, err)
7448+
require_Len(t, len(msgs), 1)
7449+
require_NoError(t, msgs[0].AckSync())
7450+
}
7451+
}
7452+
},
74187453
"InterestNoConsumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
74197454
jsStreamCreate(t, nc, &StreamConfig{
74207455
Name: "test_stream",
@@ -7433,7 +7468,7 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
74337468
require_Equal(t, si.State.Msgs, 0)
74347469
require_Equal(t, si.State.FirstSeq, 301)
74357470
},
7436-
"InterestWithConsumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
7471+
"InterestWithR3Consumers": func(t *testing.T, nc *nats.Conn, js nats.JetStreamContext) {
74377472
jsStreamCreate(t, nc, &StreamConfig{
74387473
Name: "test_stream",
74397474
Subjects: []string{"foo.>"},
@@ -7446,6 +7481,7 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
74467481
Name: fmt.Sprintf("consumer_%d", n),
74477482
AckPolicy: nats.AckExplicitPolicy,
74487483
FilterSubjects: []string{"foo.>"},
7484+
MemoryStorage: storetype == MemoryStorage,
74497485
Replicas: 3,
74507486
})
74517487
require_NoError(t, err)

server/jetstream_test.go

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3198,8 +3198,6 @@ func TestJetStreamSnapshots(t *testing.T) {
31983198

31993199
require_True(t, performStreamRestore(t, nc2, sc, ss, snapshot))
32003200

3201-
// r.Reset(snapshot)
3202-
32033201
mset, err = acc.lookupStream("MY-STREAM")
32043202
require_NoError(t, err)
32053203

@@ -22656,3 +22654,39 @@ func TestJetStreamFlowControlCrossAccountFanOut(t *testing.T) {
2265622654
}
2265722655
}
2265822656
}
22657+
22658+
func TestJetStreamSnapshotAPIMessagePreamble(t *testing.T) {
22659+
hdr := []byte("NATS/1.0\r\nNats-Msg-Id: X\r\n\r\n")
22660+
msg := []byte("hello world")
22661+
subj := "foo.bar"
22662+
preamble := fmt.Sprintf("%d %d %s\r\n", len(hdr), len(subj), subj)
22663+
22664+
body := append(hdr, msg...)
22665+
r := strings.NewReader(preamble + string(body))
22666+
22667+
gotSubj, hlen, err := parseSnapshotMessagePreamble(r)
22668+
require_NoError(t, err)
22669+
require_Equal(t, gotSubj, subj)
22670+
require_Equal(t, hlen, len(hdr))
22671+
}
22672+
22673+
func TestJetStreamSnapshotAPIMessagePreambleAllowsEmptySubject(t *testing.T) {
22674+
hdr := []byte("NATS/1.0\r\n\r\n")
22675+
msg := []byte("msg")
22676+
subj := ""
22677+
preamble := fmt.Sprintf("%d %d %s\r\n", len(hdr), len(subj), subj)
22678+
22679+
body := append(hdr, msg...)
22680+
r := strings.NewReader(preamble + string(body))
22681+
22682+
gotSubj, hlen, err := parseSnapshotMessagePreamble(r)
22683+
require_NoError(t, err)
22684+
require_Equal(t, gotSubj, subj)
22685+
require_Equal(t, hlen, len(hdr))
22686+
}
22687+
22688+
func TestJetStreamSnapshotAPIMessagePreambleRejectsMalformedPreamble(t *testing.T) {
22689+
r := strings.NewReader("10 3 fooX\r\nabc")
22690+
_, _, err := parseSnapshotMessagePreamble(r)
22691+
require_Error(t, err)
22692+
}

server/stream.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8310,8 +8310,7 @@ func (mset *stream) snapshot(deadline time.Duration, checkMsgs, includeConsumers
83108310
return nil, errStreamClosed
83118311
}
83128312
store := mset.store
8313-
return CreateStreamSnapshotV2(store, deadline, includeConsumers)
8314-
// return store.Snapshot(deadline, checkMsgs, includeConsumers)
8313+
return mset.js.CreateStreamSnapshotV2(store, deadline, includeConsumers, mset.streamAssignment())
83158314
}
83168315

83178316
const snapsDir = "__snapshots__"

0 commit comments

Comments
 (0)