KAFKA-20066: Assignment epochs for consumer groups#21508
KAFKA-20066: Assignment epochs for consumer groups#21508lucliu1108 wants to merge 8 commits intoapache:trunkfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Implements KIP-1251 assignment epochs for modern consumer groups to relax offset-commit fencing by validating commits against per-partition assignment epochs rather than requiring a strict member-epoch match.
Changes:
- Extends the
ConsumerGroupCurrentMemberAssignmentValueschema to persist per-partition assignment epochs alongside assigned partitions. - Adds epoch-annotated assignment tracking to
ConsumerGroupMemberand serializes it viaGroupCoordinatorRecordHelpers. - Updates
ConsumerGroup.validateOffsetCommit()to allowassignmentEpoch <= clientEpoch <= brokerEpochusing a per-partition validator when epochs don’t strictly match.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json | Adds per-partition AssignmentEpochs field aligned with the partitions array. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java | Introduces epoch-annotated assignment maps + accessors and builder support (incl. reset to zero). |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java | Relaxes offset-commit epoch validation and adds per-partition assignment-epoch validator. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java | Resets assignment epochs for departing static members and clears pending-revocation epochs. |
| group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java | Serializes assignment epochs into coordinator records. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs) { | ||
| this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs; | ||
| this.assignedPartitions = assignedPartitionsWithEpochs.entrySet().stream() | ||
| .collect(Collectors.toMap( | ||
| Map.Entry::getKey, | ||
| e -> new HashSet<>(e.getValue().keySet()) | ||
| )); | ||
| return this; | ||
| } | ||
|
|
||
| public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid, Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) { | ||
| this.partitionsPendingRevocationWithEpochs = partitionsPendingRevocationWithEpochs; | ||
| this.partitionsPendingRevocation = partitionsPendingRevocationWithEpochs.entrySet().stream() | ||
| .collect(Collectors.toMap( | ||
| Map.Entry::getKey, | ||
| e -> new HashSet<>(e.getValue().keySet()) | ||
| )); | ||
| return this; | ||
| } |
There was a problem hiding this comment.
setAssignedPartitionsWithEpochs / setPartitionsPendingRevocationWithEpochs keep the Set-based fields in sync, but the Builder still also has the older setAssignedPartitions(...) and setPartitionsPendingRevocation(...) setters. Given GroupCoordinatorRecordHelpers now serializes assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(), any code path that uses the older setters (without also setting the epoch maps) can drop epoch information and even persist empty assignments. Consider enforcing invariants in build() (e.g., derive epoch maps when missing), or deprecating/removing the Set-based setters to prevent constructing an internally inconsistent ConsumerGroupMember.
| .setMemberEpoch(member.memberEpoch()) | ||
| .setPreviousMemberEpoch(member.previousMemberEpoch()) | ||
| .setState(member.state().value()) | ||
| .setAssignedPartitions(toTopicPartitions(member.assignedPartitions())) | ||
| .setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocation())), | ||
| .setAssignedPartitions(toTopicPartitions(member.assignedPartitionsWithEpochs())) | ||
| .setPartitionsPendingRevocation(toTopicPartitions(member.partitionsPendingRevocationWithEpochs())), |
There was a problem hiding this comment.
newConsumerGroupCurrentAssignmentRecord now exclusively serializes member.assignedPartitionsWithEpochs()/partitionsPendingRevocationWithEpochs(). If those maps are empty while the Set-based assignment fields are populated (possible via existing Builder setters), this will write records with empty assignments and effectively lose the current assignment on disk. Consider adding a defensive fallback (e.g., derive epoch maps from assignedPartitions()/partitionsPendingRevocation() with a sensible default epoch) or validate/throw if the member is internally inconsistent before serializing.
| // For member using the classic protocol, use strict epoch validation. | ||
| if (member.useClassicProtocol()) { |
There was a problem hiding this comment.
Grammar in this comment is off: consider changing "For member using the classic protocol" to "For members using the classic protocol" (or "For a member using...") for clarity.
| if (memberEpoch == member.memberEpoch()) { | ||
| return CommitPartitionValidator.NO_OP; | ||
| } | ||
| // Case 2:Client epoch > broker epoch, which is an invalid request |
There was a problem hiding this comment.
Minor comment formatting: add a space after the colon in "Case 2:Client".
| // Case 2:Client epoch > broker epoch, which is an invalid request | |
| // Case 2: Client epoch > broker epoch, which is an invalid request |
| // For member using the consumer protocol | ||
| // Case 1: Strict epoch match | ||
| if (memberEpoch == member.memberEpoch()) { | ||
| return CommitPartitionValidator.NO_OP; | ||
| } | ||
| // Case 2:Client epoch > broker epoch, which is an invalid request | ||
| if (memberEpoch > member.memberEpoch()) { | ||
| throw new StaleMemberEpochException(String.format("The received member epoch %d is larger than " | ||
| + "the expected member epoch %d.", memberEpoch, member.memberEpoch())); | ||
| } | ||
| return createAssignmentEpochValidator(member, memberEpoch); |
There was a problem hiding this comment.
The new relaxed offset-commit validation path (returning a per-partition validator when receivedMemberEpoch < broker member epoch) doesn’t appear to be covered by unit tests. It would be useful to add ConsumerGroupTest cases that (1) accept commits when assignmentEpoch <= receivedEpoch < brokerEpoch for assigned/pending-revocation partitions, and (2) reject commits when the partition isn’t assigned or when receivedEpoch < assignmentEpoch, to prevent regressions in the new behavior.
| import java.util.ArrayList; | ||
| import java.util.stream.Collectors; | ||
| import java.util.Collections; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; |
There was a problem hiding this comment.
Import order looks inconsistent with the rest of the module: java.util.stream.Collectors is currently placed between java.util.ArrayList and java.util.Collections. Other files keep java.util.* imports together and place java.util.stream.* afterwards (e.g., Utils.java:36-49). Please reorder these imports to match the established style (this may also be enforced by Checkstyle).
lucasbru
left a comment
There was a problem hiding this comment.
Thanks for the PR @lucliu1108 !
I left some comments. I think the biggest one to me is right now that we should not have two collections representing the assigned partitions in ConsumerGroupMember.
I guess this is meant as a draft, we will not be able to merge this without tests. Ideally, we merge it as a series of smaller PRs. But it can make sense to create one draft PR with everything first for the conceptual review, and then split it up into a couple smaller PRs in the final merge.
| private Map<Uuid, Set<Integer>> assignedPartitions = Map.of(); | ||
| private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of(); | ||
| private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null; | ||
| private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of(); |
There was a problem hiding this comment.
Why are we adding assignedPartitionsWithEpochs in addition to assignedPartitions? This will get out of sync. I think we should just have the second, and replace all uses of assignedPartitions with assignedPartitionsWithEpochs.
There was a problem hiding this comment.
I have removed all setters for assignedPartitions and partitionsPendingRevocation, but still keep the getters and as private fields to avoid redundant conversions in other methods that previously call these getters.
Since these private fields will be re-constructed every time the builder is called to set assignedPartitionsWithEpochs and partitionsRevocationsWithEpochs, the getters won't be async.
There was a problem hiding this comment.
I think this makes sense - are there so many places that require assignedPartitions and partitionsPendingRevocation that we cannot convert the calling context to use the map instead of the set? Or do we require it because of the sharing with the share group?
I wonder if it could be worth implementing a view on the keySet to avoid the second collection here, as it's probably one of the larger collections in the GC.
There was a problem hiding this comment.
For the assignedPartitions, it is part of the fields inherited from ModernGroupMember that ShareGroupMember also uses, so i left it there for the getter method.
Or another way is not to initialize it in the constructor (keep it null?) but maybe this is not good practice.
For the. partitionsPendingRevocation, there are 6 or 7 places it is directly called. Guess I could remove this method and do extra conversions for where its' called instead, or use a view.
| // We will write a member epoch of -2 for this departing static member. | ||
| // Assignment epochs are reset to 0 so when the static member rejoins, partitions | ||
| // are considered assigned from epoch 0 to the new member ID. | ||
| ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member) |
There was a problem hiding this comment.
It seems we also need to update the dynamic member leave code. We seem to set setAssignedPartitions there.
| // If the partition is not assigned to this member, reject. | ||
| if (assignmentEpoch == null) { | ||
| throw new StaleMemberEpochException( | ||
| String.format("Partition %s-%d is not assigned to member %s.", |
There was a problem hiding this comment.
Above, we are allowing committing unassigned partitions if the "strict epoch match" passes, that is client-side member epoch = server-side member epoch.
Then this error message is probably not sufficient as a reason. We should also include that the client-side member epoch != server-side member epoch in this error.
There was a problem hiding this comment.
I'm in favor of aligning with the streams implementation as much as possible. Streams has the same pattern and uses the message Task %s-%d is not assigned or pending revocation for member.. Shall we update the streams message too (in a separate PR maybe)?
lucasbru
left a comment
There was a problem hiding this comment.
On a high level, this makes sense to me. I left a comment, but you can definitely work on the tests
| topicPartitionsWithEpochs.forEach((topicId, partitionEpochMap) -> { | ||
| List<Integer> partitionList = new ArrayList<>(partitionEpochMap.keySet()); | ||
| List<Integer> epochList = partitionList.stream() | ||
| .map(partitionId -> partitionEpochMap.getOrDefault(partitionId, 0)) |
There was a problem hiding this comment.
why getOrDefault and not just get
| private Map<Uuid, Set<Integer>> assignedPartitions = Map.of(); | ||
| private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of(); | ||
| private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata classicMemberMetadata = null; | ||
| private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs = Map.of(); |
There was a problem hiding this comment.
I think this makes sense - are there so many places that require assignedPartitions and partitionsPendingRevocation that we cannot convert the calling context to use the map instead of the set? Or do we require it because of the sharing with the share group?
I wonder if it could be worth implementing a view on the keySet to avoid the second collection here, as it's probably one of the larger collections in the GC.
| Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new HashMap<>(); | ||
|
|
||
| // Get existing epochs from member | ||
| Map<Uuid, Map<Integer, Integer>> existingAssignedEpochs = member.assignedPartitionsWithEpochs(); |
There was a problem hiding this comment.
If an partition was pending revocation, and is added back to the target assignment of the member, it seems we want to keep the original assignment epoch. So we have to look at member.partitionPendingRevociations here as well, no?
Summary
This PR is an implementation of KIP-1251:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1251%3A+Assignment+epochs+for+consumer+groups
This change introduces per-partition assignment epochs to relax the
strict member epoch validation for consumer group offset commits. When
receiving an offset commit request that includes the client-side member
epoch and a member ID, we previously require checking
for a valid offset commit, which could lead to false fencing.
We now allow a relaxed offset commit check using an assignment epoch for
each assigned partition and each member,
This prevents false rejections of legitimate offset commits when a
member's epoch is bumped but the client hasn't received the update yet.
Changes
AssignmentEpochsfield toTopicPartitionsinConsumerGroupCurrentMemberAssignmentValueschemaassignedPartitionsWithEpochsandpartitionsPendingRevocationWithEpochstoConsumerGroupMemberConsumerGroup.validateOffsetCommit()with relaxedvalidation logic
createAssignmentEpochValidator()for per-partition epochchecks
GroupCoordinatorRecordHelpersto serialize assignmentepochs
GroupMetadataManagerto reset assignment epoch when leavinggroup for static members
Reviewers: @lucasbru