diff --git a/lib/BackbeatConsumer.js b/lib/BackbeatConsumer.js index 63bf426fe..160387399 100644 --- a/lib/BackbeatConsumer.js +++ b/lib/BackbeatConsumer.js @@ -823,10 +823,22 @@ class BackbeatConsumer extends EventEmitter { }); // ensure consumer is active when calling offsetsStore() on // it, to avoid raising an exception (invalid state) - // TODO : potential issue here, see BB-758 - if (committableOffset !== null && !this.isPaused()) { - this._consumer.offsetsStore([{ topic, partition, - offset: committableOffset }]); + if (committableOffset !== null && this._consumer.isConnected() && !this.isPaused()) { + try { + this._consumer.offsetsStore([{ topic, partition, offset: committableOffset }]); + } catch (e) { + // offsetsStore() should not throw given the guards above; + // if it does (e.g. ERR__STATE race during a rebalance or + // shutdown), log as error but do not crash — the offset will + // be re-committed after the partition is re-assigned. + this._log.error('offsetsStore failed', { + error: e.toString(), + topic, + partition, + offset: committableOffset, + groupId: this._groupId, + }); + } } } diff --git a/tests/functional/lib/BackbeatConsumer.js b/tests/functional/lib/BackbeatConsumer.js index cf4b1ba59..8d481a65f 100644 --- a/tests/functional/lib/BackbeatConsumer.js +++ b/tests/functional/lib/BackbeatConsumer.js @@ -378,6 +378,104 @@ describe('BackbeatConsumer rebalance tests', () => { } }, 1000); }).timeout(60000); + +}); + +describe('BackbeatConsumer deferred commit after rebalance', () => { + const topic = 'backbeat-consumer-spec-ERR-STATE'; + const groupId = `replication-group-${Math.random()}`; + let producer; + let consumer1; + let consumer2; + + before(function before(done) { + this.timeout(60000); + + producer = new BackbeatProducer({ + kafka: producerKafkaConf, topic, + pollIntervalMs: 100, + compressionType: 'none', + }); + consumer1 = new BackbeatConsumer({ + clientId: 'BackbeatConsumer-ERR-STATE-1', + zookeeper: zookeeperConf, + kafka: { ...consumerKafkaConf, compressionType: 'none' }, + groupId, topic, + queueProcessor: (_msg, cb) => cb(), + bootstrap: true, + }); + async.parallel([ + innerDone => producer.on('ready', innerDone), + innerDone => consumer1.on('ready', innerDone), + ], err => { + if (err) return done(err); + consumer2 = new BackbeatConsumer({ + clientId: 'BackbeatConsumer-ERR-STATE-2', + zookeeper: zookeeperConf, + kafka: { ...consumerKafkaConf, compressionType: 'none' }, + groupId, topic, + queueProcessor: (_msg, cb) => cb(), + }); + consumer2.on('ready', done); + }); + }); + + after(function after(done) { + this.timeout(10000); + async.parallel([ + innerDone => producer.close(innerDone), + innerDone => consumer1.close(innerDone), + innerDone => (consumer2 ? consumer2.close(innerDone) : innerDone()), + ], done); + }); + + it('should not crash when onEntryCommittable is called after partition revoke', done => { + let deferredEntry = null; + + // Setup: when consumer1 receives a message, complete with + // { committable: false }. This frees the processing queue + // slot but does NOT commit the offset. + consumer1._queueProcessor = (message, cb) => { + deferredEntry = message; + process.nextTick(() => cb(null, { committable: false })); + }; + + consumer2._queueProcessor = (_message, cb) => { + process.nextTick(cb); + }; + + // 1 : consumer1 subscribes and consumes the message. + consumer1.subscribe(); + producer.send([{ key: 'foo', message: '{"hello":"foo"}' }], err => { + assert.ifError(err); + }); + + // 2 : wait until consumer1 has processed the message. + // The processing queue is now idle but the + // deferred commit is still pending. + const waitForDeferred = setInterval(() => { + if (!deferredEntry) { + return; + } + clearInterval(waitForDeferred); + + // 3 : consumer2 joins the same group, triggering a + // rebalance. consumer1's revoke handler sees an idle + // queue and immediately unassigns the partition. + consumer1.once('unassign', () => { + // 4 : the external caller finishes its work and calls + // onEntryCommittable() for the now-revoked partition. + // It would crash without the try catch in the method, as + // an error ERR__STATE is returned by librdkafka when trying to commit + assert.doesNotThrow(() => { + consumer1.onEntryCommittable(deferredEntry); + }); + done(); + }); + + consumer2.subscribe(); + }, 100); + }).timeout(40000); }); describe('BackbeatConsumer concurrency tests', () => { diff --git a/tests/unit/backbeatConsumer.js b/tests/unit/backbeatConsumer.js index 5f9ed5a5d..951fad035 100644 --- a/tests/unit/backbeatConsumer.js +++ b/tests/unit/backbeatConsumer.js @@ -2,6 +2,7 @@ const assert = require('assert'); const sinon = require('sinon'); const BackbeatConsumer = require('../../lib/BackbeatConsumer'); +const { CODES } = require('node-rdkafka'); const { kafka } = require('../config.json'); const { BreakerState } = require('breakbeat').CircuitBreaker; @@ -238,6 +239,80 @@ describe('backbeatConsumer', () => { }); }); + describe('onEntryCommittable', () => { + let consumer; + let mockConsumer; + + const entry = { + topic: 'my-test-topic', + partition: 2, + offset: 280, + key: null, + timestamp: Date.now(), + }; + + beforeEach(() => { + consumer = new BackbeatConsumerMock({ + kafka, + groupId: 'unittest-group', + topic: 'my-test-topic', + }); + + mockConsumer = { + offsetsStore: sinon.stub(), + subscription: sinon.stub().returns(['my-test-topic']), + isConnected: sinon.stub().returns(true), + }; + consumer._consumer = mockConsumer; + + // pre-register the offset as consumed so onOffsetProcessed returns a value + consumer._offsetLedger.onOffsetConsumed(entry.topic, entry.partition, entry.offset); + }); + + afterEach(() => { + sinon.restore(); + }); + + it('should call offsetsStore when consumer is active and connected', () => { + consumer.onEntryCommittable(entry); + assert(mockConsumer.offsetsStore.calledOnce); + }); + + it('should not call offsetsStore when consumer is paused (unsubscribed)', () => { + mockConsumer.subscription.returns([]); + consumer.onEntryCommittable(entry); + assert(mockConsumer.offsetsStore.notCalled); + }); + + it('should not call offsetsStore when consumer is not connected', () => { + mockConsumer.isConnected.returns(false); + consumer.onEntryCommittable(entry); + assert(mockConsumer.offsetsStore.notCalled); + }); + + it('should not throw and always log at error level when offsetsStore throws', () => { + const errState = new Error('Local: Erroneous state'); + errState.code = CODES.ERRORS.ERR__STATE; + mockConsumer.offsetsStore.throws(errState); + + const errorSpy = sinon.spy(consumer._log, 'error'); + + assert.doesNotThrow(() => consumer.onEntryCommittable(entry)); + assert(errorSpy.calledOnce); + }); + + it('should not throw and log at error level when offsetsStore throws an unexpected error', () => { + const unexpectedErr = new Error('unexpected kafka error'); + unexpectedErr.code = -1; + mockConsumer.offsetsStore.throws(unexpectedErr); + + const errorSpy = sinon.spy(consumer._log, 'error'); + + assert.doesNotThrow(() => consumer.onEntryCommittable(entry)); + assert(errorSpy.calledOnce); + }); + }); + describe('_getAvailableSlotsInPipeline', () => { let consumer;