From 3938300c2e1bd99358a68e18a0c9707bcf588c67 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 15:14:03 -0500
Subject: [PATCH 1/9] workers: add ability to set a total consume operation
 timeout to the consumenum worker

the consumenum worker consumes up to a certain # of messages, and only
applies the configured timeout to a per-message consumption, breaking out
of the consume loop when:

1. the end of the partition is reached (if partition eof messages are enabled)
2. a consume times out
3. the batch size is filled

this leaves the worker susceptible to slow-loris type issues, where messages
come in at a cadence *just* under the timeout (e.g. every 900 milliseconds) --
with a batch size of 1000, that means that `consumer.consume(1000, 1000, cb)`
could take as long as 900 seconds to complete, which is less than ideal.

introduce a configuration option in the consumenum worker that allows callers
to specify a "total" timeout that is applied to the entire operation -- when
it is exceeded, we break out of the loop and return the batch, no matter how
many messages have been read.
---
 src/workers.cc | 48 ++++++++++++++++++++++++++++++++++++++++++------
 src/workers.h  |  3 ++-
 2 files changed, 44 insertions(+), 7 deletions(-)

diff --git a/src/workers.cc b/src/workers.cc
index 55d3dd50..1ddfe441 100644
--- a/src/workers.cc
+++ b/src/workers.cc
@@ -17,11 +17,30 @@
 #else
 // Windows specific
 #include
+#include
 #endif
 
 using NodeKafka::Producer;
 using NodeKafka::Connection;
 
+#ifndef _WIN32
+time_t get_millisecond_timestamp() {
+  // Get a precise timer from the monotonic clock.
+  struct timespec ts;
+  clock_gettime(CLOCK_MONOTONIC, &ts);
+
+  // Convert it to milliseconds by multiplying the seconds component by 1e3
+  // and dividing the nanoseconds component by 1e6.
+  return (ts.tv_sec * 1000 + ts.tv_nsec / 1000000);
+}
+#else
+time_t get_millisecond_timestamp() {
+  // Just call GetTickCount64. We could use QueryPerformanceCounter, but we
+  // don't need that level of precision.
+  return GetTickCount64();
+}
+#endif
+
 namespace NodeKafka {
 namespace Workers {
 
@@ -797,11 +816,13 @@ void KafkaConsumerConsumeLoop::HandleErrorCallback() {
 KafkaConsumerConsumeNum::KafkaConsumerConsumeNum(Nan::Callback *callback,
                                      KafkaConsumer* consumer,
                                      const uint32_t & num_messages,
-                                     const int & timeout_ms) :
+                                     const int & timeout_ms,
+                                     const uint & total_timeout_ms) :
   ErrorAwareWorker(callback),
   m_consumer(consumer),
   m_num_messages(num_messages),
-  m_timeout_ms(timeout_ms) {}
+  m_timeout_ms(timeout_ms),
+  m_total_timeout_ms(total_timeout_ms) {}
 
 KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {}
 
@@ -809,14 +830,29 @@ void KafkaConsumerConsumeNum::Execute() {
   std::size_t max = static_cast<std::size_t>(m_num_messages);
   bool looping = true;
   int timeout_ms = m_timeout_ms;
+  uint total_timeout_ms = m_total_timeout_ms;
+  uint64_t start = get_millisecond_timestamp();
   std::size_t eof_event_count = 0;
 
   while (m_messages.size() - eof_event_count < max && looping) {
     // Get a message
+
     Baton b = m_consumer->Consume(timeout_ms);
     if (b.err() == RdKafka::ERR_NO_ERROR) {
       RdKafka::Message *message = b.data<RdKafka::Message*>();
       RdKafka::ErrorCode errorCode = message->err();
+
+      // Check how much time has passed since we started this consume loop --
+      // if it's more than the total timeout in milliseconds (if one is
+      // configured and is greater than 0ms -- the default is -1, to avoid
+      // behavioral changes), stop the loop regardless, after processing the
+      // error code.
+      uint64_t current = get_millisecond_timestamp();
+      uint64_t elapsed = current - start;
+      if (elapsed > total_timeout_ms && total_timeout_ms > 0) {
+        looping = false;
+      }
+
       switch (errorCode) {
         case RdKafka::ERR__PARTITION_EOF:
           // If partition EOF and have consumed messages, retry with timeout 1
@@ -824,7 +860,7 @@
           if (m_messages.size() > eof_event_count) {
             timeout_ms = 1;
           }
-
+
           // We will only go into this code path when `enable.partition.eof` is set to true
           // In this case, consumer is also interested in EOF messages, so we return an EOF message
           m_messages.push_back(message);
@@ -872,7 +908,7 @@ void KafkaConsumerConsumeNum::HandleOKCallback() {
     for (std::vector<RdKafka::Message*>::iterator it = m_messages.begin();
         it != m_messages.end(); ++it) {
       RdKafka::Message* message = *it;
-
+
       switch (message->err()) {
         case RdKafka::ERR_NO_ERROR:
           ++returnArrayIndex;
@@ -890,7 +926,7 @@
             Nan::New<v8::Number>(message->offset()));
           Nan::Set(eofEvent, Nan::New("partition").ToLocalChecked(),
             Nan::New<v8::Number>(message->partition()));
-
+
           // also store index at which position in the message array this event was emitted
           // this way, we can later emit it at the right point in time
           Nan::Set(eofEvent, Nan::New("messageIndex").ToLocalChecked(),
@@ -898,7 +934,7 @@
           Nan::Set(eofEventsArray, eofEventsArrayIndex, eofEvent);
 
       }
-
+
       delete message;
     }
   }
diff --git a/src/workers.h b/src/workers.h
index d7d5ac8a..696b72d8 100644
--- a/src/workers.h
+++ b/src/workers.h
@@ -435,7 +435,7 @@ class KafkaConsumerSeek : public ErrorAwareWorker {
 class KafkaConsumerConsumeNum : public ErrorAwareWorker {
  public:
   KafkaConsumerConsumeNum(Nan::Callback*, NodeKafka::KafkaConsumer*,
-    const uint32_t &, const int &);
+    const uint32_t &, const int &, const uint &);
   ~KafkaConsumerConsumeNum();
 
   void Execute();
@@ -445,6 +445,7 @@
   NodeKafka::KafkaConsumer * m_consumer;
   const uint32_t m_num_messages;
   const int m_timeout_ms;
+  const uint m_total_timeout_ms;
   std::vector<RdKafka::Message*> m_messages;
 };
 
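editor's note: to make the slow-loris arithmetic in the commit message above
concrete, here's a rough back-of-the-envelope sketch (illustrative javascript,
not part of the patch series; the numbers mirror the example in the message):

    // per-message timeout of 1000 ms, messages arriving every 900 ms, batch of 1000:
    // no individual consume ever times out, so the only exit left is a full batch.
    const arrivalIntervalMs = 900;
    const batchSize = 1000;
    const worstCaseMs = batchSize * arrivalIntervalMs;          // 900,000 ms = 900 s

    // with a total timeout configured, the loop instead returns a partial batch
    // after roughly totalTimeoutMs, plus at most one per-message wait still in flight
    // (the elapsed check runs after each consume returns).
    const totalTimeoutMs = 10000;
    const perMessageTimeoutMs = 1000;
    const upperBoundMs = totalTimeoutMs + perMessageTimeoutMs;  // ~11,000 ms
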
From 79b4bdcef7471e97cbfabdb73cedade60f8e9b00 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 15:17:53 -0500
Subject: [PATCH 2/9] consumer: implement new total consume operation timeout
 argument in c++-land

this builds on the previous commit by:

1. implementing the total consume timeout parameter on the consume method's
   binding
2. cleaning up the argument conversions a little to make them easier to read
   -- instead of deciding whether we're in the two-argument
   consume(timeout, cb) variant or the new four-argument
   consume(timeout, totalTimeout, num, cb) variant based on the type of the
   2nd argument, it now looks at the # of arguments
---
 src/kafka-consumer.cc | 52 +++++++++++++++++++++++++++++--------------
 1 file changed, 35 insertions(+), 17 deletions(-)

diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 019b0cb6..34afd57d 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -1060,7 +1060,7 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
     return Nan::ThrowError("Need to specify a sleep delay");
   }
 
-  if (!info[2]->IsFunction()) {
+  if (!info[3]->IsFunction()) {
     return Nan::ThrowError("Need to specify a callback");
   }
 
@@ -1107,10 +1107,11 @@ NAN_METHOD(KafkaConsumer::NodeConsume) {
   Nan::HandleScope scope;
 
   if (info.Length() < 2) {
-    // Just throw an exception
+    // Just throw an exception -- we didn't get enough arguments
     return Nan::ThrowError("Invalid number of parameters");
   }
 
+  // The first argument should always be the timeout.
   int timeout_ms;
   Nan::Maybe<uint32_t> maybeTimeout =
     Nan::To<uint32_t>(info[0].As<v8::Number>());
@@ -1121,37 +1122,54 @@
     timeout_ms = static_cast<int>(maybeTimeout.FromJust());
   }
 
-  if (info[1]->IsNumber()) {
-    if (!info[2]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
+  if (info.Length() == 4) {
+    // The 4-argument variant means that we're consuming a specific # of
+    // messages. Parse not just the number of messages, but also the total
+    // timeout in the 2nd argument -- the number of messages is the 3rd argument,
+    // and the callback is the 4th.
+
+    // Parse the total timeout -- how long to wait
+    uint total_timeout_ms;
+    Nan::Maybe<uint32_t> maybeTotalTimeout =
+        Nan::To<uint32_t>(info[1].As<v8::Number>());
+    if (maybeTotalTimeout.IsNothing()) {
+      total_timeout_ms = -1;
+    } else {
+      timeout_ms = static_cast<uint>(maybeTotalTimeout.FromJust());
     }
 
-    v8::Local<v8::Number> numMessagesNumber = info[1].As<v8::Number>();
+    // Parse the # of messages to read
+    v8::Local<v8::Number> numMessagesNumber = info[2].As<v8::Number>();
     Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
-
     uint32_t numMessages;
     if (numMessagesMaybe.IsNothing()) {
-      return Nan::ThrowError("Parameter must be a number over 0");
+      return Nan::ThrowError("Number of messages to consume must be a number over 0");
     } else {
       numMessages = numMessagesMaybe.FromJust();
     }
 
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
-
-    v8::Local<v8::Function> cb = info[2].As<v8::Function>();
+    // Check that the callback is configured properly
+    if (!info[3]->IsFunction()) {
+      return Nan::ThrowError("Need to specify a callback");
+    }
+    v8::Local<v8::Function> cb = info[3].As<v8::Function>();
     Nan::Callback *callback = new Nan::Callback(cb);
-    Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms));  // NOLINT
 
+    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+    Nan::AsyncQueueWorker(
+      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, total_timeout_ms));  // NOLINT
   } else {
+    // The 2-argument variant means that we're just consuming for a specified
+    // timeout, no message counter here -- check the 2nd argument to see if it's
+    // a callback, and that's it.
+
     if (!info[1]->IsFunction()) {
       return Nan::ThrowError("Need to specify a callback");
     }
-
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
-
     v8::Local<v8::Function> cb = info[1].As<v8::Function>();
     Nan::Callback *callback = new Nan::Callback(cb);
+
+    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
     Nan::AsyncQueueWorker(
       new Workers::KafkaConsumerConsume(callback, consumer, timeout_ms));
   }
@@ -1195,7 +1213,7 @@ NAN_METHOD(KafkaConsumer::NodeDisconnect) {
     // cleanup the async worker
     consumeLoop->WorkComplete();
     consumeLoop->Destroy();
-
+
     consumer->m_consume_loop = nullptr;
   }
 
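editor's note: for reference, the two argument shapes the binding now
dispatches on look like this from the javascript side (illustrative sketch,
not part of the patch series; `client` stands for the native consumer handle
the library wraps, and the values are made up):

    // two-argument variant: just a per-call timeout and a callback
    client.consume(1000, callback);

    // four-argument variant: per-message timeout, total timeout, max message
    // count, and a callback that receives the whole batch
    client.consume(1000, 10000, 500, callback);
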
From 3d9f3658684954289ce2b29307d7c8ebd5c60f78 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 15:22:30 -0500
Subject: [PATCH 3/9] consumer: thread through total consume timeout to
 js-land

builds on top of the previous two commits and threads the total consume
timeout value into javascript-land. the default is `undefined`, as that will
result in no behavioral changes for consumers. the new behavior is thus
entirely opt-in.
---
 lib/kafka-consumer.js | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)

diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index c479240f..824fdced 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -132,6 +132,7 @@ function KafkaConsumer(conf, topicConf) {
   this.topicConfig = topicConf;
 
   this._consumeTimeout = DEFAULT_CONSUME_TIME_OUT;
+  this._consumeTotalTimeout = undefined;
   this._consumeLoopTimeoutDelay = DEFAULT_CONSUME_LOOP_TIMEOUT_DELAY;
 }
 
@@ -143,6 +144,15 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
   this._consumeTimeout = timeoutMs;
 };
 
+/**
+ * Set the default total consume operation timeout provided to c++land
+ * @param {number} totalTimeoutMs - maximum number of milliseconds to allow a
+ * batch consume operation to run
+ */
+KafkaConsumer.prototype.setDefaultTotalConsumeTimeout = function(timeoutMs) {
+  this._consumeTotalTimeout = totalTimeoutMs;
+};
+
 /**
  * Set the default sleep delay for the next consume loop after the previous one has timed out.
  * @param {number} intervalMs - number of milliseconds to sleep after a message fetch has timed out
@@ -386,6 +396,11 @@ KafkaConsumer.prototype.unsubscribe = function() {
  */
 KafkaConsumer.prototype.consume = function(number, cb) {
   var timeoutMs = this._consumeTimeout !== undefined ? this._consumeTimeout : DEFAULT_CONSUME_TIME_OUT;
+
+  // Undefined is OK here -- the default is to not upper bound the consume call's
+  // total time spent, to avoid breaking changes.
+  var totalTimeoutMs = this._consumeTotalTimeout;
+
   var self = this;
 
   if ((number && typeof number === 'number') || (number && cb)) {
@@ -396,7 +411,7 @@
       throw new TypeError('Callback must be a function');
     }
 
-    this._consumeNum(timeoutMs, number, cb);
+    this._consumeNum(timeoutMs, totalTimeoutMs, number, cb);
   } else {
 
     // See https://github.com/Blizzard/node-rdkafka/issues/220
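editor's note: a quick end-to-end usage sketch of the new opt-in api
(illustrative only, not part of the patch series; the broker address, group id,
and topic name are made up):

    var Kafka = require('node-rdkafka');

    var consumer = new Kafka.KafkaConsumer({
      'group.id': 'example-group',               // made-up group id
      'metadata.broker.list': 'localhost:9092'   // made-up broker
    }, {});

    consumer.on('ready', function() {
      consumer.subscribe(['example-topic']);     // made-up topic

      // each individual fetch waits up to 1 second...
      consumer.setDefaultConsumeTimeout(1000);
      // ...and the whole batch call returns after roughly 10 seconds, full or not
      consumer.setDefaultTotalConsumeTimeout(10000);

      consumer.consume(1000, function(err, messages) {
        // messages.length can be anywhere from 0 to 1000 here
      });
    });

    consumer.connect();
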
From 984cd883bc27e1fa7b4e122e38952872ab30e11f Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 15:44:50 -0500
Subject: [PATCH 4/9] test: add new tests for the global consume timeout set
 and unset variants

---
 e2e/both.spec.js | 98 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 95 insertions(+), 3 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index a8289ec3..3092dade 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -163,7 +163,7 @@ describe('Consumer/Producer', function() {
       });
     });
   });
-
+
   it('should return ready messages on partition EOF', function(done) {
     crypto.randomBytes(4096, function(ex, buffer) {
       producer.setPollInterval(10);
@@ -221,7 +221,7 @@ describe('Consumer/Producer', function() {
      consumer.once('data', function(msg) {
        events.push("data");
      });
-
+
      consumer.once('partition.eof', function(eof) {
        events.push("partition.eof");
      });
@@ -272,6 +272,98 @@ describe('Consumer/Producer', function() {
     });
   });
 
+  it('should fill a batch when there is no total consume timeout set', function(done) {
+    crypto.randomBytes(4096, function(ex, buffer) {
+      producer.setPollInterval(10);
+
+      producer.once('delivery-report', function(err, report) {
+        t.ifError(err);
+      });
+
+      consumer.subscribe([topic]);
+
+      var events = [];
+
+      consumer.once('data', function(msg) {
+        events.push("data");
+      });
+
+      consumer.on('partition.eof', function(eof) {
+        events.push("partition.eof");
+      });
+
+      // Produce 10 messages, 500ms apart -- the whole batch should fill, but the
+      // time taken for the consume call should be >= 5000ms.
+      let timeoutId;
+      let toProduce = 10;
+      const produceLoop = () => {
+        producer.produce(topic, null, buffer, null);
+        if (--toProduce > 0) {
+          timeoutId = setTimeout(produceLoop, 500);
+        }
+      };
+      produceLoop();
+
+      consumer.setDefaultConsumeTimeout(1000);
+      const startedAt = Date.now();
+      consumer.consume(10, function(err, messages) {
+        t.ifError(err);
+        t(messages.length < 10, 'Too few messages consumed within batch');
+        t(Date.now() - started < 5000, 'Consume finished too fast, should have taken at least 5 seconds')
+        clearTimeout(timeoutId);
+        done();
+      });
+    });
+  });
+
+  it('should not fill a batch when there is a total consume timeout set', function(done) {
+    crypto.randomBytes(4096, function(ex, buffer) {
+      producer.setPollInterval(10);
+
+      producer.once('delivery-report', function(err, report) {
+        t.ifError(err);
+      });
+
+      consumer.subscribe([topic]);
+
+      var events = [];
+
+      consumer.once('data', function(msg) {
+        events.push("data");
+      });
+
+      consumer.on('partition.eof', function(eof) {
+        events.push("partition.eof");
+      });
+
+      // Produce 20 messages, 900ms apart -- the whole batch should *not* fill,
+      // we should only get 11 messages (11*900 = 9900, 9900 < 10000).
+      let timeoutId;
+      let toProduce = 20;
+      const produceLoop = () => {
+        producer.produce(topic, null, buffer, null);
+        if (--toProduce > 0) {
+          timeoutId = setTimeout(produceLoop, 900);
+        }
+      };
+      produceLoop();
+
+      consumer.setDefaultConsumeTimeout(1000);
+      consumer.setDefaultTotalConsumeTimeout(10000)
+      const startedAt = Date.now();
+      consumer.consume(100, function(err, messages) {
+        t.ifError(err);
+        t(messages.length < 11, 'Too few messages consumed within batch');
+        t(messages.length > 11, 'Too many messages consumed within batch');
+        t(Date.now() - started > 10000, 'Consume took longer than global timeout');
+        clearTimeout(timeoutId);
+        done();
+      });
+    });
+  });
+
   it('should be able to produce and consume messages: consumeLoop', function(done) {
     var key = 'key';
 
@@ -409,7 +501,7 @@ describe('Consumer/Producer', function() {
       ];
       run_headers_test(done, headers);
     });
-
+
     it('should be able to produce and consume messages with multiple headers value as string: consumeLoop', function(done) {
       var headers = [
         { key1: 'value1' },
From 202305cd6e72620bfb11ca09b1de2b39443b00ef Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 16:18:18 -0500
Subject: [PATCH 5/9] types: fix consume(cb) type definition to make it clear
 it'll flow a single message

---
 index.d.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/index.d.ts b/index.d.ts
index d7ce7e61..173e3505 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -220,7 +220,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
   committed(timeout: number, cb: (err: LibrdKafkaError, topicPartitions: TopicPartitionOffset[]) => void): this;
 
   consume(number: number, cb?: (err: LibrdKafkaError, messages: Message[]) => void): void;
-  consume(cb: (err: LibrdKafkaError, messages: Message[]) => void): void;
+  consume(cb: (err: LibrdKafkaError, messages: Message) => void): void;
   consume(): void;
 
   getWatermarkOffsets(topic: string, partition: number): WatermarkOffsets;
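editor's note: for context, the three overloads above map to these call styles
(illustrative sketch, not part of the patch series):

    // batch mode: up to `number` messages are delivered to the callback as an array
    consumer.consume(100, function(err, messages) { /* messages is a Message[] */ });

    // flowing mode with a callback: messages are delivered one at a time, so the
    // callback sees a single Message per invocation -- hence the type fix above
    consumer.consume(function(err, message) { /* message is a single Message */ });

    // flowing mode without a callback: messages arrive via the 'data' event
    consumer.consume();
    consumer.on('data', function(message) { /* message is a single Message */ });
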
From 7c66aa9ae91a86bccf32a4d0deb3a5ba0b879625 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 21:45:26 -0500
Subject: [PATCH 6/9] tests: fix a few typos and wrong assertions

---
 e2e/both.spec.js | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 3092dade..3648b414 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -306,10 +306,10 @@ describe('Consumer/Producer', function() {
       produceLoop();
 
       consumer.setDefaultConsumeTimeout(1000);
-      const startedAt = Date.now();
+      const started = Date.now();
       consumer.consume(10, function(err, messages) {
         t.ifError(err);
-        t(messages.length < 10, 'Too few messages consumed within batch');
+        t(messages.length >= 10, 'Too few messages consumed within batch');
         t(Date.now() - started < 5000, 'Consume finished too fast, should have taken at least 5 seconds')
         clearTimeout(timeoutId);
         done();
@@ -351,13 +351,15 @@ describe('Consumer/Producer', function() {
       produceLoop();
 
      consumer.setDefaultConsumeTimeout(1000);
-      consumer.setDefaultTotalConsumeTimeout(10000)
-      const startedAt = Date.now();
+      consumer.setDefaultTotalConsumeTimeout(10000);
+      const started = Date.now();
       consumer.consume(100, function(err, messages) {
         t.ifError(err);
-        t(messages.length < 11, 'Too few messages consumed within batch');
-        t(messages.length > 11, 'Too many messages consumed within batch');
-        t(Date.now() - started > 10000, 'Consume took longer than global timeout');
+        // Why 13? First message is produced immediately, then 11 more are
+        // produced over 9,900 milliseconds, and then a 13th is produced and consumed
+        // in the "final" loop where elapsed time > 10,000.
+        t(messages.length == 13, 'Batch should have consumed 13 messages, instead consumed ' + messages.length + ' messages');
+        t(Date.now() - started > 10000 + 1000, 'Consume took longer than global timeout + single message timeout');
         clearTimeout(timeoutId);
         done();
       });
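editor's note: the timing behind the 13-message expectation, written out
(an illustrative sketch of the arithmetic only, not part of the patch series;
it ignores produce and fetch latency):

    // messages are produced every 900 ms, so message k arrives at ~(k - 1) * 900 ms.
    // the worker checks elapsed time after each message it consumes and stops once
    // it exceeds the 10,000 ms total timeout.
    const totalTimeoutMs = 10000;
    const intervalMs = 900;

    let consumed = 0;
    for (let k = 1; k <= 20; k++) {
      const arrivalMs = (k - 1) * intervalMs;  // 0, 900, ..., 9900, 10800, ...
      consumed = k;                            // message k is consumed on arrival
      if (arrivalMs > totalTimeoutMs) {        // first true at k = 13 (10,800 ms)
        break;                                 // the loop exits after consuming it
      }
    }
    // consumed === 13, and the call returns after ~10.8 s -- inside the
    // 10,000 + 1,000 ms upper bound asserted later in the series.
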
From 6d4602e1cd6d489650151b39cf1482004ed67ed6 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 21:45:58 -0500
Subject: [PATCH 7/9] fix: properly pass total timeout to c++land

---
 index.d.ts            |  2 ++
 lib/kafka-consumer.js | 11 ++++-------
 2 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/index.d.ts b/index.d.ts
index 173e3505..a13225ad 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -237,6 +237,8 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
 
   setDefaultConsumeTimeout(timeoutMs: number): void;
 
+  setDefaultTotalConsumeTimeout(totalTimeoutMs: number): void;
+
   setDefaultConsumeLoopTimeoutDelay(timeoutMs: number): void;
 
   subscribe(topics: SubscribeTopicList): this;
diff --git a/lib/kafka-consumer.js b/lib/kafka-consumer.js
index 824fdced..4faad5c7 100644
--- a/lib/kafka-consumer.js
+++ b/lib/kafka-consumer.js
@@ -149,7 +149,7 @@ KafkaConsumer.prototype.setDefaultConsumeTimeout = function(timeoutMs) {
  * @param {number} totalTimeoutMs - maximum number of milliseconds to allow a
  * batch consume operation to run
  */
-KafkaConsumer.prototype.setDefaultTotalConsumeTimeout = function(timeoutMs) {
+KafkaConsumer.prototype.setDefaultTotalConsumeTimeout = function(totalTimeoutMs) {
   this._consumeTotalTimeout = totalTimeoutMs;
 };
 
@@ -401,10 +401,7 @@ KafkaConsumer.prototype.consume = function(number, cb) {
   // total time spent, to avoid breaking changes.
   var totalTimeoutMs = this._consumeTotalTimeout;
-
   var self = this;
-
   if ((number && typeof number === 'number') || (number && cb)) {
-
     if (cb === undefined) {
       cb = function() {};
     } else if (typeof cb !== 'function') {
@@ -413,7 +410,6 @@
       throw new TypeError('Callback must be a function');
     }
 
     this._consumeNum(timeoutMs, totalTimeoutMs, number, cb);
   } else {
-
     // See https://github.com/Blizzard/node-rdkafka/issues/220
     // Docs specify just a callback can be provided but really we needed
     // a fallback to the number argument
@@ -441,6 +437,7 @@ KafkaConsumer.prototype._consumeLoop = function(timeoutMs, cb) {
   var self = this;
   var retryReadInterval = this._consumeLoopTimeoutDelay;
+
   self._client.consumeLoop(timeoutMs, retryReadInterval, function readCallback(err, message, eofEvent, warning) {
 
     if (err) {
@@ -475,10 +472,10 @@
  * @private
  * @see consume
 */
-KafkaConsumer.prototype._consumeNum = function(timeoutMs, numMessages, cb) {
+KafkaConsumer.prototype._consumeNum = function(timeoutMs, totalTimeoutMs, numMessages, cb) {
   var self = this;
 
-  this._client.consume(timeoutMs, numMessages, function(err, messages, eofEvents) {
+  this._client.consume(timeoutMs, totalTimeoutMs, numMessages, function(err, messages, eofEvents) {
     if (err) {
       err = LibrdKafkaError.create(err);
       if (cb) {
From f37dfa6a7ea757a677b70b4f5f9e986b9542ce42 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 21:46:26 -0500
Subject: [PATCH 8/9] refactor: clean up some dead code in c++, fix
 consumeloop argument checks

---
 src/kafka-consumer.cc | 81 ++++++++++++++++---------------------------
 1 file changed, 30 insertions(+), 51 deletions(-)

diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index 34afd57d..4f971c1f 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -1049,7 +1049,7 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
 
   if (info.Length() < 3) {
     // Just throw an exception
-    return Nan::ThrowError("Invalid number of parameters");
+    return Nan::ThrowError("Invalid number of parameters, expected 3");
   }
 
   if (!info[0]->IsNumber()) {
@@ -1060,7 +1060,7 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
     return Nan::ThrowError("Need to specify a sleep delay");
   }
 
-  if (!info[3]->IsFunction()) {
+  if (!info[2]->IsFunction()) {
     return Nan::ThrowError("Need to specify a callback");
   }
 
@@ -1106,9 +1106,9 @@ NAN_METHOD(KafkaConsumer::NodeConsumeLoop) {
 NAN_METHOD(KafkaConsumer::NodeConsume) {
   Nan::HandleScope scope;
 
-  if (info.Length() < 2) {
+  if (info.Length() != 4) {
     // Just throw an exception -- we didn't get enough arguments
-    return Nan::ThrowError("Invalid number of parameters");
+    return Nan::ThrowError("Invalid number of arguments, expected 4");
   }
 
   // The first argument should always be the timeout.
   int timeout_ms;
@@ -1122,37 +1122,36 @@
     timeout_ms = static_cast<int>(maybeTimeout.FromJust());
   }
 
-  if (info.Length() == 4) {
-    // The 4-argument variant means that we're consuming a specific # of
-    // messages. Parse not just the number of messages, but also the total
-    // timeout in the 2nd argument -- the number of messages is the 3rd argument,
-    // and the callback is the 4th.
-
-    // Parse the total timeout -- how long to wait
-    uint total_timeout_ms;
-    Nan::Maybe<uint32_t> maybeTotalTimeout =
-        Nan::To<uint32_t>(info[1].As<v8::Number>());
-    if (maybeTotalTimeout.IsNothing()) {
-      total_timeout_ms = -1;
-    } else {
-      timeout_ms = static_cast<uint>(maybeTotalTimeout.FromJust());
-    }
-
-    // Parse the # of messages to read
-    v8::Local<v8::Number> numMessagesNumber = info[2].As<v8::Number>();
-    Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
-    uint32_t numMessages;
-    if (numMessagesMaybe.IsNothing()) {
-      return Nan::ThrowError("Number of messages to consume must be a number over 0");
-    } else {
-      numMessages = numMessagesMaybe.FromJust();
-    }
-
-    // Check that the callback is configured properly
-    if (!info[3]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
-    }
-    v8::Local<v8::Function> cb = info[3].As<v8::Function>();
-    Nan::Callback *callback = new Nan::Callback(cb);
-
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
-    Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, total_timeout_ms));  // NOLINT
+  // Parse the total timeout -- how long to wait
+  uint total_timeout_ms;
+  Nan::Maybe<uint32_t> maybeTotalTimeout =
+      Nan::To<uint32_t>(info[1].As<v8::Number>());
+  if (maybeTotalTimeout.IsNothing()) {
+    total_timeout_ms = -1;
   } else {
-    // The 2-argument variant means that we're just consuming for a specified
-    // timeout, no message counter here -- check the 2nd argument to see if it's
-    // a callback, and that's it.
+    total_timeout_ms = static_cast<uint>(maybeTotalTimeout.FromJust());
+  }
 
-    if (!info[1]->IsFunction()) {
-      return Nan::ThrowError("Need to specify a callback");
-    }
-    v8::Local<v8::Function> cb = info[1].As<v8::Function>();
-    Nan::Callback *callback = new Nan::Callback(cb);
+  // Parse the # of messages to read
+  v8::Local<v8::Number> numMessagesNumber = info[2].As<v8::Number>();
+  Nan::Maybe<uint32_t> numMessagesMaybe = Nan::To<uint32_t>(numMessagesNumber);  // NOLINT
+  uint32_t numMessages;
+  if (numMessagesMaybe.IsNothing()) {
+    return Nan::ThrowError("Number of messages to consume must be a number over 0");
+  } else {
+    numMessages = numMessagesMaybe.FromJust();
+  }
 
-    KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
-    Nan::AsyncQueueWorker(
-      new Workers::KafkaConsumerConsume(callback, consumer, timeout_ms));
+  // Check that the callback is configured properly
+  if (!info[3]->IsFunction()) {
+    return Nan::ThrowError("Need to specify a callback");
   }
+  v8::Local<v8::Function> cb = info[3].As<v8::Function>();
+  Nan::Callback *callback = new Nan::Callback(cb);
+
+  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());
+  Nan::AsyncQueueWorker(
+    new Workers::KafkaConsumerConsumeNum(callback, consumer, numMessages, timeout_ms, total_timeout_ms));  // NOLINT
 
   info.GetReturnValue().Set(Nan::Null());
 }

From d9c3d3bfecf235be2e7ed0a04d7a539992c389e4 Mon Sep 17 00:00:00 2001
From: Bojan Rajkovic
Date: Wed, 6 Dec 2023 21:54:23 -0500
Subject: [PATCH 9/9] tests: fix assertion for partial batch timing

---
 e2e/both.spec.js | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/e2e/both.spec.js b/e2e/both.spec.js
index 3648b414..68f13f8b 100644
--- a/e2e/both.spec.js
+++ b/e2e/both.spec.js
@@ -359,7 +359,7 @@ describe('Consumer/Producer', function() {
         // produced over 9,900 milliseconds, and then a 13th is produced and consumed
         // in the "final" loop where elapsed time > 10,000.
         t(messages.length == 13, 'Batch should have consumed 13 messages, instead consumed ' + messages.length + ' messages');
-        t(Date.now() - started > 10000 + 1000, 'Consume took longer than global timeout + single message timeout');
+        t((Date.now() - started) < (10000 + 1000), 'Consume took ' + (Date.now() - started) + ', longer than global timeout + single message timeout of ' + (10000 + 1000));
         clearTimeout(timeoutId);
         done();
       });
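editor's note: with the series applied, a partial batch is an expected outcome
whenever the total timeout fires. a minimal sketch of handling that from
application code (illustrative, not part of the patch series; `processMessages`
is a hypothetical application callback):

    // ask for up to 1000 messages, but never block the caller for much more than ~10s
    consumer.setDefaultConsumeTimeout(1000);
    consumer.setDefaultTotalConsumeTimeout(10000);

    function consumeBatch() {
      consumer.consume(1000, function(err, messages) {
        if (err) {
          // handle or log the error as appropriate
          return;
        }

        // fewer than 1000 messages simply means the total timeout (or partition
        // EOF) cut the batch short -- process what arrived and go around again
        processMessages(messages);
        setImmediate(consumeBatch);
      });
    }
    consumeBatch();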