From 1bf8159100be9bada7b7c1484f93904e70ac717b Mon Sep 17 00:00:00 2001 From: Robin Fehr Date: Mon, 11 Jul 2022 18:31:03 +0200 Subject: [PATCH] cooperative incremental rebalance --- docker-compose.yml | 72 +++++-- e2e/both.spec.js | 4 +- e2e/consumer.spec.js | 351 ++++++++++++++++++++++++++++++- e2e/producer-transaction.spec.js | 8 +- index.d.ts | 6 + lib/client.js | 4 +- lib/error.js | 6 +- lib/index.js | 2 +- lib/kafka-consumer.js | 134 +++++++++--- run_docker.sh | 4 +- src/kafka-consumer.cc | 202 +++++++++++++++++- src/kafka-consumer.h | 8 +- test/consumer.spec.js | 2 +- test/kafka-consumer.spec.js | 3 +- 14 files changed, 733 insertions(+), 73 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index abe29df2..8a12f135 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,23 +1,51 @@ --- -zookeeper: - image: confluentinc/cp-zookeeper - ports: - - "2181:2181" - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 -kafka: - image: confluentinc/cp-kafka - links: - - zookeeper - ports: - - "9092:9092" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' - KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092' - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_DEFAULT_REPLICATION_FACTOR: 1 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper + ports: + - "2181:2181" + networks: + - localnet + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + kafka: + image: confluentinc/cp-kafka + ports: + - 9092:9092 + - 9997:9997 + networks: + - localnet + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 + # KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_DEFAULT_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8080:8080 + networks: + - localnet + depends_on: + - zookeeper + - kafka + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181 +networks: + localnet: + attachable: true + diff --git a/e2e/both.spec.js b/e2e/both.spec.js index a8289ec3..47beb32d 100644 --- a/e2e/both.spec.js +++ b/e2e/both.spec.js @@ -228,7 +228,7 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); - }, 500) + }, 500); consumer.setDefaultConsumeTimeout(2000); consumer.consume(1000, function(err, messages) { t.ifError(err); @@ -261,7 +261,7 @@ describe('Consumer/Producer', function() { setTimeout(function() { producer.produce(topic, null, buffer, null); - }, 2000) + }, 2000); consumer.setDefaultConsumeTimeout(3000); consumer.consume(1000, function(err, messages) { t.ifError(err); diff --git a/e2e/consumer.spec.js b/e2e/consumer.spec.js index a167483f..38fcfd74 100644 --- a/e2e/consumer.spec.js +++ b/e2e/consumer.spec.js @@ -11,10 +11,12 @@ var crypto = require('crypto'); var eventListener = 
require('./listener'); +var cooperativeRebalanceCallback = require('../lib/kafka-consumer').cooperativeRebalanceCallback; var KafkaConsumer = require('../').KafkaConsumer; +var AdminClient = require('../').AdminClient; +var LibrdKafkaError = require('../lib/error'); var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092'; -var topic = 'test'; describe('Consumer', function() { var gcfg; @@ -31,6 +33,7 @@ describe('Consumer', function() { }); describe('commit', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -61,6 +64,7 @@ describe('Consumer', function() { }); describe('committed and position', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -95,6 +99,7 @@ describe('Consumer', function() { }); it('after assign, should get committed array without offsets ', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); // Defer this for a second setTimeout(function() { @@ -110,6 +115,7 @@ describe('Consumer', function() { }); it('after assign and commit, should get committed offsets', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); consumer.commitSync({topic:topic, partition:0, offset:1000}); consumer.committed(null, 1000, function(err, committed) { @@ -123,6 +129,7 @@ describe('Consumer', function() { }); it('after assign, before consume, position should return an array without offsets', function(done) { + var topic = 'test'; consumer.assign([{topic:topic, partition:0}]); var position = consumer.position(); t.equal(Array.isArray(position), true, 'Position should be an array'); @@ -147,6 +154,7 @@ describe('Consumer', function() { }); describe('seek and positioning', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -195,6 +203,7 @@ describe('Consumer', function() { describe('subscribe', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -232,6 +241,7 @@ describe('Consumer', function() { describe('assign', function() { + var topic = 'test'; var consumer; beforeEach(function(done) { consumer = new KafkaConsumer(gcfg, {}); @@ -266,7 +276,346 @@ describe('Consumer', function() { }); }); + describe('assignmentLost', function() { + function pollForTopic(client, topicName, maxTries, tryDelay, cb, customCondition) { + var tries = 0; + + function getTopicIfExists(innerCb) { + client.getMetadata({ + topic: topicName, + }, function(metadataErr, metadata) { + if (metadataErr) { + cb(metadataErr); + return; + } + + var topicFound = metadata.topics.filter(function(topicObj) { + var foundTopic = topicObj.name === topicName; + + // If we have a custom condition for "foundedness", do it here after + // we make sure we are operating on the correct topic + if (foundTopic && customCondition) { + return customCondition(topicObj); + } + return foundTopic; + }); + + if (topicFound.length >= 1) { + innerCb(null, topicFound[0]); + return; + } + + innerCb(new Error('Could not find topic ' + topicName)); + }); + } + + function maybeFinish(err, obj) { + if (err) { + queueNextTry(); + return; + } + + cb(null, obj); + } + + function queueNextTry() { + tries += 1; + if (tries < maxTries) { + setTimeout(function() { + getTopicIfExists(maybeFinish); + }, tryDelay); + } else { + cb(new Error('Exceeded max tries of ' + maxTries)); + } + } + + queueNextTry(); + } + + var client = 
AdminClient.create({ + 'client.id': 'kafka-test', + 'metadata.broker.list': kafkaBrokerList + }); + var consumer1; + var consumer2; + var assignmentLostCount = 0; + var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'); + var assignment_lost_gcfg = { + 'bootstrap.servers': kafkaBrokerList, + 'group.id': grp, + 'debug': 'all', + 'enable.auto.commit': false, + 'session.timeout.ms': 10000, + 'heartbeat.interval.ms': 1000, + 'auto.offset.reset': 'earliest', + 'topic.metadata.refresh.interval.ms': 3000, + 'partition.assignment.strategy': 'cooperative-sticky', + 'rebalance_cb': function(err, assignment) { + if ( + err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS && + this.assignmentLost() + ) { + assignmentLostCount++; + } + cooperativeRebalanceCallback.call(this, err, assignment); + } + }; + + beforeEach(function(done) { + assignment_lost_gcfg['client.id'] = 1; + consumer1 = new KafkaConsumer(assignment_lost_gcfg, {}); + eventListener(consumer1); + consumer1.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + }); + assignment_lost_gcfg['client.id'] = 2; + consumer2 = new KafkaConsumer(assignment_lost_gcfg, {}); + eventListener(consumer2); + consumer2.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + done(); + }); + }); + + afterEach(function(done) { + consumer1.disconnect(function() { + consumer2.disconnect(function() { + done(); + }); + }); + }); + + it('should return false if not lost', function() { + t.equal(false, consumer1.assignmentLost()); + }); + + it('should be lost if topic gets deleted', function(cb) { + this.timeout(100000); + + var time = Date.now(); + var topicName = 'consumer-assignment-lost-test-topic-' + time; + var topicName2 = 'consumer-assignment-lost-test-topic2-' + time; + var deleting = false; + + client.createTopic({ + topic: topicName, + num_partitions: 2, + replication_factor: 1 + }, function(err) { + pollForTopic(consumer1, topicName, 10, 1000, function(err) { + t.ifError(err); + client.createTopic({ + topic: topicName2, + num_partitions: 2, + replication_factor: 1 + }, function(err) { + pollForTopic(consumer1, topicName2, 10, 1000, function(err) { + t.ifError(err); + consumer1.subscribe([topicName, topicName2]); + consumer2.subscribe([topicName, topicName2]); + consumer1.consume(); + consumer2.consume(); + var tryDelete = function() { + setTimeout(function() { + if(consumer1.assignments().length === 2 && + consumer2.assignments().length === 2 + ) { + client.deleteTopic(topicName, function(deleteErr) { + t.ifError(deleteErr); + }); + } else { + tryDelete(); + } + }, 2000); + }; + tryDelete(); + }); + }); + }); + }); + + var checking = false; + setInterval(function() { + if (assignmentLostCount >= 2 && !checking) { + checking = true; + t.equal(assignmentLostCount, 2); + client.deleteTopic(topicName2, function(deleteErr) { + // Cleanup topics + t.ifError(deleteErr); + cb(); + }); + } + }, 2000); + }); + + }); + + describe('incrementalAssign and incrementUnassign', function() { + + var topic = 'test7'; + var consumer; + beforeEach(function(done) { + consumer = new KafkaConsumer(gcfg, {}); + + consumer.connect({ timeout: 2000 }, function(err, info) { + t.ifError(err); + done(); + }); + + eventListener(consumer); + }); + + afterEach(function(done) { + consumer.disconnect(function() { + done(); + }); + }); + + it('should be able to assign an assignment', function() { + t.equal(0, consumer.assignments().length); + var assignments = [{ topic:topic, partition:0 }]; + consumer.assign(assignments); + t.equal(1, 
consumer.assignments().length);
+      t.equal(0, consumer.assignments()[0].partition);
+      t.equal(0, consumer.subscription().length);
+
+      var additionalAssignment = [{ topic:topic, partition:1 }];
+      consumer.incrementalAssign(additionalAssignment);
+      t.equal(2, consumer.assignments().length);
+      t.equal(0, consumer.assignments()[0].partition);
+      t.equal(1, consumer.assignments()[1].partition);
+      t.equal(0, consumer.subscription().length);
+    });
+
+    it('should be able to revoke an assignment', function() {
+      t.equal(0, consumer.assignments().length);
+      var assignments = [{ topic:topic, partition:0 }, { topic:topic, partition:1 }, { topic:topic, partition:2 }];
+      consumer.assign(assignments);
+      t.equal(3, consumer.assignments().length);
+      t.equal(0, consumer.assignments()[0].partition);
+      t.equal(1, consumer.assignments()[1].partition);
+      t.equal(2, consumer.assignments()[2].partition);
+      t.equal(0, consumer.subscription().length);
+
+      var revokedAssignments = [{ topic:topic, partition:2 }];
+      consumer.incrementalUnassign(revokedAssignments);
+      t.equal(2, consumer.assignments().length);
+      t.equal(0, consumer.assignments()[0].partition);
+      t.equal(1, consumer.assignments()[1].partition);
+      t.equal(0, consumer.subscription().length);
+    });
+
+  });
+
+  describe('rebalance', function() {
+
+    var topic = 'test7';
+    var grp = 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex');
+    var consumer1;
+    var consumer2;
+    var counter = 0;
+    var rebalance_gcfg = {
+      'bootstrap.servers': kafkaBrokerList,
+      'group.id': grp,
+      'debug': 'all',
+      'enable.auto.commit': false,
+      'heartbeat.interval.ms': 200,
+      'rebalance_cb': true
+    };
+
+    it('should be able to rebalance using the eager strategy', function(done) {
+      this.timeout(20000);
+
+      var isStarted = false;
+      rebalance_gcfg['partition.assignment.strategy'] = 'range,roundrobin';
+
+      rebalance_gcfg['client.id'] = '1';
+      consumer1 = new KafkaConsumer(rebalance_gcfg, {});
+      rebalance_gcfg['client.id'] = '2';
+      consumer2 = new KafkaConsumer(rebalance_gcfg, {});
+
+      eventListener(consumer1);
+      eventListener(consumer2);
+
+      consumer1.connect({ timeout: 2000 }, function(err, info) {
+        t.ifError(err);
+        consumer1.subscribe([topic]);
+        consumer1.on('rebalance', function(err, assignment) {
+          counter++;
+          if (!isStarted) {
+            isStarted = true;
+            consumer2.connect({ timeout: 2000 }, function(err, info) {
+              consumer2.subscribe([topic]);
+              consumer2.consume();
+              consumer2.on('rebalance', function(err, assignment) {
+                counter++;
+              });
+            });
+          }
+        });
+        consumer1.consume();
+      });
+
+      setTimeout(function() {
+        t.deepStrictEqual(consumer1.assignments(), [ { topic: topic, partition: 0, offset: -1000 } ]);
+        t.deepStrictEqual(consumer2.assignments(), [ { topic: topic, partition: 1, offset: -1000 } ]);
+        t.equal(counter, 4);
+        consumer1.disconnect(function() {
+          consumer2.disconnect(function() {
+            done();
+          });
+        });
+      }, 9000);
+    });
+
+    it('should be able to rebalance using the cooperative incremental strategy', function(cb) {
+      this.timeout(20000);
+      var isStarted = false;
+      rebalance_gcfg['partition.assignment.strategy'] = 'cooperative-sticky';
+      rebalance_gcfg['client.id'] = '1';
+      consumer1 = new KafkaConsumer(rebalance_gcfg, {});
+      rebalance_gcfg['client.id'] = '2';
+      consumer2 = new KafkaConsumer(rebalance_gcfg, {});
+
+      eventListener(consumer1);
+      eventListener(consumer2);
+
+      consumer1.connect({ timeout: 2000 }, function(err, info) {
+        t.ifError(err);
+        consumer1.subscribe([topic]);
+        consumer1.on('rebalance', function(err, assignment) {
+          if (!isStarted) {
+            isStarted = true;
+
consumer2.connect({ timeout: 2000 }, function(err, info) { + consumer2.subscribe([topic]); + consumer2.consume(); + consumer2.on('rebalance', function(err, assignment) { + counter++; + }); + }); + } + }); + consumer1.consume(); + }); + + setTimeout(function() { + t.equal(consumer1.assignments().length, 1); + t.equal(consumer2.assignments().length, 1); + t.equal(counter, 8); + + consumer1.disconnect(function() { + consumer2.disconnect(function() { + cb(); + }); + }); + }, 9000); + }); + + }); + describe('disconnect', function() { + + var topic = 'test'; var tcfg = { 'auto.offset.reset': 'earliest' }; it('should happen gracefully', function(cb) { diff --git a/e2e/producer-transaction.spec.js b/e2e/producer-transaction.spec.js index c633bc75..a8d50657 100644 --- a/e2e/producer-transaction.spec.js +++ b/e2e/producer-transaction.spec.js @@ -43,7 +43,7 @@ describe('Transactional Producer', function () { producerInput.disconnect(function (err) { consumerTrans.subscribe([topicIn]); done(err); - }) + }); } } producerInput = Kafka.Producer({ @@ -76,7 +76,7 @@ describe('Transactional Producer', function () { }); after(function (done) { - let connected = 2; + var connected = 2; function execDisconnect(client) { if (!client.isConnected) { connected--; @@ -225,7 +225,7 @@ describe('Transactional Producer', function () { return; } done(); - }) + }); } }); }); @@ -261,7 +261,7 @@ describe('Transactional Producer', function () { return; } done(); - }) + }); } else { done('Expected only B'); return; diff --git a/index.d.ts b/index.d.ts index d7ce7e61..2c7b9a3d 100644 --- a/index.d.ts +++ b/index.d.ts @@ -223,6 +223,12 @@ export class KafkaConsumer extends Client { consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void; consume(): void; + incrementalAssign(assigments: Assignment[]): this; + + incrementalUnassign(assignments: Assignment[]): this; + + assignmentLost(): boolean; + getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets; offsetsStore(topicPartitions: TopicPartitionOffset[]): any; diff --git a/lib/client.js b/lib/client.js index 9cbd3f9a..c4f1254c 100644 --- a/lib/client.js +++ b/lib/client.js @@ -62,12 +62,12 @@ function Client(globalConf, SubClientType, topicConf) { } } return obj2; - } + }; this._cb_configs = { global: extractFunctions(globalConf), topic: extractFunctions(topicConf), event: {}, - } + }; if (!no_event_cb) { this._cb_configs.event.event_cb = function(eventType, eventData) { diff --git a/lib/error.js b/lib/error.js index b63b8acf..11ded627 100644 --- a/lib/error.js +++ b/lib/error.js @@ -446,9 +446,9 @@ function LibrdKafkaError(e) { } - if (e.hasOwnProperty('isFatal')) this.isFatal = e.isFatal; - if (e.hasOwnProperty('isRetriable')) this.isRetriable = e.isRetriable; - if (e.hasOwnProperty('isTxnRequiresAbort')) this.isTxnRequiresAbort = e.isTxnRequiresAbort; + if (e.hasOwnProperty('isFatal')) { this.isFatal = e.isFatal; } + if (e.hasOwnProperty('isRetriable')) { this.isRetriable = e.isRetriable; } + if (e.hasOwnProperty('isTxnRequiresAbort')) { this.isTxnRequiresAbort = e.isTxnRequiresAbort; } } diff --git a/lib/index.js b/lib/index.js index e2e8a9c8..ba6d6782 100644 --- a/lib/index.js +++ b/lib/index.js @@ -7,7 +7,7 @@ * of the MIT license. See the LICENSE.txt file for details. 
*/ -var KafkaConsumer = require('./kafka-consumer'); +var KafkaConsumer = require('./kafka-consumer').KafkaConsumer; var Producer = require('./producer'); var HighLevelProducer = require('./producer/high-level-producer'); var error = require('./error'); diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js index be3b6390..7f0b5ee0 100644 --- a/lib/kafka-consumer.js +++ b/lib/kafka-consumer.js @@ -8,8 +8,6 @@ */ 'use strict'; -module.exports = KafkaConsumer; - var Client = require('./client'); var util = require('util'); var Kafka = require('../librdkafka'); @@ -21,6 +19,48 @@ var DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY = 500; var DEFAULT_CONSUME_TIME_OUT = 1000; util.inherits(KafkaConsumer, Client); +var eagerRebalanceCallback = function(err, assignment) { + // Create the librdkafka error + err = LibrdKafkaError.create(err); + // Emit the event + this.emit('rebalance', err, assignment); + + // That's it + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.assign(assignment); + } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) { + this.unassign(); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.isConnected()) { + this.emit('rebalance.error', e); + } + } +}; + +var cooperativeRebalanceCallback = function(err, assignment) { + // Create the librdkafka error + err = LibrdKafkaError.create(err); + // Emit the event + this.emit('rebalance', err, assignment); + + // That's it + try { + if (err.code === LibrdKafkaError.codes.ERR__ASSIGN_PARTITIONS) { + this.incrementalAssign(assignment); + } else if (err.code === LibrdKafkaError.codes.ERR__REVOKE_PARTITIONS) { + this.incrementalUnassign(assignment); + } + } catch (e) { + // Ignore exceptions if we are not connected + if (this.isConnected()) { + this.emit('rebalance.error', e); + } + } +}; + /** * KafkaConsumer class for reading messages from Kafka * @@ -51,41 +91,25 @@ function KafkaConsumer(conf, topicConf) { var self = this; // If rebalance is undefined we don't want any part of this - if (onRebalance && typeof onRebalance === 'boolean') { - conf.rebalance_cb = function(err, assignment) { - // Create the librdkafka error - err = LibrdKafkaError.create(err); - // Emit the event - self.emit('rebalance', err, assignment); - - // That's it - try { - if (err.code === -175 /*ERR__ASSIGN_PARTITIONS*/) { - self.assign(assignment); - } else if (err.code === -174 /*ERR__REVOKE_PARTITIONS*/) { - self.unassign(); - } - } catch (e) { - // Ignore exceptions if we are not connected - if (self.isConnected()) { - self.emit('rebalance.error', e); - } - } - }; - } else if (onRebalance && typeof onRebalance === 'function') { + if (onRebalance && typeof onRebalance === "boolean") { + conf.rebalance_cb = + conf["partition.assignment.strategy"] === "cooperative-sticky" ? + cooperativeRebalanceCallback.bind(this) : + eagerRebalanceCallback.bind(this); + } else if (onRebalance && typeof onRebalance === "function") { /* * Once this is opted in to, that's it. It's going to manually rebalance * forever. There is no way to unset config values in librdkafka, just * a way to override them. */ - conf.rebalance_cb = function(err, assignment) { - // Create the librdkafka error - err = err ? LibrdKafkaError.create(err) : undefined; + conf.rebalance_cb = function (err, assignment) { + // Create the librdkafka error + err = err ? 
LibrdKafkaError.create(err) : undefined; - self.emit('rebalance', err, assignment); - onRebalance.call(self, err, assignment); - }; + self.emit("rebalance", err, assignment); + onRebalance.call(self, err, assignment); + }; } // Same treatment for offset_commit_cb @@ -264,6 +288,19 @@ KafkaConsumer.prototype.assign = function(assignments) { return this; }; +/** + * Incremental assign the consumer specific partitions and topics + * + * @param {array} assignments - Assignments array. Should contain + * objects with topic and partition set. + * @return {Client} - Returns itself + */ + +KafkaConsumer.prototype.incrementalAssign = function(assignments) { + this._client.incrementalAssign(TopicPartition.map(assignments)); + return this; +}; + /** * Unassign the consumer from its assigned partitions and topics. * @@ -275,6 +312,34 @@ KafkaConsumer.prototype.unassign = function() { return this; }; +/** + * Incremental unassign the consumer from specific partitions and topics + * + * @param {array} assignments - Assignments array. Should contain + * objects with topic and partition set. + * @return {Client} - Returns itself + */ + +KafkaConsumer.prototype.incrementalUnassign = function(assignments) { + this._client.incrementalUnassign(TopicPartition.map(assignments)); + return this; +}; + +/** + * Get the assignment lost state. + * Examples for an assignment to be lost: + * - Unsuccessful heartbeats + * - Unknown member id during heartbeats + * - Illegal generation during heartbeats + * - Static consumer fenced by other consumer with same group.instance.id + * - Max. poll interval exceeded + * - Subscribed topic(s) no longer exist during meta data updates + * @return {boolean} - Returns true if the assignment is lost + */ + +KafkaConsumer.prototype.assignmentLost = function() { + return this._client.assignmentLost(); +}; /** * Get the assignments for the consumer @@ -477,7 +542,7 @@ KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) { function emitEofEventsFor(messageIndex) { while (currentEofEventsIndex < eofEvents.length && eofEvents[currentEofEventsIndex].messageIndex === messageIndex) { delete eofEvents[currentEofEventsIndex].messageIndex; - self.emit('partition.eof', eofEvents[currentEofEventsIndex]) + self.emit('partition.eof', eofEvents[currentEofEventsIndex]); ++currentEofEventsIndex; } } @@ -654,3 +719,10 @@ KafkaConsumer.prototype.pause = function(topicPartitions) { return this._errorWrap(this._client.pause(topicPartitions), true); }; + +module.exports = { + KafkaConsumer: KafkaConsumer, + eagerRebalanceCallback: eagerRebalanceCallback, + cooperativeRebalanceCallback: cooperativeRebalanceCallback +}; + diff --git a/run_docker.sh b/run_docker.sh index a6aadbd6..f817aa97 100755 --- a/run_docker.sh +++ b/run_docker.sh @@ -21,14 +21,16 @@ topics=( "test4" "test5" "test6" + "test7" ) # Run docker-compose exec to make them for topic in "${topics[@]}" do echo "Making topic $topic" + [[ "$topic" != "test7" ]] && partitions=1 || partitions=2 until docker-compose exec kafka \ - kafka-topics --create --topic $topic --partitions 1 --replication-factor 1 --if-not-exists --zookeeper zookeeper:2181 + kafka-topics --create --topic $topic --partitions $partitions --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092 do topic_result="$?" 
if [ "$topic_result" == "1" ]; then diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 29911403..c1ef5bb9 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -177,6 +177,32 @@ Baton KafkaConsumer::Assign(std::vector partitions) { return Baton(errcode); } +Baton KafkaConsumer::IncrementalAssign( + std::vector partitions) { + if (!IsConnected()) { + return Baton(RdKafka::ERR__STATE, "KafkaConsumer is disconnected"); + } + + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + + RdKafka::Error *e = consumer->incremental_assign(partitions); + + if (e) { + RdKafka::ErrorCode errcode = e->code(); + delete e; + return Baton(errcode); + } + + m_partition_cnt += partitions.size(); + for (auto i = partitions.begin(); i != partitions.end(); ++i) { + m_partitions.push_back(*i); + } + partitions.clear(); + + return Baton(RdKafka::ERR_NO_ERROR); +} + Baton KafkaConsumer::Unassign() { if (!IsClosing() && !IsConnected()) { return Baton(RdKafka::ERR__STATE); @@ -193,12 +219,46 @@ Baton KafkaConsumer::Unassign() { // Destroy the old list of partitions since we are no longer using it RdKafka::TopicPartition::destroy(m_partitions); + m_partitions.clear(); m_partition_cnt = 0; return Baton(RdKafka::ERR_NO_ERROR); } +Baton KafkaConsumer::IncrementalUnassign( + std::vector partitions) { + if (!IsClosing() && !IsConnected()) { + return Baton(RdKafka::ERR__STATE); + } + + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + + RdKafka::Error *e = consumer->incremental_unassign(partitions); + if (e) { + RdKafka::ErrorCode errcode = e->code(); + delete e; + return Baton(errcode); + } + + // Destroy the old list of partitions since we are no longer using it + RdKafka::TopicPartition::destroy(partitions); + + m_partitions.erase( + std::remove_if( + m_partitions.begin(), + m_partitions.end(), + [&partitions](RdKafka::TopicPartition *x) -> bool { + return std::find( + partitions.begin(), + partitions.end(), x) != partitions.end(); + }), + m_partitions.end()); + m_partition_cnt -= partitions.size(); + return Baton(RdKafka::ERR_NO_ERROR); +} + Baton KafkaConsumer::Commit(std::vector toppars) { if (!IsConnected()) { return Baton(RdKafka::ERR__STATE); @@ -467,6 +527,12 @@ Baton KafkaConsumer::RefreshAssignments() { } } +bool KafkaConsumer::AssignmentLost() { + RdKafka::KafkaConsumer* consumer = + dynamic_cast(m_client); + return consumer->assignment_lost(); +} + std::string KafkaConsumer::Name() { if (!IsConnected()) { return std::string(""); @@ -525,8 +591,11 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "committed", NodeCommitted); Nan::SetPrototypeMethod(tpl, "position", NodePosition); Nan::SetPrototypeMethod(tpl, "assign", NodeAssign); + Nan::SetPrototypeMethod(tpl, "incrementalAssign", NodeIncrementalAssign); Nan::SetPrototypeMethod(tpl, "unassign", NodeUnassign); - Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignments); + Nan::SetPrototypeMethod(tpl, "incrementalUnassign", NodeIncrementalUnassign); + Nan::SetPrototypeMethod(tpl, "assignments", NodeAssignment); + Nan::SetPrototypeMethod(tpl, "assignmentLost", NodeAssignmentLost); Nan::SetPrototypeMethod(tpl, "commit", NodeCommit); Nan::SetPrototypeMethod(tpl, "commitSync", NodeCommitSync); @@ -682,7 +751,7 @@ NAN_METHOD(KafkaConsumer::NodePosition) { RdKafka::TopicPartition::destroy(toppars); } -NAN_METHOD(KafkaConsumer::NodeAssignments) { +NAN_METHOD(KafkaConsumer::NodeAssignment) { Nan::HandleScope scope; KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); @@ -757,6 +826,64 @@ 
NAN_METHOD(KafkaConsumer::NodeAssign) { info.GetReturnValue().Set(Nan::True()); } +NAN_METHOD(KafkaConsumer::NodeIncrementalAssign) { + Nan::HandleScope scope; + + if (info.Length() < 1 || !info[0]->IsArray()) { + // Just throw an exception + return Nan::ThrowError("Need to specify an array of partitions"); + } + + v8::Local partitions = info[0].As(); + std::vector topic_partitions; + + for (unsigned int i = 0; i < partitions->Length(); ++i) { + v8::Local partition_obj_value; + if (!( + Nan::Get(partitions, i).ToLocal(&partition_obj_value) && + partition_obj_value->IsObject())) { + Nan::ThrowError("Must pass topic-partition objects"); + } + + v8::Local partition_obj = partition_obj_value.As(); + + // Got the object + int64_t partition = GetParameter(partition_obj, "partition", -1); + std::string topic = GetParameter(partition_obj, "topic", ""); + + if (!topic.empty()) { + RdKafka::TopicPartition* part; + + if (partition < 0) { + part = Connection::GetPartition(topic); + } else { + part = Connection::GetPartition(topic, partition); + } + + // Set the default value to offset invalid. If provided, we will not set + // the offset. + int64_t offset = GetParameter( + partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID); + if (offset != RdKafka::Topic::OFFSET_INVALID) { + part->set_offset(offset); + } + + topic_partitions.push_back(part); + } + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + + // Hand over the partitions to the consumer. + Baton b = consumer->IncrementalAssign(topic_partitions); + + if (b.err() != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(RdKafka::err2str(b.err()).c_str()); + } + + info.GetReturnValue().Set(Nan::True()); +} + NAN_METHOD(KafkaConsumer::NodeUnassign) { Nan::HandleScope scope; @@ -777,6 +904,71 @@ NAN_METHOD(KafkaConsumer::NodeUnassign) { info.GetReturnValue().Set(Nan::True()); } +NAN_METHOD(KafkaConsumer::NodeIncrementalUnassign) { + Nan::HandleScope scope; + + if (info.Length() < 1 || !info[0]->IsArray()) { + // Just throw an exception + return Nan::ThrowError("Need to specify an array of partitions"); + } + + v8::Local partitions = info[0].As(); + std::vector topic_partitions; + + for (unsigned int i = 0; i < partitions->Length(); ++i) { + v8::Local partition_obj_value; + if (!( + Nan::Get(partitions, i).ToLocal(&partition_obj_value) && + partition_obj_value->IsObject())) { + Nan::ThrowError("Must pass topic-partition objects"); + } + + v8::Local partition_obj = partition_obj_value.As(); + + // Got the object + int64_t partition = GetParameter(partition_obj, "partition", -1); + std::string topic = GetParameter(partition_obj, "topic", ""); + + if (!topic.empty()) { + RdKafka::TopicPartition* part; + + if (partition < 0) { + part = Connection::GetPartition(topic); + } else { + part = Connection::GetPartition(topic, partition); + } + + // Set the default value to offset invalid. If provided, we will not set + // the offset. + int64_t offset = GetParameter( + partition_obj, "offset", RdKafka::Topic::OFFSET_INVALID); + if (offset != RdKafka::Topic::OFFSET_INVALID) { + part->set_offset(offset); + } + + topic_partitions.push_back(part); + } + } + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + // Hand over the partitions to the consumer. 
+ Baton b = consumer->IncrementalUnassign(topic_partitions); + + if (b.err() != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(RdKafka::err2str(b.err()).c_str()); + } + + info.GetReturnValue().Set(Nan::True()); +} + +NAN_METHOD(KafkaConsumer::NodeAssignmentLost) { + Nan::HandleScope scope; + + KafkaConsumer* consumer = ObjectWrap::Unwrap(info.This()); + bool b = consumer->AssignmentLost(); + info.GetReturnValue().Set(Nan::New(b)); +} + NAN_METHOD(KafkaConsumer::NodeUnsubscribe) { Nan::HandleScope scope; @@ -1087,7 +1279,11 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) { Nan::Callback *callback = new Nan::Callback(cb); Nan::AsyncQueueWorker( - new Workers::KafkaConsumerConsumeLoop(callback, consumer, timeout_ms, retry_read_ms)); + new Workers::KafkaConsumerConsumeLoop( + callback, + consumer, + timeout_ms, + retry_read_ms)); info.GetReturnValue().Set(Nan::Null()); } diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h index 57b92904..4a9fba17 100644 --- a/src/kafka-consumer.h +++ b/src/kafka-consumer.h @@ -72,7 +72,10 @@ class KafkaConsumer : public Connection { int AssignedPartitionCount(); Baton Assign(std::vector); + Baton IncrementalAssign(std::vector); Baton Unassign(); + Baton IncrementalUnassign(std::vector); + bool AssignmentLost(); Baton Seek(const RdKafka::TopicPartition &partition, int timeout_ms); @@ -103,8 +106,11 @@ class KafkaConsumer : public Connection { static NAN_METHOD(NodeSubscribe); static NAN_METHOD(NodeDisconnect); static NAN_METHOD(NodeAssign); + static NAN_METHOD(NodeIncrementalAssign); static NAN_METHOD(NodeUnassign); - static NAN_METHOD(NodeAssignments); + static NAN_METHOD(NodeIncrementalUnassign); + static NAN_METHOD(NodeAssignmentLost); + static NAN_METHOD(NodeAssignment); static NAN_METHOD(NodeUnsubscribe); static NAN_METHOD(NodeCommit); static NAN_METHOD(NodeCommitSync); diff --git a/test/consumer.spec.js b/test/consumer.spec.js index 40b52ee4..5e1a5655 100644 --- a/test/consumer.spec.js +++ b/test/consumer.spec.js @@ -77,7 +77,7 @@ module.exports = { }); }, 'has necessary bindings for librdkafka 1:1 binding': function() { - var methods = ['assign', 'unassign', 'subscribe']; + var methods = ['assign', 'unassign', 'subscribe', 'incrementalAssign', 'incrementalUnassign', 'assignmentLost']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); }); diff --git a/test/kafka-consumer.spec.js b/test/kafka-consumer.spec.js index 0f4de520..ada72a7e 100644 --- a/test/kafka-consumer.spec.js +++ b/test/kafka-consumer.spec.js @@ -7,7 +7,8 @@ * of the MIT license. See the LICENSE.txt file for details. */ -var KafkaConsumer = require('../lib/kafka-consumer'); +var KafkaConsumer = require('../lib/kafka-consumer').KafkaConsumer; + var t = require('assert'); var client;
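
Usage note for reviewers: the sketch below shows how an application could drive the cooperative incremental rebalance API this patch adds (incrementalAssign, incrementalUnassign, assignmentLost). It is a minimal example and not part of the patch; the broker address, group id, and topic name are placeholders. The explicit rebalance_cb is optional: with 'rebalance_cb': true and 'partition.assignment.strategy': 'cooperative-sticky', the consumer constructor now installs the cooperative callback automatically, so a custom callback is only needed for extra logic such as the assignmentLost() check.

var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': 'localhost:9092',        // placeholder broker
  'group.id': 'cooperative-example-grp',           // placeholder group id
  'partition.assignment.strategy': 'cooperative-sticky',
  // Mirrors the cooperative rebalance callback, with an extra
  // assignmentLost() check before giving up partitions.
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      // Only the newly granted partitions arrive here; existing ones are kept.
      this.incrementalAssign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      if (this.assignmentLost()) {
        // Involuntary loss (e.g. missed heartbeats, deleted topic):
        // offsets for these partitions may not have been committed.
        console.warn('assignment lost involuntarily');
      }
      this.incrementalUnassign(assignment);
    }
  }
}, { 'auto.offset.reset': 'earliest' });

consumer.connect();

consumer.on('ready', function() {
  consumer.subscribe(['test7']);                   // placeholder topic
  consumer.consume();                              // flowing mode
});

consumer.on('data', function(message) {
  console.log(message.partition, message.value.toString());
});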