Skip to content

Commit 0690d20

Browse files
[FIXED] Recreated stream/consumer state is deleted during recovery
Signed-off-by: Maurice van Veen <[email protected]>
1 parent bb161ea commit 0690d20

File tree

11 files changed

+533
-67
lines changed

11 files changed

+533
-67
lines changed

server/consumer.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -963,7 +963,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
963963
}
964964

965965
mset.mu.RLock()
966-
s, jsa, cfg, acc := mset.srv, mset.jsa, mset.cfg, mset.acc
966+
s, js, jsa, cfg, acc := mset.srv, mset.js, mset.jsa, mset.cfg, mset.acc
967967
mset.mu.RUnlock()
968968

969969
// If we do not have the consumer currently assigned to us in cluster mode we will proceed but warn.
@@ -1134,6 +1134,13 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
11341134
created: time.Now().UTC(),
11351135
}
11361136

1137+
// Add created timestamp used for the store, must match that of the consumer assignment if it exists.
1138+
if ca != nil {
1139+
js.mu.RLock()
1140+
o.created = ca.Created
1141+
js.mu.RUnlock()
1142+
}
1143+
11371144
// Bind internal client to the user account.
11381145
o.client.registerWithAccount(a)
11391146
// Bind to the system account.
@@ -1186,7 +1193,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
11861193

11871194
// Setup our storage if not a direct consumer.
11881195
if !config.Direct {
1189-
store, err := mset.store.ConsumerStore(o.name, config)
1196+
store, err := mset.store.ConsumerStore(o.name, o.created, config)
11901197
if err != nil {
11911198
mset.mu.Unlock()
11921199
o.deleteWithoutAdvisory()

server/filestore.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10903,7 +10903,7 @@ type consumerFileStore struct {
1090310903
closed bool
1090410904
}
1090510905

10906-
func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerStore, error) {
10906+
func (fs *fileStore) ConsumerStore(name string, created time.Time, cfg *ConsumerConfig) (ConsumerStore, error) {
1090710907
if fs == nil {
1090810908
return nil, fmt.Errorf("filestore is nil")
1090910909
}
@@ -10926,7 +10926,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
1092610926
if err := os.MkdirAll(odir, defaultDirPerms); err != nil {
1092710927
return nil, fmt.Errorf("could not create consumer directory - %v", err)
1092810928
}
10929-
csi := &FileConsumerInfo{Name: name, Created: time.Now().UTC(), ConsumerConfig: *cfg}
10929+
csi := &FileConsumerInfo{Name: name, Created: created, ConsumerConfig: *cfg}
1093010930
o := &consumerFileStore{
1093110931
fs: fs,
1093210932
cfg: csi,
@@ -10983,7 +10983,6 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
1098310983
meta := filepath.Join(odir, JetStreamMetaFile)
1098410984
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
1098510985
didCreate = true
10986-
csi.Created = time.Now().UTC()
1098710986
if err := o.writeConsumerMeta(); err != nil {
1098810987
os.RemoveAll(odir)
1098910988
return nil, err

server/filestore_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,7 +1460,7 @@ func TestFileStoreMeta(t *testing.T) {
14601460
AckPolicy: AckAll,
14611461
}
14621462
oname := "obs22"
1463-
obs, err := fs.ConsumerStore(oname, &oconfig)
1463+
obs, err := fs.ConsumerStore(oname, time.Time{}, &oconfig)
14641464
if err != nil {
14651465
t.Fatalf("Unexpected error: %v", err)
14661466
}
@@ -1809,11 +1809,11 @@ func TestFileStoreSnapshot(t *testing.T) {
18091809
}
18101810

18111811
// Create a few consumers.
1812-
o1, err := fs.ConsumerStore("o22", &ConsumerConfig{})
1812+
o1, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{})
18131813
if err != nil {
18141814
t.Fatalf("Unexpected error: %v", err)
18151815
}
1816-
o2, err := fs.ConsumerStore("o33", &ConsumerConfig{})
1816+
o2, err := fs.ConsumerStore("o33", time.Time{}, &ConsumerConfig{})
18171817
if err != nil {
18181818
t.Fatalf("Unexpected error: %v", err)
18191819
}
@@ -1977,7 +1977,7 @@ func TestFileStoreConsumer(t *testing.T) {
19771977
require_NoError(t, err)
19781978
defer fs.Stop()
19791979

1980-
o, err := fs.ConsumerStore("obs22", &ConsumerConfig{})
1980+
o, err := fs.ConsumerStore("obs22", time.Time{}, &ConsumerConfig{})
19811981
if err != nil {
19821982
t.Fatalf("Unexpected error: %v", err)
19831983
}
@@ -2504,7 +2504,7 @@ func TestFileStoreConsumerRedeliveredLost(t *testing.T) {
25042504
defer fs.Stop()
25052505

25062506
cfg := &ConsumerConfig{AckPolicy: AckExplicit}
2507-
o, err := fs.ConsumerStore("o22", cfg)
2507+
o, err := fs.ConsumerStore("o22", time.Time{}, cfg)
25082508
if err != nil {
25092509
t.Fatalf("Unexpected error: %v", err)
25102510
}
@@ -2513,7 +2513,7 @@ func TestFileStoreConsumerRedeliveredLost(t *testing.T) {
25132513
t.Helper()
25142514
o.Stop()
25152515
time.Sleep(20 * time.Millisecond) // Wait for all things to settle.
2516-
o, err = fs.ConsumerStore("o22", cfg)
2516+
o, err = fs.ConsumerStore("o22", time.Time{}, cfg)
25172517
if err != nil {
25182518
t.Fatalf("Unexpected error: %v", err)
25192519
}
@@ -2569,7 +2569,7 @@ func TestFileStoreConsumerFlusher(t *testing.T) {
25692569
require_NoError(t, err)
25702570
defer fs.Stop()
25712571

2572-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{})
2572+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{})
25732573
if err != nil {
25742574
t.Fatalf("Unexpected error: %v", err)
25752575
}
@@ -2601,7 +2601,7 @@ func TestFileStoreConsumerDeliveredUpdates(t *testing.T) {
26012601
defer fs.Stop()
26022602

26032603
// Simple consumer, no ack policy configured.
2604-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{})
2604+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{})
26052605
if err != nil {
26062606
t.Fatalf("Unexpected error: %v", err)
26072607
}
@@ -2655,7 +2655,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
26552655
defer fs.Stop()
26562656

26572657
// Simple consumer, no ack policy configured.
2658-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
2658+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{AckPolicy: AckExplicit})
26592659
if err != nil {
26602660
t.Fatalf("Unexpected error: %v", err)
26612661
}
@@ -2745,7 +2745,7 @@ func TestFileStoreConsumerDeliveredAndAckUpdates(t *testing.T) {
27452745
}
27462746
o.Stop()
27472747

2748-
o, err = fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
2748+
o, err = fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{AckPolicy: AckExplicit})
27492749
if err != nil {
27502750
t.Fatalf("Unexpected error: %v", err)
27512751
}
@@ -2847,7 +2847,7 @@ func TestFileStoreConsumerPerf(t *testing.T) {
28472847
require_NoError(t, err)
28482848
defer fs.Stop()
28492849

2850-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
2850+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{AckPolicy: AckExplicit})
28512851
if err != nil {
28522852
t.Fatalf("Unexpected error: %v", err)
28532853
}
@@ -4171,7 +4171,7 @@ func TestFileStoreEncrypted(t *testing.T) {
41714171
fs.StoreMsg(subj, nil, msg, 0)
41724172
}
41734173

4174-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{})
4174+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{})
41754175
require_NoError(t, err)
41764176

41774177
state := &ConsumerState{}
@@ -4194,7 +4194,7 @@ func TestFileStoreEncrypted(t *testing.T) {
41944194
require_NoError(t, err)
41954195
require_True(t, string(sm.msg) == "aes ftw")
41964196

4197-
o, err = fs.ConsumerStore("o22", &ConsumerConfig{})
4197+
o, err = fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{})
41984198
require_NoError(t, err)
41994199
rstate, err := o.State()
42004200
require_NoError(t, err)
@@ -4990,7 +4990,7 @@ func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
49904990
require_NoError(t, err)
49914991
defer fs.Stop()
49924992

4993-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
4993+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{AckPolicy: AckExplicit})
49944994
require_NoError(t, err)
49954995
defer o.Stop()
49964996

@@ -5007,7 +5007,7 @@ func TestFileStoreConsumerStoreEncodeAfterRestart(t *testing.T) {
50075007
require_NoError(t, err)
50085008
defer fs.Stop()
50095009

5010-
o, err := fs.ConsumerStore("o22", &ConsumerConfig{AckPolicy: AckExplicit})
5010+
o, err := fs.ConsumerStore("o22", time.Time{}, &ConsumerConfig{AckPolicy: AckExplicit})
50115011
require_NoError(t, err)
50125012
defer o.Stop()
50135013

@@ -8366,7 +8366,7 @@ func Benchmark_FileStoreCreateConsumerStores(b *testing.B) {
83668366
b.ResetTimer()
83678367
for i := 0; i < b.N; i++ {
83688368
oname := fmt.Sprintf("obs22_%d", i)
8369-
ofs, err := fs.ConsumerStore(oname, &oconfig)
8369+
ofs, err := fs.ConsumerStore(oname, time.Time{}, &oconfig)
83708370
require_NoError(b, err)
83718371
require_NoError(b, ofs.Stop())
83728372
}

server/jetstream_api.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2821,11 +2821,11 @@ func (s *Server) jsLeaderAccountPurgeRequest(sub *subscription, c *client, _ *Ac
28212821
for _, osa := range streams {
28222822
for _, oca := range osa.consumers {
28232823
oca.deleted = true
2824-
ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client}
2824+
ca := &consumerAssignment{Group: oca.Group, Stream: oca.Stream, Name: oca.Name, Config: oca.Config, Subject: subject, Client: oca.Client, Created: oca.Created}
28252825
meta.Propose(encodeDeleteConsumerAssignment(ca))
28262826
nc++
28272827
}
2828-
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client}
2828+
sa := &streamAssignment{Group: osa.Group, Config: osa.Config, Subject: subject, Client: osa.Client, Created: osa.Created}
28292829
meta.Propose(encodeDeleteStreamAssignment(sa))
28302830
ns++
28312831
}

0 commit comments

Comments
 (0)