diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index a8289ec3..68f13f8b 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -163,7 +163,7 @@ describe('Consumer/Producer', function() {
       });
     });
 
-    
+
     it('should return ready messages on partition EOF', function(done) {
       crypto.randomBytes(4096, function(ex, buffer) {
         producer.setPollInterval(10);
@@ -221,7 +221,7 @@ describe('Consumer/Producer', function() {
      consumer.once('data', function(msg) {
        events.push("data");
      });
-      
+
      consumer.once('partition.eof', function(eof) {
        events.push("partition.eof");
      });
@@ -272,6 +272,100 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  it('should fill a batch when there is no total consume timeout set', function(done) {
+    crypto.randomBytes(4096, function(ex, buffer) {
+      producer.setPollInterval(10);
+
+      producer.once('delivery-report', function(err, report) {
+        t.ifError(err);
+      });
+
+      consumer.subscribe([topic]);
+
+      var events = [];
+
+      consumer.once('data', function(msg) {
+        events.push("data");
+      });
+
+      consumer.on('partition.eof', function(eof) {
+        events.push("partition.eof");
+      });
+
+      // Produce 10 messages, 500ms apart; the whole batch should fill, and
+      // the consume call should take roughly 4.5 seconds (nine 500ms gaps
+      // after the first message), comfortably under 5 seconds.
+
+      let timeoutId;
+      let toProduce = 10;
+      const produceLoop = () => {
+        producer.produce(topic, null, buffer, null);
+        if (--toProduce > 0) {
+          timeoutId = setTimeout(produceLoop, 500);
+        }
+      };
+      produceLoop();
+
+      consumer.setDefaultConsumeTimeout(1000);
+      const started = Date.now();
+      consumer.consume(10, function(err, messages) {
+        t.ifError(err);
+        t(messages.length >= 10, 'Too few messages consumed within batch');
+        t(Date.now() - started < 5000, 'Consume took too long; the batch should have filled within 5 seconds');
+        clearTimeout(timeoutId);
+        done();
+      });
+    });
+  });
+
+  it('should not fill a batch when there is a total consume timeout set', function(done) {
+    crypto.randomBytes(4096, function(ex, buffer) {
+      producer.setPollInterval(10);
+
+      producer.once('delivery-report', function(err, report) {
+        t.ifError(err);
+      });
+
+      consumer.subscribe([topic]);
+
+      var events = [];
+
+      consumer.once('data', function(msg) {
+        events.push("data");
+      });
+
+      consumer.on('partition.eof', function(eof) {
+        events.push("partition.eof");
+      });
+
+      // Produce 20 messages, 900ms apart; the whole batch should *not*
+      // fill. The 10-second total timeout should cut the batch off at 13
+      // messages (see the assertion below).
+
+      let timeoutId;
+      let toProduce = 20;
+      const produceLoop = () => {
+        producer.produce(topic, null, buffer, null);
+        if (--toProduce > 0) {
+          timeoutId = setTimeout(produceLoop, 900);
+        }
+      };
+      produceLoop();
+
+      consumer.setDefaultConsumeTimeout(1000);
+      consumer.setDefaultTotalConsumeTimeout(10000);
+      const started = Date.now();
+      consumer.consume(100, function(err, messages) {
+        t.ifError(err);
+        // Why 13? The first message is produced immediately, then 11 more
+        // are produced over the next 9,900ms (11 * 900 = 9,900 < 10,000),
+        // and a 13th is produced and consumed in the "final" iteration,
+        // where the elapsed time first exceeds 10,000ms.
+        t(messages.length === 13, 'Batch should have consumed 13 messages, instead consumed ' + messages.length + ' messages');
+        t((Date.now() - started) < (10000 + 1000), 'Consume took ' + (Date.now() - started) + 'ms, longer than the total timeout plus the single-message timeout of ' + (10000 + 1000) + 'ms');
+        clearTimeout(timeoutId);
+        done();
+      });
+    });
+  });
+
   it('should be able to produce and consume messages: consumeLoop', function(done) {
     var key = 'key';
 
@@ -409,7 +503,7 @@ describe('Consumer/Producer', function() {
     ];
     run_headers_test(done, headers);
   });
-  
+
   it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) {
     var headers = [
       { key1: 'value1' },
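A note for reviewers on the timing math in the second test: the sketch below is a standalone simulation (plain Node, no Kafka or broker required; `simulateBatch` is a hypothetical helper, not part of the patch) that replays the worker's cutoff rule — consume one message per interval, stop once elapsed time first exceeds the total timeout, but keep the message that triggered the check.

```js
// Standalone simulation of the "why 13" arithmetic. Nothing here touches
// Kafka; it only replays the timing assumptions of the test above.
function simulateBatch(intervalMs, totalTimeoutMs, maxMessages) {
  var consumed = 0;
  var elapsed = 0; // arrival (and consume) time of the current message

  while (consumed < maxMessages) {
    consumed++; // the message consumed at `elapsed` is always kept
    if (totalTimeoutMs > 0 && elapsed > totalTimeoutMs) {
      break; // the elapsed-time check fires after the message is processed
    }
    elapsed += intervalMs; // the next message arrives one interval later
  }
  return consumed;
}

// First message at t=0, then one every 900ms: the 13th arrives at t=10,800,
// the first point where elapsed time exceeds the 10,000ms total timeout.
console.log(simulateBatch(900, 10000, 100)); // => 13
console.log(simulateBatch(500, 0, 10));      // => 10 (no cap; batch fills)
```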
diff --git a/index.d.ts b/index.d.ts
index d7ce7e61..a13225ad 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -220,7 +220,7 @@ export class KafkaConsumer extends Client {
   committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
 
   consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
-  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
+  consume(cb: (err: LibrdKafkaError, message: Message) => void): void;
   consume(): void;
 
   getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
@@ -237,6 +237,8 @@ export class KafkaConsumer extends Client {
 
   setDefaultConsumeTimeout(timeoutMs: number): void;
 
+  setDefaultTotalConsumeTimeout(totalTimeoutMs: number): void;
+
   setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
 
   subscribe(topics: SubscribeTopicList): this;
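Since the declarations above are the public surface of this change, a minimal usage sketch may help. The broker address, group id, and topic name below are placeholders, and the consumer is assumed to connect successfully:

```js
var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'total-timeout-example',      // placeholder
  'metadata.broker.list': 'localhost:9092'  // placeholder
}, {});

consumer.connect();

consumer.on('ready', function() {
  consumer.subscribe(['test']);             // placeholder topic

  // Wait up to 1 second for each individual message...
  consumer.setDefaultConsumeTimeout(1000);
  // ...but cap the whole batch at roughly 10 seconds of wall-clock time.
  consumer.setDefaultTotalConsumeTimeout(10000);

  consumer.consume(100, function(err, messages) {
    if (err) {
      console.error(err);
    } else {
      // Anywhere from 0 to 100 messages, depending on what arrived in time.
      console.log('consumed ' + messages.length + ' messages');
    }
    consumer.disconnect();
  });
});
```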
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index c479240f..4faad5c7 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -132,6 +132,7 @@ function KafkaConsumer(conf, topicConf) {
   this.topicConfig = topicConf;
 
   this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
+  this._consumeTotalTimeout = undefined;
   this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
 }
 
@@ -143,6 +144,15 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
   this._consumeTimeout = timeoutMs;
 };
 
+/**
+ * Set the default total consume operation timeout provided to c++land
+ * @param {number} totalTimeoutMs - maximum number of milliseconds to allow a
+ * batch consume operation to run
+ */
+KafkaConsumer.prototype.setDefaultTotalConsumeTimeout = function(totalTimeoutMs) {
+  this._consumeTotalTimeout = totalTimeoutMs;
+};
+
 /**
  * Set the default sleep delay for the next consume loop after the previous one has timed out.
  * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
@@ -386,19 +396,20 @@ KafkaConsumer.prototype.unsubscribe = function() {
 */
 KafkaConsumer.prototype.consume = function(number, cb) {
   var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
-  var self = this;
 
-  if ((number && typeof number === 'number') || (number && cb)) {
+  // Undefined is OK here: the default is to leave the consume call's total
+  // time unbounded, so existing callers see no change in behavior.
+  var totalTimeoutMs = this._consumeTotalTimeout;
 
+  if ((number && typeof number === 'number') || (number && cb)) {
     if (cb === undefined) {
       cb = function() {};
     } else if (typeof cb !== 'function') {
       throw new TypeError('Callback must be a function');
     }
 
-    this._consumeNum(timeoutMs, number, cb);
+    this._consumeNum(timeoutMs, totalTimeoutMs, number, cb);
   } else {
-
     // See https://github.com/Blizzard/node-rdkafka/issues/220
     // Docs specify just a callback can be provided but really we needed
     // a fallback to the number argument
@@ -426,6 +437,7 @@ KafkaConsumer.prototype.consume = function(number, cb) {
 KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
   var self = this;
   var retryReadInterval = this._consumeLoopTimeoutDelay;
+
   self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent, warning) {
 
     if (err) {
@@ -460,10 +472,10 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
  * @private
  * @see consume
 */
-KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
+KafkaConsumer.prototype._consumeNum = function(timeoutMs, totalTimeoutMs, numMessages, cb) {
   var self = this;
 
-  this._client.consume(timeoutMs, numMessages, function(err, messages, eofEvents) {
+  this._client.consume(timeoutMs, totalTimeoutMs, numMessages, function(err, messages, eofEvents) {
     if (err) {
       err = LibrdKafkaError.create(err);
       if (cb) {
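One behavioral point worth calling out from the change above: the total timeout is opt-in, and because the worker only honors values greater than zero, it can also be switched off again. A sketch, reusing the connected `consumer` from the earlier example (error handling elided):

```js
// The total timeout is opt-in. Never calling the setter, or passing a value
// <= 0, keeps the pre-patch behavior of waiting until the batch fills or
// each per-message timeout expires.
consumer.setDefaultTotalConsumeTimeout(10000); // cap batches at ~10s
consumer.consume(100, function(err, messages) {
  console.log('bounded batch:', messages.length);

  consumer.setDefaultTotalConsumeTimeout(0);   // disable the cap again
  consumer.consume(100, function(err2, messages2) {
    console.log('unbounded batch:', messages2.length);
  });
});
```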
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 019b0cb6..4f971c1f 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -1049,7 +1049,7 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
 
   if (info.Length() < 3) {
     // Just throw an exception
-    return Nan::ThrowError("Invalid number of parameters");
+    return Nan::ThrowError("Invalid number of parameters, expected 3");
   }
 
   if (!info[0]->IsNumber()) {
@@ -1106,11 +1106,12 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
 NAN_METHOD(KafkaConsumer::NodeConsume) {
   Nan::HandleScope scope;
 
-  if (info.Length() < 2) {
-    // Just throw an exception
-    return Nan::ThrowError("Invalid number of parameters");
+  if (info.Length() != 4) {
+    // Just throw an exception: wrong number of arguments
+    return Nan::ThrowError("Invalid number of arguments, expected 4");
   }
 
+  // The first argument should always be the timeout.
   int timeout_ms;
   Nan::Maybe<uint32_t> maybeTimeout =
     Nan::To<uint32_t>(info[0].As<v8::Number>());
@@ -1121,40 +1122,36 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
     timeout_ms = static_cast<int>(maybeTimeout.FromJust());
   }
 
-  if (info[1]->IsNumber()) {
-    if (!info[2]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
-    }
-
-    v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
-    Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
-
-    uint32_t numMessages;
-    if (numMessagesMaybe.IsNothing()) {
-      return Nan::ThrowError("Parameter must be a number over 0");
-    } else {
-      numMessages = numMessagesMaybe.FromJust();
-    }
-
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
-
-    v8::Local<v8::Function> cb = info[2].As<v8::Function>();
-    Nan::Callback *callback = new Nan::Callback(cb);
-    Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms));  // NOLINT
-
+  // Parse the total timeout: the maximum wall-clock time the whole batch is
+  // allowed to take. -1 (the default when nothing usable is passed) means
+  // no upper bound.
+  int total_timeout_ms;
+  Nan::Maybe<uint32_t> maybeTotalTimeout =
+    Nan::To<uint32_t>(info[1].As<v8::Number>());
+  if (maybeTotalTimeout.IsNothing()) {
+    total_timeout_ms = -1;
   } else {
-    if (!info[1]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
-    }
+    total_timeout_ms = static_cast<int>(maybeTotalTimeout.FromJust());
+  }
 
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  // Parse the # of messages to read
+  v8::Local<v8::Number> numMessagesNumber = info[2].As<v8::Number>();
+  Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
+  uint32_t numMessages;
+  if (numMessagesMaybe.IsNothing()) {
+    return Nan::ThrowError("Number of messages to consume must be a number over 0");
+  } else {
+    numMessages = numMessagesMaybe.FromJust();
+  }
 
-    v8::Local<v8::Function> cb = info[1].As<v8::Function>();
-    Nan::Callback *callback = new Nan::Callback(cb);
-    Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsume(callback, consumer, timeout_ms));
+  // Check that the callback is configured properly
+  if (!info[3]->IsFunction()) {
+    return Nan::ThrowError("Need to specify a callback");
   }
+  v8::Local<v8::Function> cb = info[3].As<v8::Function>();
+  Nan::Callback *callback = new Nan::Callback(cb);
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  Nan::AsyncQueueWorker(
+    new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, total_timeout_ms));  // NOLINT
 
   info.GetReturnValue().Set(Nan::Null());
 }
@@ -1195,7 +1192,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
     // cleanup the async worker
     consumeLoop->WorkComplete();
     consumeLoop->Destroy();
-    
+
     consumer->m_consume_loop = nullptr;
   }
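For reviewers tracing the JS-to-native boundary: after this change the binding accepts exactly four arguments, matching the single call site in lib/kafka-consumer.js above. The sketch below is illustrative only (the raw `_client` handle is internal, not public API, and the values are placeholders):

```js
// Illustrative shape of the one remaining native call (internal API):
//   consume(timeoutMs, totalTimeoutMs, numMessages, callback)
var timeoutMs = 1000;       // per-message timeout
var totalTimeoutMs = 10000; // wall-clock cap for the whole batch
var numMessages = 100;      // batch size

consumer._client.consume(timeoutMs, totalTimeoutMs, numMessages,
  function(err, messages, eofEvents) {
    // messages:  array of consumed message objects
    // eofEvents: partition.eof descriptors, each carrying the messageIndex
    //            at which it should be emitted
  });
```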
diff --git a/src/workers.cc b/src/workers.cc
index 55d3dd50..1ddfe441 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -17,11 +17,30 @@
 #else
 // Windows specific
 #include <time.h>
+#include <windows.h>
 #endif
 
 using NodeKafka::Producer;
 using NodeKafka::Connection;
 
+#ifndef _WIN32
+uint64_t get_millisecond_timestamp() {
+  // Get a precise reading from the monotonic clock, which is immune to
+  // system clock adjustments.
+  struct timespec ts;
+  clock_gettime(CLOCK_MONOTONIC, &ts);
+
+  // Convert it to milliseconds: multiply the seconds component by 1e3 and
+  // divide the nanoseconds component by 1e6.
+  return (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
+}
+#else
+uint64_t get_millisecond_timestamp() {
+  // Just call GetTickCount64. We could use QueryPerformanceCounter, but we
+  // don't need that level of precision.
+  return GetTickCount64();
+}
+#endif
+
 namespace NodeKafka {
 namespace Workers {
 
@@ -797,11 +816,13 @@ void KafkaConsumerConsumeLoop::HandleErrorCallback() {
 KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
                                      KafkaConsumer* consumer,
                                      const uint32_t & num_messages,
-                                     const int & timeout_ms) :
+                                     const int & timeout_ms,
+                                     const int & total_timeout_ms) :
   ErrorAwareWorker(callback),
   m_consumer(consumer),
   m_num_messages(num_messages),
-  m_timeout_ms(timeout_ms) {}
+  m_timeout_ms(timeout_ms),
+  m_total_timeout_ms(total_timeout_ms) {}
 
 KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
 
@@ -809,14 +830,29 @@ void KafkaConsumerConsumeNum::Execute() {
   std::size_t max = static_cast<std::size_t>(m_num_messages);
   bool looping = true;
   int timeout_ms = m_timeout_ms;
+  int total_timeout_ms = m_total_timeout_ms;
+  uint64_t start = get_millisecond_timestamp();
   std::size_t eof_event_count = 0;
 
   while (m_messages.size() - eof_event_count < max && looping) {
     // Get a message
     Baton b = m_consumer->Consume(timeout_ms);
     if (b.err() == RdKafka::ERR_NO_ERROR) {
       RdKafka::Message *message = b.data();
       RdKafka::ErrorCode errorCode = message->err();
+
+      // Check how much time has passed since we started consuming. If a
+      // total timeout is configured (values <= 0, including the -1 default,
+      // leave the old unbounded behavior in place) and it has been exceeded,
+      // stop the loop, but still process the message we just received.
+      uint64_t current = get_millisecond_timestamp();
+      uint64_t elapsed = current - start;
+      if (total_timeout_ms > 0 &&
+          elapsed > static_cast<uint64_t>(total_timeout_ms)) {
+        looping = false;
+      }
+
       switch (errorCode) {
         case RdKafka::ERR__PARTITION_EOF:
           // If partition EOF and have consumed messages, retry with timeout 1
@@ -824,7 +860,7 @@ void KafkaConsumerConsumeNum::Execute() {
           if (m_messages.size() > eof_event_count) {
             timeout_ms = 1;
           }
-          
+
           // We will only go into this code path when `enable.partition.eof` is set to true
           // In this case, consumer is also interested in EOF messages, so we return an EOF message
           m_messages.push_back(message);
@@ -872,7 +908,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
     for (std::vector<RdKafka::Message*>::iterator it = m_messages.begin();
         it != m_messages.end(); ++it) {
       RdKafka::Message* message = *it;
-      
+
       switch (message->err()) {
         case RdKafka::ERR_NO_ERROR:
           ++returnArrayIndex;
@@ -890,7 +926,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
             Nan::New<v8::Number>(message->offset()));
           Nan::Set(eofEvent, Nan::New("partition").ToLocalChecked(),
             Nan::New<v8::Number>(message->partition()));
-          
+
           // also store index at which position in the message array this event was emitted
           // this way, we can later emit it at the right point in time
           Nan::Set(eofEvent, Nan::New("messageIndex").ToLocalChecked(),
@@ -898,7 +934,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
 
           Nan::Set(eofEventsArray, eofEventsArrayIndex, eofEvent);
         }
-        
+
         delete message;
       }
     }
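The helper above deliberately reads a monotonic clock (CLOCK_MONOTONIC / GetTickCount64) rather than wall-clock time, so the elapsed-time check cannot be skewed by NTP or manual clock changes. For reviewers more at home in Node, this is the moral equivalent, using process.hrtime.bigint(), Node's monotonic timer:

```js
// Node-side equivalent of get_millisecond_timestamp(): a monotonic reading
// in milliseconds. Unlike Date.now(), it never jumps when the system clock
// is adjusted, which is exactly the property the batch cutoff needs.
function getMillisecondTimestamp() {
  return Number(process.hrtime.bigint() / 1000000n);
}

var start = getMillisecondTimestamp();
setTimeout(function() {
  var elapsedMs = getMillisecondTimestamp() - start;
  console.log('elapsed ~' + elapsedMs + 'ms'); // prints roughly 250ms
}, 250);
```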
diff --git a/src/workers.h b/src/workers.h
index d7d5ac8a..696b72d8 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -435,7 +435,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
 class KafkaConsumerConsumeNum : public ErrorAwareWorker {
  public:
   KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
-    const uint32_t &, const int &);
+    const uint32_t &, const int &, const int &);
   ~KafkaConsumerConsumeNum();
 
   void Execute();
@@ -445,6 +445,7 @@ class KafkaConsumerConsumeNum : public ErrorAwareWorker {
   NodeKafka::KafkaConsumer * m_consumer;
   const uint32_t m_num_messages;
   const int m_timeout_ms;
+  const int m_total_timeout_ms;
   std::vector<RdKafka::Message*> m_messages;
 };
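Finally, note that KafkaConsumerConsumeNum backs only the batch overload of consume(), so the new m_total_timeout_ms has no effect on flowing mode. A closing sketch of the two call shapes, again assuming the connected `consumer` from the earlier example:

```js
// Batch mode: routed through _consumeNum / KafkaConsumerConsumeNum, so both
// the per-message timeout and the new total timeout apply.
consumer.consume(100, function(err, messages) {
  // `messages` is an array, possibly cut short by the total timeout
});

// Flowing mode: routed through _consumeLoop / KafkaConsumerConsumeLoop,
// which this patch leaves untouched; only the per-message timeout applies.
consumer.consume(function(err, message) {
  // `message` is a single message, delivered once per invocation
});
```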