Skip to content

KAFKA-17456: Make sure FindCoordinatorResponse gets created before consumer #17404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -480,10 +480,10 @@ public String deserialize(String topic, Headers headers, ByteBuffer data) {
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 1));
prepareRebalance(client, node, assignor, singletonList(tp), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor,
true, groupId, groupInstanceId, Optional.of(deserializer), false);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp), null);
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
client.prepareResponseFrom(fetchResponse(tp, 0, recordCount), node);
return consumer;
Expand Down Expand Up @@ -818,10 +818,10 @@ public void verifyHeartbeatSent(GroupProtocol groupProtocol) throws Exception {
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

// initial fetch
client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
Expand Down Expand Up @@ -851,9 +851,9 @@ public void verifyHeartbeatSentWhenFetchedDataReady(GroupProtocol groupProtocol)
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -933,17 +933,17 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit(GroupPr
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

// 1st coordinator error should cause coordinator unknown
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node);
// 2nd coordinator error should find the correct coordinator and clear the findCoordinatorFuture
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);

// create a consumer with groupID with manual assignment
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.assign(singleton(tp0));

// 1st coordinator error should cause coordinator unknown
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node);
consumer.poll(Duration.ofMillis(0));

// 2nd coordinator error should find the correct coordinator and clear the findCoordinatorFuture
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);

client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 50L), Errors.NONE));
client.prepareResponse(fetchResponse(tp0, 50L, 5));

Expand Down Expand Up @@ -1257,9 +1257,9 @@ public void testAutoCommitSentBeforePositionUpdate(GroupProtocol groupProtocol)
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -1296,8 +1296,8 @@ public void testRegexSubscription(GroupProtocol groupProtocol) {
initMetadata(client, partitionCounts);
Node node = metadata.fetch().nodes().get(0);

consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer));

Expand Down Expand Up @@ -1357,9 +1357,9 @@ public void testWakeupWithFetchDataAvailable(GroupProtocol groupProtocol) throws
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -1398,9 +1398,9 @@ public void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupPro
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
Expand All @@ -1425,11 +1425,10 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored(GroupProtocol groupPro
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer));

prepareRebalance(client, node, assignor, singletonList(tp0), null);

Map<TopicPartition, FetchInfo> fetches1 = new HashMap<>();
fetches1.put(tp0, new FetchInfo(0, 1));
fetches1.put(t2p0, new FetchInfo(0, 10)); // not assigned and not fetched
Expand Down Expand Up @@ -1468,6 +1467,8 @@ public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProt

ConsumerPartitionAssignor assignor = new RangeAssignor();

// mock rebalance responses
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

// initial subscription
Expand All @@ -1478,8 +1479,6 @@ public void testSubscriptionChangesWithAutoCommitEnabled(GroupProtocol groupProt
assertTrue(consumer.subscription().contains(topic) && consumer.subscription().contains(topic2));
assertTrue(consumer.assignment().isEmpty());

// mock rebalance responses
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);
Expand Down Expand Up @@ -1585,13 +1584,13 @@ public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupPro

ConsumerPartitionAssignor assignor = new RangeAssignor();

// mock rebalance responses
prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));

// mock rebalance responses
prepareRebalance(client, node, assignor, singletonList(tp0), null);

consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
consumer.poll(Duration.ZERO);

Expand Down Expand Up @@ -1638,12 +1637,11 @@ public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration(Gro
Node node = metadata.fetch().nodes().get(0);

CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());

prepareRebalance(client, node, assignor, singletonList(tp0), null);

RuntimeException assignmentException = assertThrows(RuntimeException.class,
() -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
assertEquals(partitionAssigned + singleTopicPartition, assignmentException.getCause().getMessage());
Expand All @@ -1664,10 +1662,10 @@ public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration(GroupProt
Node node = metadata.fetch().nodes().get(0);

CooperativeStickyAssignor assignor = new CooperativeStickyAssignor();
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);

initializeSubscriptionWithSingleTopic(consumer, getExceptionConsumerRebalanceListener());
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

RuntimeException assignException = assertThrows(RuntimeException.class,
() -> consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE)));
Expand Down Expand Up @@ -2029,9 +2027,9 @@ public void testShouldAttemptToRejoinGroupAfterSyncGroupFailed(GroupProtocol gro
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, groupInstanceId);
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());


Expand Down Expand Up @@ -2099,9 +2097,9 @@ private void consumerCloseTest(GroupProtocol groupProtocol,
initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);

Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);
final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, false, Optional.empty());
consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null);

client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWithIds(1, Collections.singletonMap(topic, 1), topicIds));

Expand Down Expand Up @@ -2300,12 +2298,12 @@ public void testMeasureCommitSyncDuration(GroupProtocol groupProtocol) {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);

KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));

client.prepareResponseFrom(
FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
client.prepareResponseFrom(
offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE)),
Expand Down Expand Up @@ -2346,13 +2344,13 @@ public void testMeasureCommittedDuration(GroupProtocol groupProtocol) {
MockClient client = new MockClient(time, metadata);
initMetadata(client, Collections.singletonMap(topic, 2));
Node node = metadata.fetch().nodes().get(0);
// lookup coordinator
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);

KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata,
assignor, true, groupInstanceId);
consumer.assign(singletonList(tp0));

// lookup coordinator
client.prepareResponseFrom(
FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

// fetch offset for one topic
Expand All @@ -2376,13 +2374,13 @@ public void testRebalanceException(GroupProtocol groupProtocol) {

initMetadata(client, Collections.singletonMap(topic, 1));
Node node = metadata.fetch().nodes().get(0);
client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);

KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener());
Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());

client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node);
client.prepareResponseFrom(joinGroupFollowerResponse(assignor, 1, memberId, leaderId, Errors.NONE), coordinator);
client.prepareResponseFrom(syncGroupResponse(singletonList(tp0), Errors.NONE), coordinator);

Expand Down Expand Up @@ -2412,15 +2410,15 @@ public void testReturnRecordsDuringRebalance(GroupProtocol groupProtocol) throws
ConsumerMetadata metadata = createMetadata(subscription);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I looped testReturnRecordsDuringRebalance and saw the following error:

Gradle Test Run :clients:test > Gradle Test Executor 253 > KafkaConsumerTest > testReturnRecordsDuringRebalance(GroupProtocol) > "testReturnRecordsDuringRebalance(GroupProtocol).groupProtocol=CLASSIC" FAILED
    org.opentest4j.AssertionFailedError: expected: <11> but was: <0>
        at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
        at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
        at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
        at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:150)
        at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:145)
        at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:531)
        at app//org.apache.kafka.clients.consumer.KafkaConsumerTest.testReturnRecordsDuringRebalance(KafkaConsumerTest.java:2440)

@frankvicky could you please check it?

MockClient client = new MockClient(time, metadata);
ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor();
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));

consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));

Node node = metadata.fetch().nodes().get(0);
Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);

KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(Arrays.asList(topic, topic2), getConsumerRebalanceListener(consumer));

// a poll with non-zero milliseconds would complete three round-trips (discover, join, sync)
TestUtils.waitForCondition(() -> {
consumer.poll(Duration.ofMillis(100L));
Expand Down Expand Up @@ -2556,6 +2554,7 @@ public void testGetGroupMetadata(GroupProtocol groupProtocol) {
initMetadata(client, Collections.singletonMap(topic, 1));
final Node node = metadata.fetch().nodes().get(0);

prepareRebalance(client, node, assignor, singletonList(tp0), null);
final KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);

final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata();
Expand All @@ -2565,7 +2564,6 @@ public void testGetGroupMetadata(GroupProtocol groupProtocol) {
assertEquals(groupInstanceId, groupMetadataOnStart.groupInstanceId());

consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer));
prepareRebalance(client, node, assignor, singletonList(tp0), null);

// initial fetch
client.prepareResponseFrom(fetchResponse(tp0, 0, 0), node);
Expand Down Expand Up @@ -3287,13 +3285,13 @@ public void testEnforceRebalanceTriggersRebalanceOnNextPoll(GroupProtocol groupP
Time time = new MockTime(1L);
ConsumerMetadata metadata = createMetadata(subscription);
MockClient client = new MockClient(time, metadata);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
MockRebalanceListener countingRebalanceListener = new MockRebalanceListener();
initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1)));

consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);
Node node = metadata.fetch().nodes().get(0);
prepareRebalance(client, node, assignor, Arrays.asList(tp0, t2p0), null);
KafkaConsumer<String, String> consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
consumer.subscribe(Arrays.asList(topic, topic2), countingRebalanceListener);

// a first rebalance to get the assignment, we need two poll calls since we need two round trips to finish join / sync-group
consumer.poll(Duration.ZERO);
Expand Down
Loading