From fb91c206ab201592e3561e5b1c65ea119de3db9a Mon Sep 17 00:00:00 2001 From: Vesa Kauppinen Date: Tue, 4 Nov 2025 20:04:03 +0200 Subject: [PATCH] Updated project dependencies to newer versions and made required code changes to maintain compatibility. --- consumer/index.js | 50 +++++++++++++++++++++++++++++++++----------- docker-compose.yml | 32 +++++++++++++++------------- package.json | 4 ++-- producer/index.js | 52 ++++++++++++++++++++++++++++++---------------- 4 files changed, 92 insertions(+), 46 deletions(-) diff --git a/consumer/index.js b/consumer/index.js index 0409d4f..2912cf6 100644 --- a/consumer/index.js +++ b/consumer/index.js @@ -1,17 +1,43 @@ -import Kafka from 'node-rdkafka'; +import { Kafka } from 'kafkajs'; import eventType from '../eventType.js'; -var consumer = new Kafka.KafkaConsumer({ - 'group.id': 'kafka', - 'metadata.broker.list': 'localhost:9092', -}, {}); +// KafkaJS-based consumer replacing node-rdkafka +const kafka = new Kafka({ + clientId: 'node-kafka-consumer', + brokers: ['localhost:9092'], +}); + +const consumer = kafka.consumer({ groupId: 'kafka' }); + +async function start() { + try { + await consumer.connect(); + await consumer.subscribe({ topic: 'test', fromBeginning: true }); + console.log('consumer ready..'); + + await consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + try { + const decoded = eventType.fromBuffer(message.value); + console.log(`received message: ${decoded}`); + } catch (err) { + console.error('Failed to decode message', err); + } + }, + }); + } catch (err) { + console.error('Consumer startup failed', err); + process.exit(1); + } +} -consumer.connect(); +start(); -consumer.on('ready', () => { - console.log('consumer ready..') - consumer.subscribe(['test']); - consumer.consume(); -}).on('data', function(data) { - console.log(`received message: ${eventType.fromBuffer(data.value)}`); +// Graceful shutdown (optional but helpful during dev) +process.on('SIGINT', async () => { + try { + await consumer.disconnect(); + } finally { + process.exit(0); + } }); diff --git a/docker-compose.yml b/docker-compose.yml index a91803d..ab6c346 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,21 +1,25 @@ -version: "3" +version: "3.8" services: zookeeper: - image: 'bitnami/zookeeper:latest' - ports: - - '2181:2181' + image: confluentinc/cp-zookeeper:7.6.0 environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + ports: + - "2181:2181" + kafka: - image: 'bitnami/kafka:latest' - container_name: 'kafka' + image: confluentinc/cp-kafka:7.6.0 + container_name: kafka ports: - - '9092:9092' - environment: - - KAFKA_BROKER_ID=1 - - KAFKA_LISTENERS=PLAINTEXT://:9092 - - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - - ALLOW_PLAINTEXT_LISTENER=yes + - "9092:9092" depends_on: - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 \ No newline at end of file diff --git a/package.json b/package.json index 4e7c712..c9ff8da 100644 --- a/package.json +++ b/package.json @@ -11,7 +11,7 @@ "author": "", "license": "ISC", "dependencies": { - "avsc": "^5.6.2", - "node-rdkafka": "^2.10.1" + "kafkajs": "^2.2.4", + "avsc": "^5.7.9" } } diff --git a/producer/index.js b/producer/index.js index 299c1d1..a8fdd64 100644 --- a/producer/index.js +++ b/producer/index.js @@ -1,26 +1,29 @@ -import Kafka from 'node-rdkafka'; +import { Kafka } from 'kafkajs'; import eventType from '../eventType.js'; -const stream = Kafka.Producer.createWriteStream({ - 'metadata.broker.list': 'localhost:9092' -}, {}, { - topic: 'test' +// KafkaJS-based producer replacing node-rdkafka +const kafka = new Kafka({ + clientId: 'node-kafka-producer', + brokers: ['localhost:9092'], }); -stream.on('error', (err) => { - console.error('Error in our kafka stream'); - console.error(err); -}); +const producer = kafka.producer(); -function queueRandomMessage() { +async function sendRandomMessage() { const category = getRandomAnimal(); const noise = getRandomNoise(category); const event = { category, noise }; - const success = stream.write(eventType.toBuffer(event)); - if (success) { - console.log(`message queued (${JSON.stringify(event)})`); - } else { - console.log('Too many messages in the queue already..'); + + try { + await producer.send({ + topic: 'test', + messages: [ + { value: eventType.toBuffer(event) } + ], + }); + console.log(`message sent (${JSON.stringify(event)})`); + } catch (error) { + console.error('Failed to send message', error); } } @@ -41,6 +44,19 @@ function getRandomNoise(animal) { } } -setInterval(() => { - queueRandomMessage(); -}, 3000); +async function start() { + await producer.connect(); + console.log('producer ready..'); + setInterval(sendRandomMessage, 3000); +} + +start(); + +// Graceful shutdown +process.on('SIGINT', async () => { + try { + await producer.disconnect(); + } finally { + process.exit(0); + } +});