Long running message processing and rebalances #472
Description
Background:
I have a consumer (RdKafka.KafkaConsumer) that can take up to half an hour to process a single message, though the average is much lower (around 150 ms per bulk). I set up my consumer with:
"max.poll.interval.ms": 86400000, // 24 hours
"session.timeout.ms": 45000,
"enable.auto.offset.store": false,
"enable.auto.commit": true,
"partition.assignment.strategy": "cooperative-sticky", // I also tried roundrobin
"rebalance_cb": true
I manually store offsets when the processing of a message bulk is completed.
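For illustration, here is a minimal sketch of how the offsets to store could be derived from a processed bulk. It assumes each message carries `topic`, `partition` and `offset` fields, and that the stored offset should be the last processed offset plus one (the position of the next message to read). The helper name `offsetsToStore` is hypothetical, and the commented `offsetsStore` call is an assumption about the client rather than a confirmed part of my setup.

```javascript
// Hypothetical helper: for every topic/partition seen in a processed bulk,
// compute the next offset to store (highest processed offset + 1).
function offsetsToStore(messages) {
  const next = new Map();
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`;
    const current = next.get(key);
    if (current === undefined || offset + 1 > current.offset) {
      next.set(key, { topic, partition, offset: offset + 1 });
    }
  }
  return [...next.values()];
}

// Assumed usage once the whole bulk has been processed successfully:
// client.offsetsStore(offsetsToStore(bulk));
```

With "enable.auto.commit": true and "enable.auto.offset.store": false, only offsets stored this way should be picked up by the periodic auto commit.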
I'm using this.client.setDefaultConsumeTimeout(0); and I call client.consume(1, (err, messages) => {...}) in a loop until a set bulk size (1,000) is reached (due to #262).
When the bulk size is reached I await the processing of the bulk before entering the consume loop again. This is why I use such a high value for max.poll.interval.ms. Otherwise my consumer would encounter Local: Maximum application poll interval (max.poll.interval.ms) exceeded while it is properly processing messages.
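The loop described above can be sketched roughly as follows. This is a simplified illustration, not my exact code: `consumeOnce`, `collectBulk`, `run`, `processBulk` and `shouldStop` are hypothetical names, and the only thing assumed about `client` is the callback-style `consume(count, callback)` call mentioned above.

```javascript
// Wrap the callback-style consume(count, cb) call in a promise.
function consumeOnce(client, count) {
  return new Promise((resolve, reject) => {
    client.consume(count, (err, messages) => (err ? reject(err) : resolve(messages)));
  });
}

// Fetch one message at a time until the bulk holds bulkSize messages.
// With a consume timeout of 0, a call may return an empty array.
async function collectBulk(client, bulkSize) {
  const bulk = [];
  while (bulk.length < bulkSize) {
    const messages = await consumeOnce(client, 1);
    bulk.push(...messages);
  }
  return bulk;
}

// Alternate between collecting a bulk and processing it.
async function run(client, bulkSize, processBulk, shouldStop) {
  while (!shouldStop()) {
    const bulk = await collectBulk(client, bulkSize);
    // No consume() call is made while this promise is pending, which is
    // the window during which max.poll.interval.ms could be exceeded.
    await processBulk(bulk);
  }
}
```

The point of the sketch is that consume() is not called at all while a bulk is being processed, so a slow bulk means a long gap between polls.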
I have 30 processes consuming from a topic with 30 partitions.
Problem:
I was deploying new code (unrelated to this problem) to my consumers using a rolling deployment strategy. During the deployment, 2 new consumers (running the new code) were assigned 1 partition each and started working on messages that took a long time to process (220s and 430s). They seem to have missed one rebalance event and didn't rejoin the group like the other 28 new consumers. After they finished processing the messages they rejoined the group, but between them were assigned all of the partitions (each of the two got 15 partitions). I encountered this problem on every other deploy (likely depending on whether a message with a long processing time was being consumed at the time).
Is the rebalance process dependent on calling consume? Is this a bug in the library, or is this a necessary evil related to setting max.poll.interval.ms to a high value? I can provide some debug logs if necessary.