Stale consumers after commit #69

@cmccarthy1

Describe the bug
Committing an offset as a member of a consumer group while the group is rebalancing can leave that consumer stale: the commit fails and the consumer stops receiving messages.

To Reproduce

cat stale_con.q

\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ; `commit set { } ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x  }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
.kfk.Consumer cfg
.kfk.Sub[0i;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ]

cat other_cons.q

\l kfk.q
OFFSET_LOG:() ; MSGS:()
\c 5000 5000
system"sleep 2"
commit:{ .kfk.CommitOffsets[0i;`test1;;1b] exec partition!offset from MSGS where offset = (max;offset)fby partition ; `COMMITED set .z.p ;  }
.kfk.offsetcb: {[cid;err;offsets] if[not err like "Success" ; .ms.sys.message "offsetcb not success" ; OFFSET_LOG,:(cid;err;offsets) ]; }
.kfk.consumecb:{ x[`rcvtime]:.z.p ; MSGS,:: enlist x _ `data  ; `MSG set x ; }
cfg:(!) . flip(
  (`metadata.broker.list;`$"localhost:9092");
  (`bootstrap.servers;`$"localhost:9092");
  (`group.id;`$"test_consumer_group_1");
  (`enable.auto.commit;`false);
  (`enable.auto.offset.store;`false);
  (`auto.offset.reset;`latest);
  (`session.timeout.ms;`60000);
  );
clients:{ .kfk.Consumer cfg } each til 10
{ .kfk.Sub[x;`test1;enlist[.kfk.PARTITION_UA]!enlist[.kfk.OFFSET.END] ] } each clients

Steps:

  1. Have a process producing to the topic (test1 in the scripts above).
  2. Start stale_con.q.
  3. Start other_cons.q once stale_con.q is up and running, so that its 10 consumers join the same group and trigger a rebalance.
  4. While the rebalance is in progress, manually run commit[] on the stale_con.q process several times in quick succession.
  5. If "offsetcb not success" is not seen, restart the other_cons.q process and try again.
  6. Once offsetcb reports "Offset commit failed - Specified group generation id is not valid", the stale_con.q consumer does not consume any more messages.
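
For readers unfamiliar with the fby idiom in commit[], the following is a minimal sketch of the dictionary it builds and passes to .kfk.CommitOffsets. The MSGS table here is a hypothetical stand-in containing only the two columns the query uses; in the repro it is filled by .kfk.consumecb and has more columns.

```q
/ hypothetical stand-in for the MSGS table built by .kfk.consumecb
MSGS:([] partition:0 0 1 1 2i; offset:10 11 5 7 3)

/ keep only the row with the highest offset in each partition,
/ then build a partition!offset dictionary from those rows
latest:exec partition!offset from MSGS where offset=(max;offset) fby partition

/ latest is now 0 1 2i!11 7 3
show latest
```

So each call to commit[] commits, per partition, the highest offset received so far. No Kafka connection is needed to run this snippet.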

Expected behavior
Consumers in a group should continue to consume messages from an appropriate offset after every rebalancing event, including when a commit fails during a rebalance.
