KAFKA-19335: Membership managers send negative epoch in JOINING #19818

Merged
merged 2 commits into from
May 28, 2025

Conversation

lucasbru
Member

@lucasbru lucasbru commented May 26, 2025

There is a sequence of interactions with the membership managers of
KIP-848, KIP-932, and KIP-1071 that can put the membership manager into
the JOINING state with the member epoch set to -1. This can result in
an invalid request being sent, since joining heartbeats should not have
member epoch -1. This may lead to the member failing to join. In the
case of streams, the group coordinator will return INVALID_REQUEST.

This is the sequence that triggers the bug. It seems relatively
likely to occur, and is caused by two heartbeat responses being
received after the next heartbeat has been sent.

membershipManager.leaveGroup();
    -> transitions to LEAVING
membershipManager.onHeartbeatRequestGenerated();
    -> transitions to UNSUBSCRIBED
membershipManager.onHeartbeatSuccess(... with member epoch > 0);
    -> unblocks the consumer
membershipManager.onSubscriptionUpdated();
membershipManager.onConsumerPoll();
    -> transitions to JOINING
membershipManager.onHeartbeatSuccess(... with member epoch < 0);
    -> updates the epoch to a negative value

Now we are in state JOINING with memberEpoch=-1, and the next
heartbeat we send will be malformed, triggering INVALID_REQUEST.

The bug may also be triggered if the unsubscribe times out, but this
seems more of a corner case.
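
The main sequence can be replayed against a small toy model. This is only a sketch of the pre-fix behaviour as described above, not the real membership manager: the class `BuggyMembershipModel`, its fields, and the assumption that a rejoining member starts at epoch 0 are all hypothetical simplifications.

```java
// Toy model of the pre-fix behaviour described above. It is NOT the real
// Kafka membership manager; class, field, and method names are hypothetical.
final class BuggyMembershipModel {
    enum State { STABLE, LEAVING, UNSUBSCRIBED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 5;          // arbitrary positive epoch of a stable member
    boolean unsubscribeUnblocked; // stands in for completing the leave future

    void leaveGroup() {
        state = State.LEAVING;
    }

    void onHeartbeatRequestGenerated() {
        // The leave heartbeat has been handed over for sending.
        if (state == State.LEAVING) {
            state = State.UNSUBSCRIBED;
        }
    }

    void onSubscriptionUpdatedAndPoll() {
        // Assumption of this model: a rejoining member starts again at epoch 0.
        if (state == State.UNSUBSCRIBED) {
            state = State.JOINING;
            memberEpoch = 0;
        }
    }

    void onHeartbeatSuccess(int responseEpoch) {
        // Pre-fix: any response seen while UNSUBSCRIBED unblocks the consumer,
        // and every response overwrites the local epoch.
        if (state == State.UNSUBSCRIBED) {
            unsubscribeUnblocked = true;
        }
        memberEpoch = responseEpoch;
    }

    /** Replays the sequence from the description and returns the final model. */
    static BuggyMembershipModel replayRace() {
        BuggyMembershipModel m = new BuggyMembershipModel();
        m.leaveGroup();
        m.onHeartbeatRequestGenerated();
        m.onHeartbeatSuccess(5);   // late response to a regular heartbeat
        m.onSubscriptionUpdatedAndPoll();
        m.onHeartbeatSuccess(-1);  // late response to the leave heartbeat
        return m;
    }
}
```

Calling `BuggyMembershipModel.replayRace()` leaves the model in `JOINING` with `memberEpoch == -1`, which is the malformed combination the description refers to.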

To prevent the bug, we are taking two measures. The likely path to
triggering the bug is prevented by not unblocking an unsubscribe
call in the consumer when a non-leave-heartbeat epoch is received. Once
we have sent out the leave group heartbeat, we ignore all heartbeat
responses except those containing memberEpoch < 0.

For good measure, we also prevent the second case (the unsubscribe
timing out). Here, the consumer gets unblocked before we have received
the leave group heartbeat response, and may resubscribe to the group.
If the heartbeat response that contains a member epoch < 0 arrives
after we have already left the UNSUBSCRIBED state, we simply ignore it.
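
As a rough illustration of the two measures, here is a toy model with both guards in place. It is a sketch under assumptions, not the code in this PR: `GuardedMembershipModel`, its fields, and the `rejoin()` helper are hypothetical, and the model assumes a rejoining member starts at epoch 0.

```java
// Toy model of the two guards described above. It is NOT the code changed in
// this PR; class, field, and method names are hypothetical.
final class GuardedMembershipModel {
    enum State { STABLE, LEAVING, UNSUBSCRIBED, JOINING }

    State state = State.STABLE;
    int memberEpoch = 5;          // arbitrary positive epoch of a stable member
    boolean unsubscribeUnblocked; // stands in for completing the leave future

    void leaveGroup() {
        state = State.LEAVING;
    }

    void onHeartbeatRequestGenerated() {
        if (state == State.LEAVING) {
            state = State.UNSUBSCRIBED;
        }
    }

    /** Models onSubscriptionUpdated() followed by onConsumerPoll(). */
    void rejoin() {
        if (state == State.UNSUBSCRIBED) {
            state = State.JOINING;
            memberEpoch = 0; // assumption: joining starts at epoch 0
        }
    }

    void onHeartbeatSuccess(int responseEpoch) {
        boolean isLeaveResponse = responseEpoch < 0;
        if (state == State.UNSUBSCRIBED) {
            // Measure 1: after the leave heartbeat was sent, only its own
            // response completes the leave; anything else is ignored.
            if (isLeaveResponse) {
                unsubscribeUnblocked = true;
            }
            return;
        }
        if (isLeaveResponse) {
            // Measure 2: a late leave response that arrives after we left
            // UNSUBSCRIBED is ignored, so its epoch is never applied.
            return;
        }
        memberEpoch = responseEpoch;
    }
}
```

In this model, a regular response arriving in `UNSUBSCRIBED` no longer unblocks the consumer, and a leave response arriving in `JOINING` leaves `memberEpoch` at 0, so the next joining heartbeat stays well-formed.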

Reviewers: Lianet Magrans, Andrew Schofield, Shivsundar R

@lucasbru lucasbru changed the title from "KAFKA-19335: Membership Managers end negative epoch in JOINING" to "KAFKA-19335: Membership managers send negative epoch in JOINING" May 26, 2025
@lucasbru lucasbru added the labels KIP-1071 (PRs related to KIP-1071), KIP-848 (The Next Generation of the Consumer Rebalance Protocol), KIP-932 (Queues for Kafka) May 26, 2025
@lianetm
Member

lianetm commented May 26, 2025

Hey @lucasbru, thanks for taking on this one. Agree with the gap on leave HB responses received in unexpected order. And the fix makes sense to me (only complete the leave if the HB response is a response to leave, and never apply epoch received in a leave HB response).

What I'm not seeing clearly is how this would lead to INVALID_REQUEST? (so worried that even though this is a sensible gap and fix there may still be something else behind the failure you got?). If this race happens, I expect that we end up sending a full HB (all fields), but with the -1 epoch, correct? Then the request should fail with UNKNOWN_MEMBER, the moment the coordinator tries to find the member that wants to leave

StreamsGroupMember member = group.getMemberOrThrow(memberId);

(same applies for the Consumer btw, we should get UNKNOWN_MEMBER if the client sends a full HB to join but with epoch -1 by mistake/race).

Thoughts? not sure if I'm missing something here

@lucasbru
Member Author

lucasbru commented May 26, 2025

Thoughts? not sure if I'm missing something here

Thanks for taking a look, @lianetm ! You are right, consumer groups and share groups should fail with UNKNOWN_MEMBER_ID. In streams, we send the topology in a full request, and have code to reject when it is being sent in anything but a joining heartbeat. This will trigger the INVALID_REQUEST. Hope that will clear things up!

https://github.com/apache/kafka/blob/trunk/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java#L539

So indeed this will mean that we get fenced in share/consumer, instead of running into a fatal error as in streams. So it's less of a concern (but I suppose still good to fix) for non-streams groups.
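
To make the difference between the two outcomes concrete, here is a toy model of the checks discussed in this thread. It is not the group coordinator's validation code: `HeartbeatOutcomeModel` and its methods are hypothetical, and it assumes that a joining heartbeat is the one sent with member epoch 0.

```java
// Toy model of the two outcomes discussed in this thread; it is NOT the group
// coordinator's validation code. All names here are hypothetical.
final class HeartbeatOutcomeModel {
    enum Outcome { ACCEPTED, INVALID_REQUEST, UNKNOWN_MEMBER_ID }

    // Assumption: a joining heartbeat is the one sent with member epoch 0.
    static final int JOIN_EPOCH = 0;

    /** Streams: a topology outside a joining heartbeat is rejected up front. */
    static Outcome streamsHeartbeat(int memberEpoch, boolean hasTopology, boolean memberKnown) {
        if (hasTopology && memberEpoch != JOIN_EPOCH) {
            return Outcome.INVALID_REQUEST;
        }
        return memberLookup(memberEpoch, memberKnown);
    }

    /** Consumer/share: no topology field, so only the member lookup can fail. */
    static Outcome consumerHeartbeat(int memberEpoch, boolean memberKnown) {
        return memberLookup(memberEpoch, memberKnown);
    }

    private static Outcome memberLookup(int memberEpoch, boolean memberKnown) {
        // A non-joining heartbeat must refer to a member the coordinator knows.
        if (memberEpoch != JOIN_EPOCH && !memberKnown) {
            return Outcome.UNKNOWN_MEMBER_ID;
        }
        return Outcome.ACCEPTED;
    }
}
```

With a full heartbeat sent at epoch -1 by a member the coordinator no longer knows, `streamsHeartbeat(-1, true, false)` yields `INVALID_REQUEST` in this model, while `consumerHeartbeat(-1, false)` yields `UNKNOWN_MEMBER_ID`.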

membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

// Receive a previous heartbeat response, which should be ignored
Collaborator

@ShivsundarR ShivsundarR May 27, 2025

This response is not "ignored", right? We are setting the member epoch in the response to membershipManager.memberEpoch (-1 in this case). The leave operation is then processed successfully, after which we assert that leaveResult has completed.
If this is the case, can we change the comment above to reflect that (and in the other tests too)?
Thanks.

Member Author

Good point. The comment didn't match what was being tested. I updated the test to include both a response that is ignored, and a response that actually unblocks the consumer.

Collaborator

@ShivsundarR ShivsundarR left a comment

Thanks @lucasbru for the PR. Just one minor question in the tests, changes look good overall.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the PR. Looks good to me from the point of view of share groups.

log.debug("Member {} with epoch {} received a successful response to the heartbeat " +
    "to leave the group and completed the leave operation. ", memberId, memberEpoch);
Member

nit: The indentation of this line is now inconsistent with the rest of the method.

Member Author

Done

@lucasbru
Member Author

@ShivsundarR @lianetm ready for re-review

Collaborator

@ShivsundarR ShivsundarR left a comment

LGTM ! Thanks @lucasbru.

Member

@lianetm lianetm left a comment

Just one question on tests, thanks!

Comment on lines +1026 to +1032
membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(
    new ConsumerGroupHeartbeatResponseData()
        .setErrorCode(Errors.NONE.code())
        .setMemberId(membershipManager.memberId())
        .setMemberEpoch(MEMBER_EPOCH)
));
assertFalse(leaveResult.isDone());
Member

The response being ignored is a non-leave response with MEMBER_EPOCH=1, but the test name "IgnoreLeaveResponse" doesn't align with that, right? Clarifying because we may also want to test that a leave response is ignored (if the member has already transitioned to JOINING when the leave response is received).

Member Author

In this line, I am leaving the group. The IgnoreLeaveResponse refers to the fact that I ignore the heartbeat after leaving the group, which happens further down (the second membershipManager.onHeartbeatSuccess(createConsumerGroupLeaveResponse(membershipManager.memberId()));).

Member

@lianetm lianetm May 28, 2025

The IgnoreLeaveResponse refers to the fact that I ignore the heartbeat after leaving the group

Ok ok, not leave response (epoch -1), but response after leave, all good. Thanks!

Member

@lianetm lianetm left a comment

Thanks! LGTM

@lucasbru lucasbru merged commit 0c116c9 into apache:trunk May 28, 2025
30 checks passed
Labels
clients, consumer, KIP-848 (The Next Generation of the Consumer Rebalance Protocol), KIP-932 (Queues for Kafka), KIP-1071 (PRs related to KIP-1071)
4 participants