Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions consumer/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,43 @@
import Kafka from 'node-rdkafka';
import { Kafka } from 'kafkajs';
import eventType from '../eventType.js';

var consumer = new Kafka.KafkaConsumer({
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
}, {});
// KafkaJS-based consumer replacing node-rdkafka
const kafka = new Kafka({
clientId: 'node-kafka-consumer',
brokers: ['localhost:9092'],
});

const consumer = kafka.consumer({ groupId: 'kafka' });

async function start() {
try {
await consumer.connect();
await consumer.subscribe({ topic: 'test', fromBeginning: true });
console.log('consumer ready..');

await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
try {
const decoded = eventType.fromBuffer(message.value);
console.log(`received message: ${decoded}`);
} catch (err) {
console.error('Failed to decode message', err);
}
},
});
} catch (err) {
console.error('Consumer startup failed', err);
process.exit(1);
}
}

consumer.connect();
start();

consumer.on('ready', () => {
console.log('consumer ready..')
consumer.subscribe(['test']);
consumer.consume();
}).on('data', function(data) {
console.log(`received message: ${eventType.fromBuffer(data.value)}`);
// Graceful shutdown (optional but helpful during dev)
process.on('SIGINT', async () => {
try {
await consumer.disconnect();
} finally {
process.exit(0);
}
});
32 changes: 18 additions & 14 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
version: "3"
version: "3.8"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
image: confluentinc/cp-zookeeper:7.6.0
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"

kafka:
image: 'bitnami/kafka:latest'
container_name: 'kafka'
image: confluentinc/cp-kafka:7.6.0
container_name: kafka
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_LISTENERS=PLAINTEXT://:9092
- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
"author": "",
"license": "ISC",
"dependencies": {
"avsc": "^5.6.2",
"node-rdkafka": "^2.10.1"
"kafkajs": "^2.2.4",
"avsc": "^5.7.9"
}
}
52 changes: 34 additions & 18 deletions producer/index.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
import Kafka from 'node-rdkafka';
import { Kafka } from 'kafkajs';
import eventType from '../eventType.js';

const stream = Kafka.Producer.createWriteStream({
'metadata.broker.list': 'localhost:9092'
}, {}, {
topic: 'test'
// KafkaJS-based producer replacing node-rdkafka
const kafka = new Kafka({
clientId: 'node-kafka-producer',
brokers: ['localhost:9092'],
});

stream.on('error', (err) => {
console.error('Error in our kafka stream');
console.error(err);
});
const producer = kafka.producer();

function queueRandomMessage() {
async function sendRandomMessage() {
const category = getRandomAnimal();
const noise = getRandomNoise(category);
const event = { category, noise };
const success = stream.write(eventType.toBuffer(event));
if (success) {
console.log(`message queued (${JSON.stringify(event)})`);
} else {
console.log('Too many messages in the queue already..');

try {
await producer.send({
topic: 'test',
messages: [
{ value: eventType.toBuffer(event) }
],
});
console.log(`message sent (${JSON.stringify(event)})`);
} catch (error) {
console.error('Failed to send message', error);
}
}

Expand All @@ -41,6 +44,19 @@ function getRandomNoise(animal) {
}
}

setInterval(() => {
queueRandomMessage();
}, 3000);
async function start() {
await producer.connect();
console.log('producer ready..');
setInterval(sendRandomMessage, 3000);
}

start();

// Graceful shutdown
process.on('SIGINT', async () => {
try {
await producer.disconnect();
} finally {
process.exit(0);
}
});