Kafka consumer reads same message again after consumer group rebalances #1079

Open
@subhamKumar04

Description

Environment Information

  • OS: Mac
  • Node Version: 14.17.5
  • NPM Version: 6.4.1
  • C++ Toolchain:
  • node-rdkafka version: 2.18.0

Steps to Reproduce

I have two consumers running in my application, both in the same consumer group, G1.

consumer1 reads from topic1 and consumer2 reads from topic2. Auto-commit is disabled ('enable.auto.commit': false), and I manually commit each offset after processing the message.

Both topics have only one partition.

MAX_MESSAGE_POLL is set to 1, so I poll only one message at a time from the broker.
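For reference, the manual commit step described above can be sketched roughly as follows. This is an illustration, not the exact code from my application: `offsetToCommit` and `commitProcessed` are hypothetical helper names, and `consumer` is assumed to be a connected node-rdkafka KafkaConsumer. The sketch relies on the Kafka convention that the committed offset is the offset of the next message to read, so it commits `message.offset + 1`.

```javascript
// Hypothetical helper: build the topic-partition entry to commit for a
// message that has just been processed. Kafka stores the offset of the
// NEXT message to read, hence the + 1.
function offsetToCommit(message) {
  return {
    topic: message.topic,
    partition: message.partition,
    offset: message.offset + 1,
  };
}

// Hypothetical helper: commit synchronously so the call only returns once
// the commit has been attempted. `consumer` is assumed to expose
// commitSync(topicPartition), as node-rdkafka's KafkaConsumer does.
function commitProcessed(consumer, message) {
  const next = offsetToCommit(message);
  consumer.commitSync(next);
  return next;
}
```

With one message polled at a time, `commitProcessed(consumer, message)` would be called once after each message has been handled.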

What I am noticing is that if consumer1 is in the middle of processing a message when consumer2 comes up, consumer1 re-reads the same message. I enabled rebalance_cb in the Kafka consumer config and found that the broker rebalances the consumer group whenever a new consumer joins the group. It revokes the partition and then re-assigns the same partition (since there is only one partition).

  • Question 1: Why does consumer1 read the same message again even after successfully committing the current offset?

Another thing I noticed is what happens when I provide the default implementation of rebalance_cb from https://www.npmjs.com/package/node-rdkafka#rebalancing

That is, the code below:

'rebalance_cb': (err, assignment) => {
  if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
    // Note: this can throw when you are disconnected. Take care and wrap it in
    // a try catch if that matters to you
    this.consumer.assign(assignment);
  } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
    // Same as above
    this.consumer.unassign();
  } else {
    // We had a real error
    console.error(err);
  }
}

In this case, the message is NOT read twice.
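Following the note in the snippet about assign() and unassign() throwing when disconnected, a guarded variant might look like the sketch below. `makeRebalanceCb`, `getConsumer`, and `log` are hypothetical names introduced for illustration; `codes` is assumed to be `Kafka.CODES.ERRORS`. The logging is there only to make each revoke and assign visible while debugging, and the branching is the same as in the snippet above.

```javascript
// Sketch of a guarded rebalance callback. `codes` is expected to be
// Kafka.CODES.ERRORS from node-rdkafka; `getConsumer` is a hypothetical
// accessor so the callback can be built before the consumer exists.
function makeRebalanceCb(codes, getConsumer, log = console.error) {
  return function rebalanceCb(err, assignment) {
    const consumer = getConsumer();
    try {
      if (err.code === codes.ERR__ASSIGN_PARTITIONS) {
        log('assigned', JSON.stringify(assignment));
        consumer.assign(assignment);
      } else if (err.code === codes.ERR__REVOKE_PARTITIONS) {
        log('revoked', JSON.stringify(assignment));
        consumer.unassign();
      } else {
        // Any other code is treated as a real error.
        log('rebalance error', err);
      }
    } catch (e) {
      // assign()/unassign() can throw when the client is disconnected.
      log('rebalance handling failed', e);
    }
  };
}
```

It could be wired into the config as `'rebalance_cb': makeRebalanceCb(Kafka.CODES.ERRORS, () => this.consumer)`.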

  • Question 2: Why does consumer1 NOT re-read the same message when I provide the default implementation of rebalance_cb? I would expect node-rdkafka to follow the same strategy even without this function.

My expectation is that after a successful commit, the same message should never be read again by the same consumer group.

node-rdkafka Configuration Settings

brokerConfig: {
  'metadata.broker.list': 'localhost:9092',
  'group.id': `group-name`,
  'event_cb': true,
  'compression.codec': 'snappy',
  'socket.keepalive.enable': true,
  'enable.auto.commit': false,
  'heartbeat.interval.ms': 250,
  'queued.min.messages': 100,
  'fetch.error.backoff.ms': 250,
  'queued.max.messages.kbytes': 50,
},
topicConfig: {
  'auto.offset.reset': 'earliest',
  'request.required.acks': 1
}
