Skip to content

KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group #19790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2485,8 +2485,6 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
// Test offset deletion while consuming
val offsetDeleteResult = client.deleteConsumerGroupOffsets(testGroupId, util.Set.of(tp1, tp2))

// Top level error will equal to the first partition level error
assertFutureThrows(classOf[GroupSubscribedToTopicException], offsetDeleteResult.all())
Comment on lines -2488 to -2489
Copy link
Contributor Author

@dongnuo123 dongnuo123 May 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it because offsetDeleteResult.all() throws the first exception it sees in the result topicPartition map but the sequence of the entries is not deterministic

test_topic-0 --> GroupSubscribedToTopicException
foo-0        --> UnknownTopicOrPartitionException

assertFutureThrows(classOf[GroupSubscribedToTopicException], offsetDeleteResult.partitionResult(tp1))
assertFutureThrows(classOf[UnknownTopicOrPartitionException], offsetDeleteResult.partitionResult(tp2))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,8 @@ ConsumerGroup convertToConsumerGroup(ClassicGroup classicGroup, List<Coordinator
snapshotRegistry,
metrics,
classicGroup,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);
} catch (SchemaException e) {
log.warn("Cannot upgrade classic group " + classicGroup.groupId() +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.kafka.coordinator.group.modern.ModernGroup;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
import org.apache.kafka.image.ClusterImage;
import org.apache.kafka.image.TopicsImage;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
Expand Down Expand Up @@ -1129,7 +1130,8 @@ public ConsumerGroupDescribeResponseData.DescribedGroup asDescribedGroup(
* @param snapshotRegistry The SnapshotRegistry.
* @param metrics The GroupCoordinatorMetricsShard.
* @param classicGroup The converted classic group.
* @param topicsImage The TopicsImage for topic id and topic name conversion.
* @param topicsImage The current metadata for all available topics.
* @param clusterImage The current metadata for the Kafka cluster.
* @return The created ConsumerGroup.
*
* @throws SchemaException if any member's subscription or assignment cannot be deserialized.
Expand All @@ -1139,7 +1141,8 @@ public static ConsumerGroup fromClassicGroup(
SnapshotRegistry snapshotRegistry,
GroupCoordinatorMetricsShard metrics,
ClassicGroup classicGroup,
TopicsImage topicsImage
TopicsImage topicsImage,
ClusterImage clusterImage
) {
String groupId = classicGroup.groupId();
ConsumerGroup consumerGroup = new ConsumerGroup(snapshotRegistry, groupId, metrics);
Expand Down Expand Up @@ -1195,6 +1198,12 @@ public static ConsumerGroup fromClassicGroup(
consumerGroup.updateMember(newMember);
});

consumerGroup.setSubscriptionMetadata(consumerGroup.computeSubscriptionMetadata(
consumerGroup.subscribedTopicNames(),
topicsImage,
clusterImage
));

return consumerGroup;
}

Expand All @@ -1210,6 +1219,8 @@ public void createConsumerGroupRecords(
records.add(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId(), consumerGroupMember))
);

records.add(GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId(), subscriptionMetadata()));

records.add(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId(), groupEpoch(), 0));

members().forEach((consumerGroupMemberId, consumerGroupMember) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10237,6 +10237,10 @@ memberId2, new MemberAssignmentImpl(mkAssignment(

// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
Expand All @@ -10245,12 +10249,6 @@ memberId2, new MemberAssignmentImpl(mkAssignment(
// Member 2 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, assignor.targetPartitions(memberId2)),
Expand Down Expand Up @@ -10452,6 +10450,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),

GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
Expand All @@ -10464,12 +10467,6 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
// Member 3 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
Expand Down Expand Up @@ -10657,7 +10654,7 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
);

group.transitionTo(PREPARING_REBALANCE);
group.transitionTo(COMPLETING_REBALANCE);
group.initNextGeneration();
group.transitionTo(STABLE);

context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(group, assignments));
Expand All @@ -10679,8 +10676,8 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {

ConsumerGroupMember expectedClassicMember = new ConsumerGroupMember.Builder(memberId)
.setInstanceId(instanceId)
.setMemberEpoch(0)
.setPreviousMemberEpoch(0)
.setMemberEpoch(group.generationId())
.setPreviousMemberEpoch(group.generationId())
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setSubscribedTopicNames(List.of(fooTopicName))
Expand Down Expand Up @@ -10716,7 +10713,7 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
.build();

ConsumerGroupMember expectedFinalConsumerMember = new ConsumerGroupMember.Builder(expectedReplacingConsumerMember)
.setMemberEpoch(1)
.setMemberEpoch(group.generationId())
.setServerAssignorName(NoOpPartitionAssignor.NAME)
.setRebalanceTimeoutMs(5000)
.setClassicMemberMetadata(null)
Expand All @@ -10728,9 +10725,10 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {

// Create the new consumer group with the static member.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedClassicMember),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 0, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, group.generationId(), 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, expectedClassicMember.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, group.generationId()),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedClassicMember),

// Remove the static member because the rejoining member replaces it.
Expand All @@ -10743,17 +10741,10 @@ public void testConsumerGroupHeartbeatToClassicGroupFromExistingStaticMember() {
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedReplacingConsumerMember),

// The static member rejoins the new consumer group.
// The static member rejoins the new consumer group with the same instance id and
// takes the assignment of the previous member. No new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedFinalConsumerMember),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1))),

// Newly joining static member bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, newMemberId, mkAssignment(mkTopicAssignment(fooTopicId, 0))),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),

// The newly created static member takes the assignment from the existing member.
// Bump its member epoch and transition to STABLE.
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, expectedFinalConsumerMember)
Expand Down Expand Up @@ -10854,6 +10845,10 @@ public void testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember()

// Create the new consumer group with member 1.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 1),
Expand All @@ -10862,12 +10857,6 @@ public void testConsumerGroupHeartbeatToClassicGroupWithEmptyAssignmentMember()
// Member 2 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

// Newly joining member 2 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, Map.of()),
Expand Down Expand Up @@ -11239,6 +11228,11 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember1),
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember2),

GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 1, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, expectedMember1.assignedPartitions()),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, expectedMember2.assignedPartitions()),
Expand All @@ -11251,12 +11245,6 @@ memberId3, new MemberAssignmentImpl(mkAssignment(
// Member 3 joins the new consumer group.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, expectedMember3),

// The subscription metadata hasn't been updated during the conversion, so a new one is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 2),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
)),

// Newly joining member 3 bumps the group epoch. A new target assignment is computed.
GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 2, 0),
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, assignor.targetPartitions(memberId1)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,8 @@ public void testFromClassicGroup() {
new SnapshotRegistry(logContext),
mock(GroupCoordinatorMetricsShard.class),
classicGroup,
metadataImage.topics()
metadataImage.topics(),
metadataImage.cluster()
);

ConsumerGroup expectedConsumerGroup = new ConsumerGroup(
Expand All @@ -1545,6 +1546,10 @@ public void testFromClassicGroup() {
expectedConsumerGroup.updateTargetAssignment(memberId, new Assignment(mkAssignment(
mkTopicAssignment(fooTopicId, 0)
)));
expectedConsumerGroup.setSubscriptionMetadata(Map.of(
fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 1),
barTopicName, new TopicMetadata(barTopicId, barTopicName, 1)
));
expectedConsumerGroup.updateMember(new ConsumerGroupMember.Builder(memberId)
.setMemberEpoch(classicGroup.generationId())
.setState(MemberState.STABLE)
Expand Down Expand Up @@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));
assertEquals(expectedConsumerGroup.members(), consumerGroup.members());
}

Expand Down