KAFKA-18031: Flaky PlaintextConsumerTest testCloseLeavesGroupOnInterrupt #19105


Closed
wants to merge 6 commits into from

Conversation

@TaiJuWu (Collaborator) commented Mar 5, 2025

Jira: https://issues.apache.org/jira/browse/KAFKA-18031

This test exposes an issue:
If the number of in-flight requests exceeds maxInFlightRequestsPerConnection, the LeaveGroup request is not sent in time when closing.
This behavior can be observed in the log (attached in the Jira): searching for `kkk` shows only five requests, and LeaveGroup is not among them.

I made the following changes:

  1. Allow the LeaveGroup request to be sent even if the number of in-flight requests has reached maxInFlightRequestsPerConnection.
  2. Move maybeThrowInterruptException to the end of ConsumerNetworkClient.poll.
  3. For the test, honor the configured close timeout instead of the default: the default close timeout is 30000 ms but the session timeout is 45000 ms (in this test we only wait 22500 ms within adminClient.describeConsumerGroups).
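Change (1) can be illustrated with a minimal, self-contained sketch. This is not Kafka's actual internal code; the names `InFlightGate` and `tryAcquire` are hypothetical. The idea is simply that a send is normally refused once a connection has `max.in.flight.requests.per.connection` outstanding requests, but a LeaveGroup request is exempted so that close() can still announce the member's departure on a saturated connection:

```java
// Hypothetical sketch of the in-flight gating logic, not Kafka internals.
enum ApiKey { FIND_COORDINATOR, JOIN_GROUP, SYNC_GROUP, OFFSET_FETCH, LEAVE_GROUP }

class InFlightGate {
    private final int maxInFlightRequestsPerConnection;
    private int inFlight = 0;

    InFlightGate(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    /** Returns true if the request may be sent now; LeaveGroup bypasses the cap. */
    boolean tryAcquire(ApiKey api) {
        if (inFlight >= maxInFlightRequestsPerConnection && api != ApiKey.LEAVE_GROUP) {
            return false; // connection saturated: the request stays queued
        }
        inFlight++;
        return true;
    }

    /** Completes one in-flight request (response received or connection dropped). */
    void release() {
        inFlight--;
    }
}
```

With a cap of 2, a third ordinary request is refused while a LeaveGroup request still goes out, which is the behavior the flaky test depends on at close time.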

@github-actions bot added the labels triage (PRs from the community), core (Kafka Broker), consumer, clients, and small (Small PRs) on Mar 5, 2025
@TaiJuWu (Collaborator, Author) commented Mar 5, 2025

@lianetm Please take a look when you are available, thanks.

@chia7712 (Member) commented Mar 9, 2025

> If the request number exceeds maxInFlightRequestsPerConnection, the LeaveGroup request would be not sent in time when closing.

Why does it exceed maxInFlightRequestsPerConnection? Does it happen in the test only?

@TaiJuWu (Collaborator, Author) commented Mar 9, 2025

> If the request number exceeds maxInFlightRequestsPerConnection, the LeaveGroup request would be not sent in time when closing.
>
> why does it exceed the maxInFlightRequestsPerConnection? Does it happen in the test only?

Based on the log, only the following requests were sent:

[2025-01-14 11:01:57,082] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1af677f8, destination=-1, correlationId=0, clientId=ConsumerTestConsumer, createdTimeMs=1736852517076, requestBuilder=FindCoordinatorRequestData(key='my-test', keyType=0, coordinatorKeys=[])) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,100] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@28cb86b2, destination=2147483646, correlationId=5, clientId=ConsumerTestConsumer, createdTimeMs=1736852517096, requestBuilder=JoinGroupRequestData(groupId='my-test', sessionTimeoutMs=45000, rebalanceTimeoutMs=6000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='')) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,109] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@19705650, destination=2147483646, correlationId=7, clientId=ConsumerTestConsumer, createdTimeMs=1736852517109, requestBuilder=JoinGroupRequestData(groupId='my-test', sessionTimeoutMs=45000, rebalanceTimeoutMs=6000, memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='need to re-join with the given member-id: ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28')) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,145] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1a76202b, destination=2147483646, correlationId=8, clientId=ConsumerTestConsumer, createdTimeMs=1736852517144, requestBuilder=SyncGroupRequestData(groupId='my-test', generationId=1, memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', assignment=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, -1, -1, -1, -1])])) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,168] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@74b1838, destination=2147483646, correlationId=9, clientId=ConsumerTestConsumer, createdTimeMs=1736852517166, requestBuilder=OffsetFetchRequestData(groupId='', topics=[], groups=[OffsetFetchRequestGroup(groupId='my-test', memberId=null, memberEpoch=-1, topics=[OffsetFetchRequestTopics(name='topic', partitionIndexes=[0, 1])])], requireStable=true)) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

> Does it happen in the test only?

If I haven't missed anything, I did not see any other integration test related to consumer#close.

@chia7712 (Member) commented Mar 9, 2025

@TaiJuWu thanks for sharing. Do you know why the consumer sends the join-group and sync-group requests before receiving the response to FindCoordinator? Also, is it flaky on the classic consumer only?

@TaiJuWu (Collaborator, Author) commented Mar 10, 2025

> @TaiJuWu thanks for your sharing. Do you know why consumer sends join-group and sync-group request before receiving response of FindCoordinatorRequestData?

@chia7712 Thanks for your feedback. I found that the FindCoordinator response does arrive first, so maxInFlightRequestsPerConnection may not be the issue after all.

I am not sure why the number of requests is always five on my local machine when the test fails.

[2025-01-14 11:01:57,082] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1af677f8, destination=-1, correlationId=0, clientId=ConsumerTestConsumer, createdTimeMs=1736852517076, requestBuilder=FindCoordinatorRequestData(key='my-test', keyType=0, coordinatorKeys=[])) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

// Received Coordinator response
[2025-01-14 11:01:57,092] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] Received FindCoordinator response ClientResponse(receivedTimeMs=1736852517090, latencyMs=14, disconnected=false, timedOut=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=6, clientId=ConsumerTestConsumer, correlationId=0, headerVersion=2), responseBody=FindCoordinatorResponseData(throttleTimeMs=0, errorCode=0, errorMessage='', nodeId=0, host='', port=0, coordinators=[Coordinator(key='my-test', nodeId=1, host='localhost', port=37721, errorCode=0, errorMessage='')])) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:918)


[2025-01-14 11:01:57,100] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@28cb86b2, destination=2147483646, correlationId=5, clientId=ConsumerTestConsumer, createdTimeMs=1736852517096, requestBuilder=JoinGroupRequestData(groupId='my-test', sessionTimeoutMs=45000, rebalanceTimeoutMs=6000, memberId='', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='')) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,109] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@19705650, destination=2147483646, correlationId=7, clientId=ConsumerTestConsumer, createdTimeMs=1736852517109, requestBuilder=JoinGroupRequestData(groupId='my-test', sessionTimeoutMs=45000, rebalanceTimeoutMs=6000, memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', groupInstanceId=null, protocolType='consumer', protocols=[JoinGroupRequestProtocol(name='range', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1]), JoinGroupRequestProtocol(name='cooperative-sticky', metadata=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 4, -1, -1, -1, -1, 0, 0, 0, 0, -1, -1, -1, -1, -1, -1])], reason='need to re-join with the given member-id: ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28')) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,145] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1a76202b, destination=2147483646, correlationId=8, clientId=ConsumerTestConsumer, createdTimeMs=1736852517144, requestBuilder=SyncGroupRequestData(groupId='my-test', generationId=1, memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', groupInstanceId=null, protocolType='consumer', protocolName='range', assignments=[SyncGroupRequestAssignment(memberId='ConsumerTestConsumer-2034ef08-671b-4849-a8f4-fffa51ab3d28', assignment=[0, 3, 0, 0, 0, 1, 0, 5, 116, 111, 112, 105, 99, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 1, -1, -1, -1, -1])])) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

[2025-01-14 11:01:57,168] DEBUG [Consumer clientId=ConsumerTestConsumer, groupId=my-test] kkk ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@74b1838, destination=2147483646, correlationId=9, clientId=ConsumerTestConsumer, createdTimeMs=1736852517166, requestBuilder=OffsetFetchRequestData(groupId='', topics=[], groups=[OffsetFetchRequestGroup(groupId='my-test', memberId=null, memberEpoch=-1, topics=[OffsetFetchRequestTopics(name='topic', partitionIndexes=[0, 1])])], requireStable=true)) (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient:516)

> for another, is it flaky on classic consumer only?

According to Develocity, it happens on the classic consumer only.


A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.
