Skip to content

Commit 1a88f79

Browse files
committed
[FIX] migrated 06_heartbeats.js to use the new event name as per jetstream migration docs
[CHANGE] changed the examples to be plain javascript, as this makes it easier to use out of the box. Signed-off-by: Alberto Ricart <[email protected]>
1 parent dc70480 commit 1a88f79

14 files changed

+70
-46
lines changed

jetstream/README.md

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -623,16 +623,17 @@ while (true) {
623623
// watch the to see if the consume operation misses heartbeats
624624
(async () => {
625625
for await (const s of await messages.status()) {
626-
if (s.type === ConsumerEvents.HeartbeatsMissed) {
627-
// you can decide how many heartbeats you are willing to miss
628-
const n = s.data as number;
629-
console.log(`${n} heartbeats missed`);
630-
if (n === 2) {
631-
// by calling `stop()` the message processing loop ends
632-
// in this case this is wrapped by a loop, so it attempts
633-
// to re-setup the consume
634-
messages.stop();
635-
}
626+
switch (s.type) {
627+
case "heartbeats_missed":
628+
// you can decide how many heartbeats you are willing to miss
629+
const n = s.data as number;
630+
console.log(`${s.count} heartbeats missed`);
631+
if (s.count === 2) {
632+
// by calling `stop()` the message processing loop ends
633+
// in this case this is wrapped by a loop, so it attempts
634+
// to re-setup the consume
635+
messages.stop();
636+
}
636637
}
637638
}
638639
})();
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
@@ -39,6 +39,6 @@ while (true) {
3939
m.ack();
4040
}
4141
} catch (err) {
42-
console.log(`consume failed: ${(err as Error).message}`);
42+
console.log(`consume failed: ${err.message}`);
4343
}
4444
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import { connect } from "@nats-io/transport-deno";
1717
import { jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
2121
const nc = await connect();
Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414
*/
1515

1616
import { connect } from "@nats-io/transport-deno";
17-
import { ConsumerEvents, jetstream } from "@nats-io/jetstream";
18-
import { setupStreamAndConsumer } from "./util.ts";
17+
import { jetstream } from "@nats-io/jetstream";
18+
import { setupStreamAndConsumer } from "./util.js";
1919

2020
// create a connection
21-
const nc = await connect();
21+
const nc = await connect({ debug: true });
2222

2323
// create a stream with a random name with some messages and a consumer
2424
const { stream, consumer } = await setupStreamAndConsumer(nc);
@@ -28,21 +28,19 @@ const js = jetstream(nc);
2828

2929
const c = await js.consumers.get(stream, consumer);
3030
while (true) {
31-
const messages = await c.consume({ max_messages: 1 });
31+
const messages = await c.consume({
32+
max_messages: 1,
33+
});
3234

3335
// watch the to see if the consume operation misses heartbeats
3436
(async () => {
3537
for await (const s of messages.status()) {
36-
if (s.type === ConsumerEvents.HeartbeatsMissed) {
37-
// you can decide how many heartbeats you are willing to miss
38-
const n = s.data as number;
39-
console.log(`${n} heartbeats missed`);
40-
if (n === 2) {
41-
// by calling `stop()` the message processing loop ends.
42-
// in this case this is wrapped by a loop, so it attempts
43-
// to re-setup the consume
44-
messages.stop();
45-
}
38+
switch (s.type) {
39+
case "heartbeats_missed":
40+
console.log(`${s.count} heartbeats missed`);
41+
if (s.count === 2) {
42+
messages.stop();
43+
}
4644
}
4745
}
4846
})();
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,7 @@
1616
import { connect, delay } from "@nats-io/transport-deno";
1717
import { SimpleMutex } from "@nats-io/nats-core/internal";
1818
import { jetstream } from "@nats-io/jetstream";
19-
import type { JsMsg } from "@nats-io/jetstream";
20-
import { setupStreamAndConsumer } from "./util.ts";
19+
import { setupStreamAndConsumer } from "./util.js";
2120

2221
// create a connection
2322
const nc = await connect();
@@ -37,7 +36,7 @@ const messages = await c.consume({ max_messages: 10 });
3736
// and then only allowing additional processing as others complete
3837
const rl = new SimpleMutex(5);
3938

40-
async function schedule(m: JsMsg): Promise<void> {
39+
async function schedule(m) {
4140
// pretend to do work
4241
await delay(1000);
4342
m.ack();
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
*/
1515

1616
import { connect } from "@nats-io/transport-deno";
17-
import { setupStreamAndConsumer } from "./util.ts";
17+
import { setupStreamAndConsumer } from "./util.js";
1818
import { jetstream } from "@nats-io/jetstream";
1919

2020
// create a connection
@@ -31,7 +31,7 @@ const c = await js.consumers.get(stream, consumer);
3131
// creating a frequency table based on the subjects found
3232
const messages = await c.consume();
3333

34-
const data = new Map<string, number>();
34+
const data = new Map();
3535
for await (const m of messages) {
3636
const chunks = m.subject.split(".");
3737
const v = data.get(chunks[1]) || 0;

jetstream/examples/js_readme_publish_examples.ts renamed to jetstream/examples/js_readme_publish_examples.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import { connect, Empty } from "@nats-io/transport-deno";
1717
import { jetstream, jetstreamManager } from "@nats-io/jetstream";
18-
import type { PubAck } from "../src/mod.ts";
1918

2019
const nc = await connect();
2120
const jsm = await jetstreamManager(nc);
@@ -52,7 +51,7 @@ pa = await js.publish("a.b", Empty, { expect: { streamName: "a" } });
5251

5352
// now if you have a stream with different subjects, you can also
5453
// assert that the last recorded sequence on subject on the stream matches
55-
const buf: Promise<PubAck>[] = [];
54+
const buf = [];
5655
for (let i = 0; i < 100; i++) {
5756
buf.push(js.publish("a.a", Empty));
5857
}

0 commit comments

Comments
 (0)