20 changes: 20 additions & 0 deletions server/consumer.go
@@ -19,6 +19,7 @@ import (
	"encoding/json"
	"errors"
	"fmt"
	"maps"
	"math"
	"math/rand"
	"os"
@@ -139,6 +140,25 @@ type ConsumerConfig struct {
	PinnedTTL time.Duration `json:"priority_timeout,omitempty"`
}

// clone performs a deep copy of the ConsumerConfig struct, returning a new clone with
// all values copied.
func (cfg *ConsumerConfig) clone() *ConsumerConfig {
	clone := *cfg
	if cfg.BackOff != nil {
		clone.BackOff = slices.Clone(cfg.BackOff)
	}
	if cfg.FilterSubjects != nil {
		clone.FilterSubjects = slices.Clone(cfg.FilterSubjects)
	}
	if cfg.Metadata != nil {
		clone.Metadata = maps.Clone(cfg.Metadata)
	}
	if cfg.PriorityGroups != nil {
		clone.PriorityGroups = slices.Clone(cfg.PriorityGroups)
	}
	return &clone
}

// SequenceInfo has both the consumer and the stream sequence and last activity.
type SequenceInfo struct {
	Consumer uint64 `json:"consumer_seq"`
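The struct assignment `clone := *cfg` copies only top-level fields, so the slice and map fields would still share backing storage with the original; the explicit `slices.Clone`/`maps.Clone` calls are what make the returned config safe to mutate independently. A minimal, self-contained sketch of the aliasing this guards against (the `Config` type below is a stand-in, not the real `ConsumerConfig`):

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

// Config stands in for ConsumerConfig: a plain struct copy still aliases
// the slice's backing array and the map's storage.
type Config struct {
	BackOff  []int
	Metadata map[string]string
}

func (c *Config) clone() *Config {
	clone := *c // copies scalars, but aliases slices/maps
	clone.BackOff = slices.Clone(c.BackOff)
	clone.Metadata = maps.Clone(c.Metadata)
	return &clone
}

func main() {
	orig := &Config{BackOff: []int{1, 2}, Metadata: map[string]string{"k": "v"}}
	cp := orig.clone()
	cp.BackOff[0] = 99
	cp.Metadata["k"] = "changed"
	fmt.Println(orig.BackOff[0], orig.Metadata["k"]) // prints: 1 v
}
```

Worth noting: `slices.Clone` and `maps.Clone` both return nil for nil inputs, so the nil checks in the patch are defensive rather than required for correctness.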
22 changes: 22 additions & 0 deletions server/filestore.go
@@ -27,6 +27,7 @@ import (
	"fmt"
	"io"
	"io/fs"
	"iter"
	"math"
	mrand "math/rand"
	"net"
@@ -12639,6 +12640,14 @@ func (o *consumerFileStore) encodeState() ([]byte, error) {
	return encodeConsumerState(state), nil
}

func (o *consumerFileStore) GetConfig() *ConsumerConfig {
	o.mu.Lock()
	defer o.mu.Unlock()
	clone := o.cfg.clone()
	clone.Name = o.name
	return clone
}

func (o *consumerFileStore) UpdateConfig(cfg *ConsumerConfig) error {
	o.mu.Lock()
	defer o.mu.Unlock()
@@ -13256,6 +13265,19 @@ func (fs *fileStore) RemoveConsumer(o ConsumerStore) error {
	return nil
}

func (fs *fileStore) Consumers() iter.Seq[ConsumerStore] {
	return func(yield func(ConsumerStore) bool) {
		fs.cmu.RLock()
		defer fs.cmu.RUnlock()

		for _, v := range fs.cfs {
			if !yield(v) {
				return
			}
		}
	}
}

////////////////////////////////////////////////////////////////////////////////
// Compression
////////////////////////////////////////////////////////////////////////////////
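`Consumers()` returns a Go 1.23 `iter.Seq` push iterator: the read lock is acquired when the caller starts ranging and released when the loop ends, and a `break` in the caller surfaces as `yield` returning false. A self-contained sketch of the same pattern on a toy store type (the names here are illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"iter"
	"sync"
)

// store mimics the fileStore pattern above: the iterator holds a read
// lock for the duration of the caller's range loop.
type store struct {
	mu    sync.RWMutex
	items map[string]int
}

func (s *store) Values() iter.Seq[int] {
	return func(yield func(int) bool) {
		s.mu.RLock()
		defer s.mu.RUnlock()
		for _, v := range s.items {
			if !yield(v) {
				return // caller broke out of the loop
			}
		}
	}
}

func main() {
	s := &store{items: map[string]int{"a": 1, "b": 2, "c": 3}}
	for v := range s.Values() { // requires Go 1.23+ range-over-func
		fmt.Println(v)
		if v == 2 {
			break // deferred RUnlock still runs
		}
	}
}
```

One consequence of this shape is that the lock is held for however long the caller's loop body runs, so callers should avoid blocking or re-acquiring `fs.cmu` inside the loop.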
55 changes: 47 additions & 8 deletions server/jetstream_api.go
@@ -14,6 +14,7 @@
package server

import (
	"archive/tar"
	"bytes"
	"cmp"
	"encoding/json"
@@ -29,6 +30,7 @@ import (
	"time"
	"unicode"

	"github.com/klauspost/compress/s2"
	"github.com/nats-io/nuid"
)

@@ -3973,9 +3975,36 @@ func (s *Server) processStreamRestore(ci *ClientInfo, acc *Account, cfg *StreamC
		// If we staged properly go ahead and do restore now.
		if err == nil {
			s.Debugf("Finalizing restore for stream '%s > %s'", acc.Name, streamName)
			tfile.Seek(0, 0)
			mset, err = acc.RestoreStream(cfg, tfile)
		} else {
			_, err = tfile.Seek(0, 0)
		}
		// Determine the snapshot format:
		// - v1: First file in the tar will be meta.inf
		// - v2: First file in the tar will be state.json
		var v2 bool
		if err == nil {
			tr := tar.NewReader(s2.NewReader(tfile))
			var th *tar.Header
			if th, err = tr.Next(); err == nil && th != nil {
				switch th.Name {
				case "meta.inf":
				case "state.json":
					v2 = true
				default:
					err = fmt.Errorf("unknown snapshot version")
				}
			}
		}
		if err == nil {
			_, err = tfile.Seek(0, 0)
		}
		if err == nil {
			if v2 {
				mset, err = acc.RestoreStreamV2(cfg, tfile)
			} else {
				mset, err = acc.RestoreStream(cfg, tfile)
			}
		}
		if err != nil {
			errStr := err.Error()
			tmp := []rune(errStr)
			tmp[0] = unicode.ToUpper(tmp[0])
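The format probe above reads just the first tar header through an s2 decompression reader, then rewinds the staged file so the chosen restore path re-reads it from the start. A standalone sketch of the same detection logic, assuming an s2-compressed snapshot already sitting in a local file (the path and function name are illustrative only):

```go
package main

import (
	"archive/tar"
	"errors"
	"fmt"
	"os"

	"github.com/klauspost/compress/s2"
)

// detectSnapshotVersion peeks at the first entry of an s2-compressed tar
// and classifies it the same way the restore path does. Real snapshots
// arrive as staged chunked uploads rather than a file on disk.
func detectSnapshotVersion(path string) (v2 bool, err error) {
	f, err := os.Open(path)
	if err != nil {
		return false, err
	}
	defer f.Close()

	tr := tar.NewReader(s2.NewReader(f))
	th, err := tr.Next()
	if err != nil || th == nil {
		return false, err
	}
	switch th.Name {
	case "meta.inf": // v1 layout
		return false, nil
	case "state.json": // v2 layout
		return true, nil
	default:
		return false, errors.New("unknown snapshot version")
	}
}

func main() {
	v2, err := detectSnapshotVersion("stream.snap")
	fmt.Println(v2, err)
}
```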
@@ -4156,11 +4185,21 @@ func (s *Server) jsStreamSnapshotRequest(sub *subscription, c *client, _ *Accoun
			Domain: s.getOpts().JetStreamDomain,
		})

		s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
			friendlyBytes(int64(sr.State.Bytes)),
			mset.jsa.account.Name,
			mset.name(),
			end.Sub(start))
		if err := <-sr.errCh; err != _EMPTY_ {
			s.Warnf("Snapshot for stream '%s > %s' failed after %v: %s",
				mset.jsa.account.Name,
				mset.name(),
				end.Sub(start),
				err,
			)
		} else {
			s.Noticef("Completed snapshot of %s for stream '%s > %s' in %v",
				friendlyBytes(int64(sr.State.Bytes)),
				mset.jsa.account.Name,
				mset.name(),
				end.Sub(start),
			)
		}
	}()
}

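This hunk also changes the completion log: instead of unconditionally reporting success, the goroutine now waits on `sr.errCh` and logs a warning when the snapshot failed. The comparison against `_EMPTY_` (this codebase's constant for `""`) suggests the channel carries error text as a string; a minimal sketch of that signaling shape, under that assumption:

```go
package main

import (
	"fmt"
	"time"
)

// A worker reports completion over a string channel: empty string for
// success, the error text otherwise. This mirrors the err != _EMPTY_
// check above; the actual type of sr.errCh is not shown in this diff.
func main() {
	errCh := make(chan string, 1)

	go func() {
		time.Sleep(10 * time.Millisecond) // simulated snapshot work
		errCh <- ""                       // success; send error text on failure
	}()

	if err := <-errCh; err != "" {
		fmt.Println("snapshot failed:", err)
	} else {
		fmt.Println("snapshot completed")
	}
}
```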
108 changes: 7 additions & 101 deletions server/jetstream_cluster_3_test.go
@@ -4708,43 +4708,8 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
		t.Fatalf("Did not receive completion signal")
	}

	sreq := &JSApiStreamSnapshotRequest{
		DeliverSubject: nats.NewInbox(),
		ChunkSize:      512,
	}
	req, _ := json.Marshal(sreq)
	rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
	require_NoError(t, err)

	var resp JSApiStreamSnapshotResponse
	json.Unmarshal(rmsg.Data, &resp)
	require_True(t, resp.Error == nil)

	state := *resp.State
	cfg := *resp.Config

	var snapshot []byte
	done := make(chan bool)

	sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
		// EOF
		if len(m.Data) == 0 {
			done <- true
			return
		}
		// Could be writing to a file here too.
		snapshot = append(snapshot, m.Data...)
		// Flow ack
		m.Respond(nil)
	})
	defer sub.Unsubscribe()

	// Wait to receive the snapshot.
	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive our snapshot in time")
	}
	// Take a backup of the stream.
	sc, ss, snapshot := performStreamBackup(t, nc, "TEST")

	// Delete before we try to restore.
	require_NoError(t, js.DeleteStream("TEST"))
@@ -4765,53 +4730,15 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
		})
	}

	var rresp JSApiStreamRestoreResponse
	rreq := &JSApiStreamRestoreRequest{
		Config: cfg,
		State:  state,
	}
	req, _ = json.Marshal(rreq)

	rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
	require_NoError(t, err)

	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	require_True(t, rresp.Error == nil)

	checkHealth()

	// We will now chunk the snapshot responses (and EOF).
	var chunk [1024]byte
	for i, r := 0, bytes.NewReader(snapshot); ; {
		n, err := r.Read(chunk[:])
		if err != nil {
			break
		}
		nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
		i++
		// We will call healthz for all servers halfway through the restore.
		if i%100 == 0 {
			checkHealth()
		}
	}
	rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
	require_NoError(t, err)
	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	require_True(t, rresp.Error == nil)

	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == uint64(toSend))

	// Restore the backup.
	require_True(t, performStreamRestore(t, nc, sc, ss, snapshot))
	checkHealth()

	// Make sure stepdown works, this would fail before the fix.
	_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
	require_NoError(t, err)

	si, err = js.StreamInfo("TEST")
	si, err := js.StreamInfo("TEST")
	require_NoError(t, err)
	require_True(t, si.State.Msgs == uint64(toSend))

@@ -4824,29 +4751,8 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
	nc, _ = jsClientConnect(t, s)
	defer nc.Close()

	rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
	require_NoError(t, err)

	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)
	require_True(t, rresp.Error == nil)

	for i, r := 0, bytes.NewReader(snapshot); ; {
		n, err := r.Read(chunk[:])
		if err != nil {
			break
		}
		_, err = nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
		require_NoError(t, err)
		i++
	}
	rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
	require_NoError(t, err)
	rresp.Error = nil
	json.Unmarshal(rmsg.Data, &rresp)

	require_True(t, rresp.Error != nil)
	require_Equal(t, rresp.ApiResponse.Error.ErrCode, 10074)
	// Restore the backup.
	require_True(t, performStreamRestore(t, nc, sc, ss, snapshot))

	status := s.healthz(nil)
	require_Equal(t, status.StatusCode, 200)
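The test now delegates to `performStreamBackup`/`performStreamRestore` helpers whose definitions are not part of this diff. As a reading aid, here is a hypothetical reconstruction of the backup helper assembled from the deleted inline code above; the actual helper added elsewhere in the PR may differ in signature and details:

```go
// Hypothetical reconstruction, not the PR's code: names and return types
// are inferred from the call sites (sc, ss, snapshot := ...) and the
// deleted inline logic. Reuses the repo's require_* test helpers.
func performStreamBackup(t *testing.T, nc *nats.Conn, stream string) (StreamConfig, StreamState, []byte) {
	t.Helper()

	sreq := &JSApiStreamSnapshotRequest{
		DeliverSubject: nats.NewInbox(),
		ChunkSize:      512,
	}
	req, _ := json.Marshal(sreq)
	rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, stream), req, time.Second)
	require_NoError(t, err)

	var resp JSApiStreamSnapshotResponse
	require_NoError(t, json.Unmarshal(rmsg.Data, &resp))
	require_True(t, resp.Error == nil)

	// Gather chunks until the empty EOF message, flow-acking each chunk.
	var snapshot []byte
	done := make(chan bool)
	sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
		if len(m.Data) == 0 { // EOF
			done <- true
			return
		}
		snapshot = append(snapshot, m.Data...)
		m.Respond(nil) // Flow ack
	})
	defer sub.Unsubscribe()

	select {
	case <-done:
	case <-time.After(5 * time.Second):
		t.Fatalf("Did not receive the snapshot in time")
	}
	return *resp.Config, *resp.State, snapshot
}
```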