Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ public static CoordinatorRecord newConsumerGroupCurrentAssignmentRecord(
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setState(member.state().value())
.setAssignedPartitions(toTopicPartitions(member.assignedPartitions()))
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation())),
.setAssignedPartitions(toTopicPartitions(member.assignedPartitionsWithEpochs()))
.setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocationWithEpochs())),
Comment on lines 289 to +293
Copy link

Copilot AI Feb 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newConsumerGroupCurrentAssignmentRecord now exclusively serializes member.assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(). If those maps are empty while the Set-based assignment fields are populated (possible via existing Builder setters), this will write records with empty assignments and effectively lose the current assignment on disk. Consider adding a defensive fallback (e.g., derive epoch maps from assignedPartitions()/partitionsPendingRevocation() with a sensible default epoch) or validate/throw if the member is internally inconsistent before serializing.

Copilot uses AI. Check for mistakes.
(short) 0
)
);
Expand Down Expand Up @@ -804,13 +804,19 @@ public static CoordinatorRecord newShareGroupStatePartitionMetadataRecord(
}

private static List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> toTopicPartitions(
Map<Uuid, Set<Integer>> topicPartitions
) {
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics = new ArrayList<>(topicPartitions.size());
topicPartitions.forEach((topicId, partitions) ->
Map<Uuid, Map<Integer, Integer>> topicPartitionsWithEpochs
) {
List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> topics = new ArrayList<>(topicPartitionsWithEpochs.size());
topicPartitionsWithEpochs.forEach((topicId, partitionEpochMap) -> {
List<Integer> partitionList = new ArrayList<>(partitionEpochMap.keySet());
List<Integer> epochList = partitionList.stream()
.map(partitionId -> partitionEpochMap.getOrDefault(partitionId, 0))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why getOrDefault and not just get

.toList();
topics.add(new ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions()
.setTopicId(topicId)
.setPartitions(new ArrayList<>(partitions)))
.setPartitions(partitionList)
.setAssignmentEpochs(epochList));
}
);
return topics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4138,9 +4138,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
ConsumerGroupMember member
) {
// We will write a member epoch of -2 for this departing static member.
// Assignment epochs are reset to 0 so when the static member rejoins, partitions
// are considered assigned from epoch 0 to the new member ID.
ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we also need to update the dynamic member leave code. We seem to set setAssignedPartitions there.

.setMemberEpoch(LEAVE_GROUP_STATIC_MEMBER_EPOCH)
.setPartitionsPendingRevocation(Map.of())
.setPartitionsPendingRevocationWithEpochs(Map.of())
.resetAssignedPartitionsEpochsToZero()
.build();

return new CoordinatorResult<>(
Expand Down Expand Up @@ -5407,8 +5410,8 @@ public void replay(
ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(oldMember)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Map.of())
.setPartitionsPendingRevocation(Map.of())
.setAssignedPartitionsWithEpochs(Map.of())
.setPartitionsPendingRevocationWithEpochs(Map.of())
.build();
group.updateMember(newMember);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.kafka.coordinator.group.Utils.toOptional;
import static org.apache.kafka.coordinator.group.Utils.toTopicPartitionMap;
Expand Down Expand Up @@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or above must be used " +
"by members using the modern group protocol");
}
// For members using the classic protocol, use strict epoch validation.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// For members using the classic protocol, use strict epoch validation.
// For members using the classic protocol, use strict epoch validation.

if (member.useClassicProtocol()) {
validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
return CommitPartitionValidator.NO_OP;
}

validateMemberEpoch(memberEpoch, member.memberEpoch(), member.useClassicProtocol());
return CommitPartitionValidator.NO_OP;
// For member using the consumer protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
// For member using the consumer protocol
// For members using the consumer protocol, the epoch must either match the last epoch sent
// in a heartbeat or be greater than or equal to the partition's assignment epoch.

// Case 1: Strict epoch match
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Do these cases correspond to the KIP? I would drop the comments otherwise.

if (memberEpoch == member.memberEpoch()) {
return CommitPartitionValidator.NO_OP;
}
// Case 2: Client epoch > broker epoch, which is an invalid request
if (memberEpoch > member.memberEpoch()) {
throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than "
+ "the expected member epoch %d.", memberEpoch, member.memberEpoch()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we follow the streams implementation?

Suggested change
throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than "
+ "the expected member epoch %d.", memberEpoch, member.memberEpoch()));
throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than "
+ "current member epoch %d.", memberEpoch, member.memberEpoch()));

}
return createAssignmentEpochValidator(member, memberEpoch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we follow the streams implementation?

Suggested change
return createAssignmentEpochValidator(member, memberEpoch);
// Member epoch is older; validate against per-partition assignment epochs.
return createAssignmentEpochValidator(member, memberEpoch);

}

/**
Expand Down Expand Up @@ -837,6 +852,47 @@ private void validateMemberEpoch(
}
}

/**
* Creates a validator that checks per-partition assignment epochs.
* A commit is rejected if the partition is not assigned to the member
* or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251).
*
* @param member The consumer group member.
* @param receivedMemberEpoch The member epoch from the offset commit request.
* @return A validator that checks each partition's assignment epoch.
*/
private CommitPartitionValidator createAssignmentEpochValidator(
ConsumerGroupMember member,
int receivedMemberEpoch
) {
return (topicName, topicId, partitionId) -> {
// Check if the partition is in the assigned partitions.
// If not found in assigned, check partitions pending revocation.
Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
if (assignmentEpoch == null) {
assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
}

// If client-side epoch != broker-side epoch, and the partition is not assigned to this member, reject.
if (assignmentEpoch == null) {
throw new StaleMemberEpochException(String.format(
"Partition %s-%d is not assigned or pending revocation for member %s. " +
"Committing unassigned partitions is only allowed when member epoch matches exactly " +
"(received: %d, current: %d).",
topicName, partitionId, member.memberId(), receivedMemberEpoch, member.memberEpoch()));
}

// If the received epoch is older than when this partition was assigned,
// It is a zombie commit and should be rejected.
if (receivedMemberEpoch < assignmentEpoch) {
throw new StaleMemberEpochException(
String.format("The received member epoch %d is older than the assignment epoch %d for partition %s-%d.",
receivedMemberEpoch, assignmentEpoch, topicName, partitionId)
);
}
};
}
Comment on lines 856 to 889
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we follow the streams implementation?

Suggested change
/**
* Creates a validator that checks per-partition assignment epochs.
* A commit is rejected if the partition is not assigned to the member
* or if the received client-side epoch is older than the partition's assignment epoch(KIP-1251).
*
* @param member The consumer group member.
* @param receivedMemberEpoch The member epoch from the offset commit request.
* @return A validator that checks each partition's assignment epoch.
*/
private CommitPartitionValidator createAssignmentEpochValidator(
ConsumerGroupMember member,
int receivedMemberEpoch
) {
return (topicName, topicId, partitionId) -> {
// Check if the partition is in the assigned partitions.
// If not found in assigned, check partitions pending revocation.
Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
if (assignmentEpoch == null) {
assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
}
// If client-side epoch != broker-side epoch, and the partition is not assigned to this member, reject.
if (assignmentEpoch == null) {
throw new StaleMemberEpochException(String.format(
"Partition %s-%d is not assigned or pending revocation for member %s. " +
"Committing unassigned partitions is only allowed when member epoch matches exactly " +
"(received: %d, current: %d).",
topicName, partitionId, member.memberId(), receivedMemberEpoch, member.memberEpoch()));
}
// If the received epoch is older than when this partition was assigned,
// It is a zombie commit and should be rejected.
if (receivedMemberEpoch < assignmentEpoch) {
throw new StaleMemberEpochException(
String.format("The received member epoch %d is older than the assignment epoch %d for partition %s-%d.",
receivedMemberEpoch, assignmentEpoch, topicName, partitionId)
);
}
};
}
/**
* Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
* A commit is rejected if the partition is not assigned to the member
* or if the received client-side epoch is older than the partition's assignment epoch (KIP-1251).
*
* @param member The member whose assignments are being validated.
* @param receivedMemberEpoch The received member epoch.
* @return A validator for per-partition validation.
*/
private CommitPartitionValidator createAssignmentEpochValidator(
ConsumerGroupMember member,
int receivedMemberEpoch
) {
return (topicName, topicId, partitionId) -> {
// Search for the partition in assigned partitions, then in partitions pending revocation
Integer assignmentEpoch = member.getAssignmentEpoch(topicId, partitionId);
if (assignmentEpoch == null) {
assignmentEpoch = member.getPendingRevocationEpoch(topicId, partitionId);
}
if (assignmentEpoch == null) {
throw new StaleMemberEpochException(String.format(
"Partition %s-%d is not assigned or pending revocation for member.",
topicName, partitionId));
}
if (receivedMemberEpoch < assignmentEpoch) {
throw new StaleMemberEpochException(
String.format("Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
receivedMemberEpoch, assignmentEpoch, topicName, partitionId)
);
}
};
}


/**
* Computes the subscription type based on the provided information.
*
Expand Down Expand Up @@ -1184,17 +1240,24 @@ public static ConsumerGroup fromClassicGroup(
// assignment of the classic group. All the members are put in the Stable state. If the classic
// group was in Preparing Rebalance or Completing Rebalance states, the classic members are
// asked to rejoin the group to re-trigger a rebalance or collect their assignments.
int memberEpoch = classicGroup.generationId();
// Convert assigned partitions to epochs map
Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = assignedPartitions.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().stream().collect(Collectors.toMap(p -> p, p -> memberEpoch))
));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We could move this into a Builder.setAssignedPartitions(assignedPartitions, assignmentEpoch) convenience method.

ConsumerGroupMember newMember = new ConsumerGroupMember.Builder(classicGroupMember.memberId())
.setMemberEpoch(classicGroup.generationId())
.setMemberEpoch(memberEpoch)
.setState(MemberState.STABLE)
.setPreviousMemberEpoch(classicGroup.generationId())
.setPreviousMemberEpoch(memberEpoch)
.setInstanceId(classicGroupMember.groupInstanceId().orElse(null))
.setRackId(toOptional(subscription.rackId()).orElse(null))
.setRebalanceTimeoutMs(classicGroupMember.rebalanceTimeoutMs())
.setClientId(classicGroupMember.clientId())
.setClientHost(classicGroupMember.clientHost())
.setSubscribedTopicNames(subscription.topics())
.setAssignedPartitions(assignedPartitions)
.setAssignedPartitionsWithEpochs(assignedPartitionsWithEpochs)
.setClassicMemberMetadata(
new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
.setSessionTimeoutMs(classicGroupMember.sessionTimeoutMs())
Expand Down
Loading
Loading