KAFKA-20312: Handle null leader during OffsetFetcher regroup safely #21760
Open
nileshkumar3 wants to merge 1 commit into apache:trunk
Description:
This PR fixes a potential NullPointerException in OffsetFetcherUtils.regroupPartitionMapByNode, which regroups partitions by leader during offset reset and list-offsets requests.
Background
Partitions are grouped by leader via metadata.fetch().leaderFor(tp). If metadata changes between the initial leader lookup and the regroup step (e.g. leadership change or stale metadata), leaderFor(tp) can return null. The previous implementation used Collectors.groupingBy(..., leaderFor(...)), which throws an NPE when the classifier returns null.
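The failure mode described above can be reproduced without Kafka. The following is a minimal sketch, not the actual Kafka code: the class name, the `LEADERS` map, and the string partition/node names are hypothetical stand-ins for `metadata.fetch().leaderFor(tp)`. It shows only that `Collectors.groupingBy` rejects a null classifier result.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class GroupingByNullKeyDemo {
    // Hypothetical stand-in for metadata.fetch().leaderFor(tp):
    // maps a partition name to its leader, or to null when the leader is unknown.
    static final Map<String, String> LEADERS = new HashMap<>();
    static {
        LEADERS.put("topic-0", "node-1");
        LEADERS.put("topic-1", null); // leader became unknown
    }

    // Returns true if grouping the given partitions by leader throws an NPE.
    static boolean groupingThrowsNpe(List<String> partitions) {
        try {
            partitions.stream().collect(Collectors.groupingBy(LEADERS::get));
            return false;
        } catch (NullPointerException e) {
            // groupingBy requires the classifier to return a non-null key.
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(groupingThrowsNpe(List.of("topic-0")));
        System.out.println(groupingThrowsNpe(List.of("topic-0", "topic-1")));
    }
}
```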
Fix
OffsetFetcherUtils.regroupPartitionMapByNode
Replaced the stream-based grouping with a loop that skips partitions whose leader is null and adds them to a caller-provided partitionsToRetry set. The helper itself does not trigger a metadata refresh; callers are responsible for scheduling the retry and requesting the metadata update.
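The shape of the loop-based regroup might look roughly like the sketch below. It is an illustration under simplifying assumptions, not the code in this PR: partitions and nodes are plain strings, the leader lookup is passed in as a `Function`, and the class and method names (`RegroupSketch`, `regroupByLeader`) are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class RegroupSketch {
    // Groups partition -> value entries by leader. Partitions whose leader is
    // null are skipped and recorded in partitionsToRetry; no metadata refresh
    // is requested here, that is left to the caller.
    static <V> Map<String, Map<String, V>> regroupByLeader(
            Map<String, V> partitionMap,
            Function<String, String> leaderFor,
            Set<String> partitionsToRetry) {
        Map<String, Map<String, V>> byLeader = new HashMap<>();
        for (Map.Entry<String, V> entry : partitionMap.entrySet()) {
            String leader = leaderFor.apply(entry.getKey());
            if (leader == null) {
                partitionsToRetry.add(entry.getKey());
                continue;
            }
            byLeader.computeIfAbsent(leader, k -> new HashMap<>())
                    .put(entry.getKey(), entry.getValue());
        }
        return byLeader;
    }

    public static void main(String[] args) {
        Map<String, Long> timestamps = new LinkedHashMap<>();
        timestamps.put("topic-0", 10L);
        timestamps.put("topic-1", 20L);

        Map<String, String> leaders = new HashMap<>();
        leaders.put("topic-0", "node-1"); // topic-1 has no known leader

        Set<String> retry = new HashSet<>();
        System.out.println(regroupByLeader(timestamps, leaders::get, retry));
        System.out.println(retry);
    }
}
```

Passing the retry set in, rather than returning it, mirrors the description above: the caller owns the set and decides what to do with the skipped partitions.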
Callers
OffsetFetcher (classic consumer): passes partitionsToRetry into the helper; in resetPositionsAsync, when the set is non-empty, calls setNextAllowedRetry(partitionsToRetry, now + retryBackoffMs) and metadata.requestUpdate(false).
OffsetsRequestManager (new consumer): passes a local retry set into the helper, then adds skipped partitions to state.remainingToSearch (with timestamp) and calls metadata.requestUpdate(false) when the set is non-empty.
This keeps existing retry semantics and avoids the NPE.
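The caller-side handling for the classic consumer path can be pictured as below. This is a hedged sketch only: `RetryBackoffSketch`, its fields, and its methods are hypothetical and merely model the two effects named above (a per-partition backoff, as with `setNextAllowedRetry`, and a metadata update request, as with `metadata.requestUpdate(false)`).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class RetryBackoffSketch {
    private final Map<String, Long> nextAllowedRetryMs = new HashMap<>();
    private final long retryBackoffMs;
    private boolean metadataUpdateRequested = false;

    RetryBackoffSketch(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    // Called after the regroup step with the partitions that were skipped
    // because their leader was unknown.
    void onRegroupCompleted(Set<String> partitionsToRetry, long nowMs) {
        if (partitionsToRetry.isEmpty()) {
            return; // nothing was skipped, so no backoff and no refresh
        }
        for (String tp : partitionsToRetry) {
            nextAllowedRetryMs.put(tp, nowMs + retryBackoffMs);
        }
        metadataUpdateRequested = true;
    }

    // A partition may be retried once its backoff deadline has passed.
    boolean canRetry(String tp, long nowMs) {
        return nowMs >= nextAllowedRetryMs.getOrDefault(tp, 0L);
    }

    boolean metadataUpdateRequested() {
        return metadataUpdateRequested;
    }
}
```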
Tests
OffsetFetcherTest.testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup
Simulates leaderFor(tp) returning null during regroup: the first metadata.fetch() call is stubbed to return a cluster that does not contain the partition, and later calls use the real method. Asserts that no exception is thrown, that the partition stays pending reset, and that the offset reset succeeds after the backoff and a second attempt with valid metadata.
OffsetsRequestManagerTest.testFetchOffsetsRegroupSkipsNullLeaderPartition_NoNPE
Simulates the same scenario in the fetch-offsets path: currentLeader has a leader, but metadata.fetch() returns a cluster in which one partition has no leader. Asserts that no NPE is thrown, that exactly one request is sent (for the partition with a leader), and that the skipped partition is retried after the metadata update and completes successfully.