KAFKA-17747: [1/N] Add MetadataHash field to Consumer/Share/StreamsGroupMetadataValue #19504

Open · wants to merge 3 commits into base: trunk

@@ -176,18 +176,21 @@ public static CoordinatorRecord newConsumerGroupSubscriptionMetadataTombstoneRecord
*
* @param groupId The consumer group id.
* @param newGroupEpoch The consumer group epoch.
* @param metadataHash The consumer group metadata hash.
* @return The record.
*/
public static CoordinatorRecord newConsumerGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
return CoordinatorRecord.record(
new ConsumerGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);
@@ -647,18 +650,21 @@ public static CoordinatorRecord newShareGroupSubscriptionMetadataTombstoneRecord
*
* @param groupId The group id.
* @param newGroupEpoch The group epoch.
* @param metadataHash The group metadata hash.
* @return The record.
*/
public static CoordinatorRecord newShareGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
return CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId(groupId),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);

@@ -2180,7 +2180,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
int groupEpoch = group.groupEpoch();
if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch));
records.add(newStreamsGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {}.", groupId, memberId, groupEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
@@ -2808,7 +2808,7 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newShareGroupEpochRecord(groupId, groupEpoch));
records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
}

@@ -3516,7 +3516,7 @@ private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResul

if (bumpGroupEpoch) {
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(
@@ -3832,7 +3832,7 @@ private UpdateSubscriptionMetadataResult updateSubscriptionMetadata(

if (bumpGroupEpoch) {
groupEpoch += 1;
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch));
records.add(newConsumerGroupEpochRecord(groupId, groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
metrics.record(CONSUMER_GROUP_REBALANCES_SENSOR_NAME);
}
@@ -4231,7 +4231,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> consumerGroupFenceMembers(

// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newConsumerGroupEpochRecord(group.groupId(), groupEpoch, 0));
log.info("[GroupId {}] Bumped group epoch to {}.", group.groupId(), groupEpoch);

for (ConsumerGroupMember member : members) {
@@ -4275,7 +4275,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(

// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0));

cancelGroupSessionTimeout(group.groupId(), member.memberId());

@@ -4338,7 +4338,7 @@ private <T> CoordinatorResult<T, CoordinatorRecord> streamsGroupFenceMember(

// We bump the group epoch.
int groupEpoch = group.groupEpoch() + 1;
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch));
records.add(newStreamsGroupEpochRecord(group.groupId(), groupEpoch, 0));

cancelTimers(group.groupId(), member.memberId());

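Every call site in this patch passes 0 for the new metadataHash argument, so the persisted hash is effectively a placeholder in this first part of the series. As a rough, self-contained sketch only (TopicInfo and computeMetadataHash are names invented here, not this PR's or KIP-1101's actual implementation), such a hash could be derived from each subscribed topic's name, partition count, and rack information:

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;

public final class MetadataHashSketch {

    // Minimal stand-in for the per-topic metadata the coordinator already tracks.
    record TopicInfo(int numPartitions, List<String> racks) { }

    // Fold each topic's name, partition count, and racks into a single long so
    // that any change to them produces a different hash.
    static long computeMetadataHash(Map<String, TopicInfo> subscribedTopics) {
        long hash = 0;
        // Iterate in sorted topic-name order so the result is deterministic.
        for (Map.Entry<String, TopicInfo> entry : new TreeMap<>(subscribedTopics).entrySet()) {
            hash = 31 * hash + Objects.hash(entry.getKey(), entry.getValue().numPartitions(), entry.getValue().racks());
        }
        return hash;
    }
}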

@@ -1210,7 +1210,7 @@ public void createConsumerGroupRecords(
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
);

records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch()));
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));

members().forEach((consumerGroupMemberId, consumerGroupMember) ->
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(

@@ -152,7 +152,8 @@ public static CoordinatorRecord newStreamsGroupPartitionMetadataTombstoneRecord(

public static CoordinatorRecord newStreamsGroupEpochRecord(
String groupId,
int newGroupEpoch
int newGroupEpoch,
long metadataHash
) {
Objects.requireNonNull(groupId, "groupId should not be null here");

@@ -161,7 +162,8 @@ public static CoordinatorRecord newStreamsGroupEpochRecord(
.setGroupId(groupId),
new ApiMessageAndVersion(
new StreamsGroupMetadataValue()
.setEpoch(newGroupEpoch),
.setEpoch(newGroupEpoch)
.setMetadataHash(metadataHash),
(short) 0
)
);

@@ -21,6 +21,13 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." }
"about": "The group epoch." },
// The MetadataHash is added in 4.1 (KIP-1101). It tracks the metadata of the
// topics the group is subscribed to. When that metadata changes, for example
// a partition count or rack change, the hash changes as well, which indicates
// that the group should be rebalanced.
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"default": 0, "taggedVersions": "0+", "tag": 0,
"about": "The hash of all topics in the group." }
]
}
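The comment above captures the intended use of the new field: the coordinator persists the hash next to the epoch and rebalances the group once the stored value no longer matches the current topic metadata. A minimal sketch of that check follows; only newConsumerGroupEpochRecord(groupId, epoch, metadataHash) comes from this PR (via GroupCoordinatorRecordHelpers), while the method itself, its parameters, and the recomputed hash are assumptions standing in for state the coordinator already holds:

    // Hypothetical sketch: bump the group epoch and persist the newly computed
    // hash when the subscribed topic metadata (partition counts, racks, ...)
    // has changed since the last ConsumerGroupMetadataValue record was written.
    static void maybeBumpGroupEpoch(
        String groupId,
        int currentEpoch,
        long persistedMetadataHash,
        long computedMetadataHash,
        List<CoordinatorRecord> records
    ) {
        if (persistedMetadataHash != computedMetadataHash) {
            records.add(newConsumerGroupEpochRecord(groupId, currentEpoch + 1, computedMetadataHash));
        }
    }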

@@ -16,6 +16,8 @@
{
"apiKey": 4,
"type": "coordinator-key",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataKey",
"validVersions": "0",
"flexibleVersions": "none",

@@ -16,6 +16,8 @@
{
"apiKey": 4,
"type": "coordinator-value",
// This message is replaced by ConsumerGroupMetadataValue#MetadataHash
// in 4.1 (KIP-1101).
"name": "ConsumerGroupPartitionMetadataValue",
"validVersions": "0",
"flexibleVersions": "0+",

@@ -22,6 +22,8 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "type": "int32", "versions": "0+",
"about": "The group epoch." }
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
]
}

@@ -22,6 +22,8 @@
"flexibleVersions": "0+",
"fields": [
{ "name": "Epoch", "versions": "0+", "type": "int32",
"about": "The group epoch." }
"about": "The group epoch." },
{ "name": "MetadataHash", "versions": "0+", "type": "int64",
"about": "The hash of all topics in the group." }
]
}

@@ -45,6 +45,7 @@
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
import org.apache.kafka.coordinator.group.modern.MemberState;
@@ -84,6 +85,7 @@
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochRecord;
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupEpochTombstoneRecord;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -251,13 +253,15 @@ public void testNewConsumerGroupEpochRecord() {
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ConsumerGroupMetadataValue()
.setEpoch(10),
.setEpoch(10)
.setMetadataHash(10),
(short) 0
)
);

assertEquals(expectedRecord, newConsumerGroupEpochRecord(
"group-id",
10,
10
));
}
@@ -855,6 +859,26 @@ public void testNewConsumerGroupRegularExpressionTombstone() {
assertEquals(expectedRecord, record);
}

@Test
public void testNewShareGroupEpochRecord() {
CoordinatorRecord expectedRecord = CoordinatorRecord.record(
new ShareGroupMetadataKey()
.setGroupId("group-id"),
new ApiMessageAndVersion(
new ShareGroupMetadataValue()
.setEpoch(10)
.setMetadataHash(10),
(short) 0
)
);

assertEquals(expectedRecord, newShareGroupEpochRecord(
"group-id",
10,
10
));
}

/**
* Creates a map of partitions to racks for testing.
*