-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproducer.js
41 lines (37 loc) · 1.07 KB
/
producer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const { kafka } = require("./config/kakfaClient");
const readline = require("readline");
// Line-oriented console interface: reads from stdin and echoes prompts to stdout.
const terminalIo = { input: process.stdin, output: process.stdout };
const rl = readline.createInterface(terminalIo);
/**
 * Producer task: connect a Kafka producer, then publish one message to the
 * "rider-updated" topic for every "<name> <location>" line read from the
 * module-level readline interface `rl`.
 *
 * A location of "south" (case-insensitive) is sent to partition 1; any other
 * location is sent to partition 0. Lines missing a name or a location are
 * ignored and the prompt is shown again. When the interface closes, sends that
 * are still in flight are allowed to settle before the producer disconnects.
 *
 * @returns {Promise<void>} Resolves once the producer is connected and the
 *   readline listeners are registered; rejects if creating or connecting the
 *   producer fails.
 */
async function init() {
  const producer = kafka.producer();

  console.log("Producer Connecting ...");
  await producer.connect();
  console.log("Producer Connected!");

  // Sends that have been started but not yet settled; drained on close so a
  // message typed (or piped) just before EOF is not cut off by the disconnect.
  const inFlight = new Set();
  let isClosed = false;

  // Prompting a closed interface is pointless and may throw on newer runtimes.
  const promptIfOpen = () => {
    if (!isClosed) {
      rl.prompt();
    }
  };

  rl.setPrompt("Enter Name and Location > ");
  rl.prompt();

  rl.on("line", async (line) => {
    // Split on runs of whitespace so leading, trailing, or repeated spaces do
    // not produce empty fields. Tokens after the second one are ignored.
    const [name, location] = line.trim().split(/\s+/);
    if (!name || !location) {
      return rl.prompt();
    }

    // The emitter discards the promise returned by this listener, so a failed
    // send must be handled here or it surfaces as an unhandled rejection.
    let sending;
    try {
      sending = producer.send({
        topic: "rider-updated",
        messages: [
          {
            partition: location.toLowerCase() === "south" ? 1 : 0,
            key: "location-updated",
            value: JSON.stringify({ name, location }),
          },
        ],
      });
      inFlight.add(sending);
      await sending;
      console.log("Messages Sent!");
    } catch (err) {
      console.error("Failed to send message:", err);
    } finally {
      inFlight.delete(sending);
    }

    // Show the prompt again so the user can enter the next update.
    return promptIfOpen();
  }).on("close", async () => {
    isClosed = true;

    // Wait for outstanding sends (successful or not) before tearing down.
    await Promise.allSettled(inFlight);

    try {
      await producer.disconnect();
      console.log("Producer Disconnected!");
    } catch (err) {
      console.error("Failed to disconnect producer:", err);
    }
  });
}
// Start the producer. If startup fails (for example, the broker cannot be
// reached), log the error, close the readline interface so its open stdin
// handle does not keep the process alive, and signal failure via the exit code.
init().catch((err) => {
  console.error(err);
  rl.close();
  process.exitCode = 1;
});