Skip to content

KAFKA-19335: Membership managers send negative epoch in JOINING #19818

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,23 @@ public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponse response) {
"already leaving the group.", memberId, memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
if (state == MemberState.UNSUBSCRIBED && responseData.memberEpoch() < 0 && maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} received a successful response to the heartbeat " +
"to leave the group and completed the leave operation. ", memberId, memberEpoch);
"to leave the group and completed the leave operation. ", memberId, memberEpoch);
return;
}
if (isNotInGroup()) {
log.debug("Ignoring heartbeat response received from broker. Member {} is in {} state" +
" so it's not a member of the group. ", memberId, state);
return;
}
if (responseData.memberEpoch() < 0) {
log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} " +
"is in {} state and the member epoch is invalid: {}. ", memberId, memberEpoch, state,
responseData.memberEpoch());
maybeCompleteLeaveInProgress();
return;
}

updateMemberEpoch(responseData.memberEpoch());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,23 @@ public void onHeartbeatSuccess(ShareGroupHeartbeatResponse response) {
"already leaving the group.", memberId, memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
if (state == MemberState.UNSUBSCRIBED && responseData.memberEpoch() < 0 && maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} received a successful response to the heartbeat " +
"to leave the group and completed the leave operation. ", memberId, memberEpoch);
"to leave the group and completed the leave operation. ", memberId, memberEpoch);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The indentation of this line is now inconsistent with the rest of the method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

return;
}
if (isNotInGroup()) {
log.debug("Ignoring heartbeat response received from broker. Member {} is in {} state" +
" so it's not a member of the group. ", memberId, state);
return;
}
if (responseData.memberEpoch() < 0) {
log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} " +
"is in {} state and the member epoch is invalid: {}. ", memberId, memberEpoch, state,
responseData.memberEpoch());
maybeCompleteLeaveInProgress();
return;
}

updateMemberEpoch(responseData.memberEpoch());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) {
"already leaving the group.", memberId, memberEpoch);
return;
}
if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
if (state == MemberState.UNSUBSCRIBED && responseData.memberEpoch() < 0 && maybeCompleteLeaveInProgress()) {
log.debug("Member {} with epoch {} received a successful response to the heartbeat " +
"to leave the group and completed the leave operation. ", memberId, memberEpoch);
return;
Expand All @@ -673,6 +673,13 @@ public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) {
" so it's not a member of the group. ", memberId, state);
return;
}
if (responseData.memberEpoch() < 0) {
log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} " +
"is in {} state and the member epoch is invalid: {}. ", memberId, memberEpoch, state,
responseData.memberEpoch());
maybeCompleteLeaveInProgress();
return;
}

updateMemberEpoch(responseData.memberEpoch());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,10 @@ private void assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLe

membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId()));

assertFalse(sendLeave.isDone(), "Send leave operation should not complete until a leave response is received");

membershipManager.onHeartbeatSuccess(createConsumerGroupLeaveResponse(membershipManager.memberId()));

assertSendLeaveCompleted(membershipManager, sendLeave);
}

Expand Down Expand Up @@ -955,6 +959,9 @@ public void testHeartbeatSuccessfulResponseWhenLeavingGroupCompletesLeave() {
assertFalse(leaveResult.isDone());

membershipManager.onHeartbeatSuccess(createConsumerGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId()));
assertFalse(leaveResult.isDone());

membershipManager.onHeartbeatSuccess(createConsumerGroupLeaveResponse(membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, leaveResult);
}

Expand Down Expand Up @@ -998,16 +1005,40 @@ public void testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {

assertEquals(state, membershipManager.state());
verify(responseData, never()).memberId();
verify(responseData, never()).memberEpoch();
// In unsubscribed, we check if we received a leave group response, so we do verify member epoch.
if (state != MemberState.UNSUBSCRIBED) {
verify(responseData, never()).memberEpoch();
}
verify(responseData, never()).assignment();
}

@Test
public void testLeaveGroupWhenStateIsReconciling() {
ConsumerMembershipManager membershipManager = mockJoinAndReceiveAssignment(false);
assertEquals(MemberState.RECONCILING, membershipManager.state());
public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
ConsumerMembershipManager membershipManager = createMemberInStableState();

testLeaveGroupReleasesAssignmentAndResetsEpochToSendLeaveGroup(membershipManager);
CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();

// Send leave request, transitioning to UNSUBSCRIBED state
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

// Receive a previous heartbeat response, which should be ignored
membershipManager.onHeartbeatSuccess(new ConsumerGroupHeartbeatResponse(
new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(membershipManager.memberId())
.setMemberEpoch(membershipManager.memberEpoch())
));
assertTrue(leaveResult.isDone());

// Consumer unblocks and updates subscription
membershipManager.onSubscriptionUpdated();
membershipManager.onConsumerPoll();

membershipManager.onHeartbeatSuccess(createConsumerGroupLeaveResponse(membershipManager.memberId()));

assertEquals(MemberState.JOINING, membershipManager.state());
assertEquals(0, membershipManager.memberEpoch());
}

@Test
Expand Down Expand Up @@ -2901,6 +2932,13 @@ private ConsumerGroupHeartbeatResponse createConsumerGroupHeartbeatResponse(
.setAssignment(assignment));
}

private ConsumerGroupHeartbeatResponse createConsumerGroupLeaveResponse(String memberId) {
return new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(memberId)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
}

/**
* Create heartbeat response with the given assignment and a bumped epoch (incrementing by 1
* as default but could be any increment). This will be used to mock when a member
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,10 @@ private void assertTransitionToUnsubscribeOnHBSentAndWaitForResponseToCompleteLe

membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData.Assignment(), membershipManager.memberId()));

assertFalse(sendLeave.isDone(), "Send leave operation should not complete until a leave response is received");

membershipManager.onHeartbeatSuccess(createShareGroupLeaveResponse(membershipManager.memberId()));

assertSendLeaveCompleted(membershipManager, sendLeave);
}

Expand Down Expand Up @@ -518,6 +522,9 @@ public void testHeartbeatSuccessfulResponseWhenLeavingGroupCompletesLeave() {
assertFalse(leaveResult.isDone());

membershipManager.onHeartbeatSuccess(createShareGroupHeartbeatResponse(createAssignment(true), membershipManager.memberId()));
assertFalse(leaveResult.isDone());

membershipManager.onHeartbeatSuccess(createShareGroupLeaveResponse(membershipManager.memberId()));
assertSendLeaveCompleted(membershipManager, leaveResult);
}

Expand Down Expand Up @@ -561,10 +568,42 @@ public void testIgnoreHeartbeatResponseWhenNotInGroup(MemberState state) {

assertEquals(state, membershipManager.state());
verify(responseData, never()).memberId();
verify(responseData, never()).memberEpoch();
// In unsubscribed, we check if we received a leave group response, so we do verify member epoch.
if (state != MemberState.UNSUBSCRIBED) {
verify(responseData, never()).memberEpoch();
}
verify(responseData, never()).assignment();
}

@Test
public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
ShareMembershipManager membershipManager = createMemberInStableState();

CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();

// Send leave request, transitioning to UNSUBSCRIBED state
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

// Receive a previous heartbeat response, which should be ignored
Copy link
Collaborator

@ShivsundarR ShivsundarR May 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This response is not "ignored" right, we are setting the member epoch in the response to membershipManager.memberEpoch (-1 in this case). The leave operation is then processed successfully post which we are asserting if leaveResult has completed.
If this is the case, can we change the comment above to reflect the same. (and in the other tests too).
Thanks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The comment didn't match what was being tested. I updated the test to include both a response that is ignored, and a response that actually unblocks the consumer.

membershipManager.onHeartbeatSuccess(new ShareGroupHeartbeatResponse(
new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(membershipManager.memberId())
.setMemberEpoch(membershipManager.memberEpoch())
));
assertTrue(leaveResult.isDone());

// Share unblocks and updates subscription
membershipManager.onSubscriptionUpdated();
membershipManager.onConsumerPoll();

membershipManager.onHeartbeatSuccess(createShareGroupLeaveResponse(membershipManager.memberId()));

assertEquals(MemberState.JOINING, membershipManager.state());
assertEquals(0, membershipManager.memberEpoch());
}

@Test
public void testLeaveGroupWhenStateIsReconciling() {
ShareMembershipManager membershipManager = mockJoinAndReceiveAssignment(false);
Expand Down Expand Up @@ -1578,6 +1617,13 @@ private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponse(
.setMemberEpoch(MEMBER_EPOCH)
.setAssignment(assignment));
}

private ShareGroupHeartbeatResponse createShareGroupLeaveResponse(String memberId) {
return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(memberId)
.setMemberEpoch(ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH));
}

private ShareGroupHeartbeatResponse createShareGroupHeartbeatResponseWithError(String memberId) {
return new ShareGroupHeartbeatResponse(new ShareGroupHeartbeatResponseData()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.COORDINATOR_METRICS_SUFFIX;
import static org.apache.kafka.common.requests.ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -1143,6 +1144,51 @@ public void testLeaveGroupOnCloseWhenNotInGroup() {
testLeaveGroupWhenNotInGroup(membershipManager::leaveGroupOnClose);
}

@Test
public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
final Set<StreamsRebalanceData.TaskId> activeTasks =
Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
joining();
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
final CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
acknowledging(onTasksAssignedCallbackExecutedSetup);
stable();

CompletableFuture<Void> leaveResult = membershipManager.leaveGroup();
final CompletableFuture<Void> onTasksRevokedCallbackExecutedSetup =
verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks);
onTasksRevokedCallbackExecutedSetup.complete(null);

// Send leave request, transitioning to UNSUBSCRIBED state
membershipManager.onHeartbeatRequestGenerated();
assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());

// Receive a previous heartbeat response, which should be ignored
membershipManager.onHeartbeatSuccess(new StreamsGroupHeartbeatResponse(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(membershipManager.memberId())
.setMemberEpoch(membershipManager.memberEpoch())
));
assertTrue(leaveResult.isDone());

// Consumer unblocks and updates subscription
membershipManager.onSubscriptionUpdated();
membershipManager.onConsumerPoll();

membershipManager.onHeartbeatSuccess(new StreamsGroupHeartbeatResponse(
new StreamsGroupHeartbeatResponseData()
.setErrorCode(Errors.NONE.code())
.setMemberId(membershipManager.memberId())
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
));

assertEquals(MemberState.JOINING, membershipManager.state());
assertEquals(0, membershipManager.memberEpoch());
}

private void testLeaveGroupWhenNotInGroup(final Supplier<CompletableFuture<Void>> leaveGroup) {
final CompletableFuture<Void> future = leaveGroup.get();

Expand Down Expand Up @@ -1216,10 +1262,18 @@ public void testLeaveGroupWhenInGroupWithAssignment() {
assertEquals(onGroupLeft, onGroupLeftAfterRevocationCallback);
membershipManager.onHeartbeatRequestGenerated();
verifyInStateUnsubscribed(membershipManager);

// Don't unblock unsubscribe if this is not a leave group response
membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0), MEMBER_EPOCH + 1));

assertFalse(onGroupLeft.isDone());
verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());

// Unblock unsubscribe when this is not a leave group response
membershipManager.onHeartbeatSuccess(makeHeartbeatResponse(List.of(), List.of(), List.of(), LEAVE_GROUP_MEMBER_EPOCH));

assertTrue(onGroupLeft.isDone());
assertFalse(onGroupLeft.isCompletedExceptionally());
verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
}

@Test
Expand Down Expand Up @@ -1252,10 +1306,18 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
assertFalse(onGroupLeft.isDone());
membershipManager.onHeartbeatRequestGenerated();
verifyInStateUnsubscribed(membershipManager);

// Don't unblock unsubscribe if this is not a leave group response
membershipManager.onHeartbeatSuccess(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0), MEMBER_EPOCH + 1));

assertFalse(onGroupLeft.isDone());
verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());

// Unblock unsubscribe when this is not a leave group response
membershipManager.onHeartbeatSuccess(makeHeartbeatResponse(List.of(), List.of(), List.of(), LEAVE_GROUP_MEMBER_EPOCH));

assertTrue(onGroupLeft.isDone());
assertFalse(onGroupLeft.isCompletedExceptionally());
verify(memberStateListener, never()).onMemberEpochUpdated(Optional.of(MEMBER_EPOCH + 1), membershipManager.memberId());
}

@Test
Expand Down