KAFKA-17397: Ensure ClassicKafkaConsumer sends leave request on close even if interrupted #21332
+33
−3
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Description
This PR addresses KAFKA-17397 by ensuring deterministic behavior in
ClassicKafkaConsumer.close()under interruption for the classic groupprotocol.
In CI,
PlaintextConsumerTest.testCloseLeavesGroupOnInterrupt()canfail for
ClassicKafkaConsumerbecauseLeaveGroupRequestis sometimesblocked by
NetworkClient.isReady()when a metadata update is due.Since isReady prioritizes metadata requests, the pending
LeaveGroupRequestcan remain unsent. When the calling thread isalready interrupted,
ConsumerNetworkClientmay throw anInterruptExceptionbefore the pending request gets a chance to besent. This causes the member to leave only via
session.timeout.ms,resulting in test flakiness.
Changes
Allow
LeaveGroupRequestto bypass themetadata-updategating inNetworkClient.isReady()(while still respectingcanSendRequest),ensuring the request is sent during
close()even when a metadataupdate is due.
Sequence Diagram
the
ClassicKafkaConsumerfrom successfully sending aLEAVE_GROUPrequest when it is interrupted.
LEAVE_GROUPrequest deterministicby bypassing the metadata update step specifically for
LEAVE_GROUP.(See, Step 9, 15, 16, 17, 18)
Fixes
https://develocity.apache.org/scans/tests?search.rootProjectNames=kafka&search.timeZoneId=Asia%2FTaipei&tests.container=kafka.api.PlaintextConsumerTest&tests.sortField=FLAKY&tests.test=testCloseLeavesGroupOnInterrupt(String)%5B1%5D
In local re-produce
build failures due to busy CPU and full memory.)
Note on the implementation
I considered adding
isReadyForLeaveGroupdirectly to theKafkaClientinterface to keep the design consistent. However, I opted for the
current approach to avoid modifying the interface, as I wasn't sure if
changing the
KafkaClientinterface would require a KIP.If you believe adding it to the interface is preferable (and acceptable
without or with a KIP ), please let me know. I'm happy to refactor it
and wrote KIP.