Skip to content

KAFKA-18687: Setting the subscriptionMetadata during conversion to consumer group #19790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 27, 2025

Conversation

dongnuo123
Copy link
Contributor

@dongnuo123 dongnuo123 commented May 22, 2025

When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In
ConsumerGroup.fromClassicGroup,
we don't set the group's subscriptionMetadata. Later in the consumer
group heartbeat, we call
updateSubscriptionMetadata
,
which notices that the group's subscriptionMetadata needs an
update

and bumps the epoch. Since the epoch is bumped, we recompute the
assignment
.

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah [email protected], David Jacot
[email protected]

@github-actions github-actions bot added triage PRs from the community small Small PRs labels May 22, 2025
@dajac dajac added KIP-848 The Next Generation of the Consumer Rebalance Protocol and removed triage PRs from the community labels May 23, 2025
@github-actions github-actions bot added the core Kafka Broker label May 23, 2025
Copy link
Contributor

@squah-confluent squah-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix! Looks good!

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dongnuo123 Thanks for the patch. Overall, it looks good to me. I left a few small comments.

@@ -1576,6 +1581,7 @@ public void testFromClassicGroup() {
assertEquals(expectedConsumerGroup.groupEpoch(), consumerGroup.groupEpoch());
assertEquals(expectedConsumerGroup.state(), consumerGroup.state());
assertEquals(expectedConsumerGroup.preferredServerAssignor(), consumerGroup.preferredServerAssignor());
assertEquals(Map.copyOf(expectedConsumerGroup.subscriptionMetadata()), Map.copyOf(consumerGroup.subscriptionMetadata()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Map.copyOf really necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without it assertEquals just compares the reference.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I got it. The issue is that TimelineHashMap does not implement toString. Hence when the comparison fails, you get a cryptic error:

Expected :org.apache.kafka.timeline.TimelineHashMap@17c13
Actual   :org.apache.kafka.timeline.TimelineHashMap@308d9

However, it still uses .equals to compare the objects. I am fine with keeping the copy to make it better here.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm. the commit must be cherry-picked to 4.0 too.

@dajac dajac merged commit fcb722d into apache:trunk May 27, 2025
27 checks passed
@dajac dajac deleted the KAFKA-18687 branch May 27, 2025 09:26
dajac pushed a commit that referenced this pull request May 27, 2025
…nsumer group (#19790)

When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In

[ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140),
we don't set the group's subscriptionMetadata.  Later in the consumer
group heartbeat, we [call

updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748),
which [notices that the group's subscriptionMetadata needs an

update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757)
and bumps the epoch. Since the epoch is bumped, we [recompute the

assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766).

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
@dajac
Copy link
Member

dajac commented May 27, 2025

Merged it to trunk and 4.0.

showuon pushed a commit to showuon/kafka that referenced this pull request May 29, 2025
…nsumer group (apache#19790)

When a consumer protocol static member replaces an existing member in a
classic group, it's not necessary to recompute the assignment. However,
it happens anyway.

In

[ConsumerGroup.fromClassicGroup](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java#L1140),
we don't set the group's subscriptionMetadata.  Later in the consumer
group heartbeat, we [call

updateSubscriptionMetadata](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1748),
which [notices that the group's subscriptionMetadata needs an

update](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L2757)
and bumps the epoch. Since the epoch is bumped, we [recompute the

assignment](https://github.com/apache/kafka/blob/0ff4dafb7de4e24ddb7961d52e50e728f2eee4eb/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java#L1766).

As a fix, this patch sets the subscriptionMetadata in
ConsumerGroup.fromClassicGroup.

Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-848 The Next Generation of the Consumer Rebalance Protocol small Small PRs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants