Skip to content

Commit 97a9542

Browse files
committed
[FIX] migrated 06_heartbeats.js to use the new event name as per jetstream migration docs
[CHANGE] changed the examples to be plain javascript, as this makes it easier to use out of the box. Signed-off-by: Alberto Ricart <alberto@synadia.com>
1 parent dc70480 commit 97a9542

14 files changed

+60
-38
lines changed

jetstream/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -623,11 +623,12 @@ while (true) {
623623
// watch the to see if the consume operation misses heartbeats
624624
(async () => {
625625
for await (const s of await messages.status()) {
626-
if (s.type === ConsumerEvents.HeartbeatsMissed) {
626+
switch (s.type) {
627+
case "heartbeats_missed":
627628
// you can decide how many heartbeats you are willing to miss
628629
const n = s.data as number;
629-
console.log(`${n} heartbeats missed`);
630-
if (n === 2) {
630+
console.log(`${s.count} heartbeats missed`);
631+
if (s.count === 2) {
631632
// by calling `stop()` the message processing loop ends
632633
// in this case this is wrapped by a loop, so it attempts
633634
// to re-setup the consume
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
@@ -39,6 +39,6 @@ while (true) {
3939
m.ack();
4040
}
4141
} catch (err) {
42-
console.log(`consume failed: ${(err as Error).message}`);
42+
console.log(`consume failed: ${err.message}`);
4343
}
4444
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
*/
1515

1616
import { connect } from "@nats-io/transport-deno";
17-
import { ConsumerEvents, jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
17+
import { jetstream } from "@nats-io/jetstream";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
21-
const nc = await connect();
21+
const nc = await connect({ debug: true });
2222

2323
// create a stream with a random name with some messages and a consumer
2424
const { stream, consumer } = await setupStreamAndConsumer(nc);
@@ -33,16 +33,12 @@ while (true) {
3333
// watch the to see if the consume operation misses heartbeats
3434
(async () => {
3535
for await (const s of messages.status()) {
36-
if (s.type === ConsumerEvents.HeartbeatsMissed) {
37-
// you can decide how many heartbeats you are willing to miss
38-
const n = s.data as number;
39-
console.log(`${n} heartbeats missed`);
40-
if (n === 2) {
41-
// by calling `stop()` the message processing loop ends.
42-
// in this case this is wrapped by a loop, so it attempts
43-
// to re-setup the consume
44-
messages.stop();
45-
}
36+
switch (s.type) {
37+
case "heartbeats_missed":
38+
console.log(`${s.count} heartbeats missed`);
39+
if (s.count === 2) {
40+
messages.stop();
41+
}
4642
}
4743
}
4844
})();
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
import { connect, delay } from "@nats-io/transport-deno";
1717
import { SimpleMutex } from "@nats-io/nats-core/internal";
1818
import { jetstream } from "@nats-io/jetstream";
19-
import type { JsMsg } from "@nats-io/jetstream";
20-
import { setupStreamAndConsumer } from "./util.ts";
19+
import { setupStreamAndConsumer } from "./util.js";
2120

2221
// create a connection
2322
const nc = await connect();
@@ -37,7 +36,7 @@ const messages = await c.consume({ max_messages: 10 });
3736
// and then only allowing additional processing as others complete
3837
const rl = new SimpleMutex(5);
3938

40-
async function schedule(m: JsMsg): Promise<void> {
39+
async function schedule(m) {
4140
// pretend to do work
4241
await delay(1000);
4342
m.ack();
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515

1616
import { connect } from "@nats-io/transport-deno";
17-
import { setupStreamAndConsumer } from "./util.ts";
17+
import { setupStreamAndConsumer } from "./util.js";
1818
import { jetstream } from "@nats-io/jetstream";
1919

2020
// create a connection
@@ -31,7 +31,7 @@ const c = await js.consumers.get(stream, consumer);
3131
// creating a frequency table based on the subjects found
3232
const messages = await c.consume();
3333

34-
const data = new Map<string, number>();
34+
const data = new Map();
3535
for await (const m of messages) {
3636
const chunks = m.subject.split(".");
3737
const v = data.get(chunks[1]) || 0;

jetstream/examples/js_readme_publish_examples.ts renamed to jetstream/examples/js_readme_publish_examples.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import { connect, Empty } from "@nats-io/transport-deno";
1717
import { jetstream, jetstreamManager } from "@nats-io/jetstream";
18-
import type { PubAck } from "../src/mod.ts";
1918

2019
const nc = await connect();
2120
const jsm = await jetstreamManager(nc);
@@ -52,7 +51,7 @@ pa = await js.publish("a.b", Empty, { expect: { streamName: "a" } });
5251

5352
// now if you have a stream with different subjects, you can also
5453
// assert that the last recorded sequence on subject on the stream matches
55-
const buf: Promise<PubAck>[] = [];
54+
const buf = [];
5655
for (let i = 0; i < 100; i++) {
5756
buf.push(js.publish("a.a", Empty));
5857
}

0 commit comments

Comments
 (0)