Skip to content

Commit f802c99

Browse files
committed
Updates to unit tests
Signed-off-by: Neil Twigg <neil@nats.io>
1 parent b90e913 commit f802c99

File tree

5 files changed

+100
-205
lines changed

5 files changed

+100
-205
lines changed

server/jetstream_cluster_3_test.go

Lines changed: 7 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -4708,43 +4708,8 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
47084708
t.Fatalf("Did not receive completion signal")
47094709
}
47104710

4711-
sreq := &JSApiStreamSnapshotRequest{
4712-
DeliverSubject: nats.NewInbox(),
4713-
ChunkSize: 512,
4714-
}
4715-
req, _ := json.Marshal(sreq)
4716-
rmsg, err := nc.Request(fmt.Sprintf(JSApiStreamSnapshotT, "TEST"), req, time.Second)
4717-
require_NoError(t, err)
4718-
4719-
var resp JSApiStreamSnapshotResponse
4720-
json.Unmarshal(rmsg.Data, &resp)
4721-
require_True(t, resp.Error == nil)
4722-
4723-
state := *resp.State
4724-
cfg := *resp.Config
4725-
4726-
var snapshot []byte
4727-
done := make(chan bool)
4728-
4729-
sub, _ := nc.Subscribe(sreq.DeliverSubject, func(m *nats.Msg) {
4730-
// EOF
4731-
if len(m.Data) == 0 {
4732-
done <- true
4733-
return
4734-
}
4735-
// Could be writing to a file here too.
4736-
snapshot = append(snapshot, m.Data...)
4737-
// Flow ack
4738-
m.Respond(nil)
4739-
})
4740-
defer sub.Unsubscribe()
4741-
4742-
// Wait to receive the snapshot.
4743-
select {
4744-
case <-done:
4745-
case <-time.After(5 * time.Second):
4746-
t.Fatalf("Did not receive our snapshot in time")
4747-
}
4711+
// Take a backup of the stream.
4712+
sc, ss, snapshot := performStreamBackup(t, nc, "TEST")
47484713

47494714
// Delete before we try to restore.
47504715
require_NoError(t, js.DeleteStream("TEST"))
@@ -4765,53 +4730,15 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
47654730
})
47664731
}
47674732

4768-
var rresp JSApiStreamRestoreResponse
4769-
rreq := &JSApiStreamRestoreRequest{
4770-
Config: cfg,
4771-
State: state,
4772-
}
4773-
req, _ = json.Marshal(rreq)
4774-
4775-
rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
4776-
require_NoError(t, err)
4777-
4778-
rresp.Error = nil
4779-
json.Unmarshal(rmsg.Data, &rresp)
4780-
require_True(t, rresp.Error == nil)
4781-
4782-
checkHealth()
4783-
4784-
// We will now chunk the snapshot responses (and EOF).
4785-
var chunk [1024]byte
4786-
for i, r := 0, bytes.NewReader(snapshot); ; {
4787-
n, err := r.Read(chunk[:])
4788-
if err != nil {
4789-
break
4790-
}
4791-
nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
4792-
i++
4793-
// We will call healthz for all servers halfway through the restore.
4794-
if i%100 == 0 {
4795-
checkHealth()
4796-
}
4797-
}
4798-
rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
4799-
require_NoError(t, err)
4800-
rresp.Error = nil
4801-
json.Unmarshal(rmsg.Data, &rresp)
4802-
require_True(t, rresp.Error == nil)
4803-
4804-
si, err := js.StreamInfo("TEST")
4805-
require_NoError(t, err)
4806-
require_True(t, si.State.Msgs == uint64(toSend))
4807-
4733+
// Restore the backup.
4734+
require_True(t, performStreamRestore(t, nc, sc, ss, snapshot))
48084735
checkHealth()
48094736

48104737
// Make sure stepdown works, this would fail before the fix.
48114738
_, err = nc.Request(fmt.Sprintf(JSApiStreamLeaderStepDownT, "TEST"), nil, 5*time.Second)
48124739
require_NoError(t, err)
48134740

4814-
si, err = js.StreamInfo("TEST")
4741+
si, err := js.StreamInfo("TEST")
48154742
require_NoError(t, err)
48164743
require_True(t, si.State.Msgs == uint64(toSend))
48174744

@@ -4824,29 +4751,8 @@ func TestJetStreamClusterSnapshotAndRestoreWithHealthz(t *testing.T) {
48244751
nc, _ = jsClientConnect(t, s)
48254752
defer nc.Close()
48264753

4827-
rmsg, err = nc.Request(fmt.Sprintf(JSApiStreamRestoreT, "TEST"), req, 5*time.Second)
4828-
require_NoError(t, err)
4829-
4830-
rresp.Error = nil
4831-
json.Unmarshal(rmsg.Data, &rresp)
4832-
require_True(t, rresp.Error == nil)
4833-
4834-
for i, r := 0, bytes.NewReader(snapshot); ; {
4835-
n, err := r.Read(chunk[:])
4836-
if err != nil {
4837-
break
4838-
}
4839-
_, err = nc.Request(rresp.DeliverSubject, chunk[:n], time.Second)
4840-
require_NoError(t, err)
4841-
i++
4842-
}
4843-
rmsg, err = nc.Request(rresp.DeliverSubject, nil, time.Second)
4844-
require_NoError(t, err)
4845-
rresp.Error = nil
4846-
json.Unmarshal(rmsg.Data, &rresp)
4847-
4848-
require_True(t, rresp.Error != nil)
4849-
require_Equal(t, rresp.ApiResponse.Error.ErrCode, 10074)
4754+
// Restore the backup.
4755+
require_True(t, performStreamRestore(t, nc, sc, ss, snapshot))
48504756

48514757
status := s.healthz(nil)
48524758
require_Equal(t, status.StatusCode, 200)

server/jetstream_cluster_4_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7346,7 +7346,7 @@ func TestJetStreamClusterStreamSnapshots(t *testing.T) {
73467346
require_NoError(t, err)
73477347

73487348
require_NoError(t, js.DeleteStream("test_stream"))
7349-
performStreamRestore(t, nc, cfg, state, archive)
7349+
require_True(t, performStreamRestore(t, nc, cfg, state, archive))
73507350
c.waitOnAllCurrent()
73517351

73527352
nsi, err := js.StreamInfo("test_stream")

server/jetstream_helpers_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2272,7 +2272,7 @@ func performStreamBackup(t *testing.T, nc *nats.Conn, streamName string) (Stream
22722272
return body.StreamConfig, body.StreamState, buf.Bytes()
22732273
}
22742274

2275-
func performStreamRestore(t *testing.T, nc *nats.Conn, sc StreamConfig, ss StreamState, archive []byte) {
2275+
func performStreamRestore(t *testing.T, nc *nats.Conn, sc StreamConfig, ss StreamState, archive []byte) bool {
22762276
t.Helper()
22772277
endpoint := fmt.Sprintf(JSApiStreamRestoreT, sc.Name)
22782278

@@ -2295,6 +2295,9 @@ func performStreamRestore(t *testing.T, nc *nats.Conn, sc StreamConfig, ss Strea
22952295
DeliverSubject string `json:"deliver_subject"`
22962296
}
22972297
require_NoError(t, json.Unmarshal(resp.Data, &res))
2298+
if !IsValidLiteralSubject(res.DeliverSubject) {
2299+
return false
2300+
}
22982301

22992302
reader := bytes.NewReader(archive)
23002303
buf := make([]byte, 128*1024)
@@ -2311,4 +2314,5 @@ func performStreamRestore(t *testing.T, nc *nats.Conn, sc StreamConfig, ss Strea
23112314

23122315
_, err = nc.Request(res.DeliverSubject, nil, time.Second)
23132316
require_NoError(t, err)
2317+
return true
23142318
}

0 commit comments

Comments
 (0)