Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class OffsetFetcher {
private final SubscriptionState subscriptions;
private final ConsumerNetworkClient client;
private final Time time;
private final long retryBackoffMs;
private final int requestTimeoutMs;
private final IsolationLevel isolationLevel;
private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
Expand All @@ -87,6 +88,7 @@ public OffsetFetcher(LogContext logContext,
this.client = client;
this.metadata = metadata;
this.subscriptions = subscriptions;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
this.isolationLevel = isolationLevel;
this.apiVersions = apiVersions;
Expand Down Expand Up @@ -247,8 +249,13 @@ private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition
private void resetPositionsAsync(Map<TopicPartition, AutoOffsetResetStrategy> partitionAutoOffsetResetStrategyMap) {
Map<TopicPartition, Long> partitionResetTimestamps = partitionAutoOffsetResetStrategyMap.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().timestamp().get()));
Set<TopicPartition> partitionsToRetry = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetsPartition>> timestampsToSearchByNode =
groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
groupListOffsetRequests(partitionResetTimestamps, partitionsToRetry);
if (!partitionsToRetry.isEmpty()) {
subscriptions.setNextAllowedRetry(partitionsToRetry, time.milliseconds() + retryBackoffMs);
metadata.requestUpdate(false);
}
for (Map.Entry<Node, Map<TopicPartition, ListOffsetsPartition>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetsPartition> resetTimestamps = entry.getValue();
Expand Down Expand Up @@ -413,7 +420,7 @@ private Map<Node, Map<TopicPartition, ListOffsetsPartition>> groupListOffsetRequ
}
}
}
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, partitionsToRetry);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,29 @@ OffsetFetcherUtils.ListOffsetResult handleListOffsetResponse(ListOffsetsResponse
return new OffsetFetcherUtils.ListOffsetResult(fetchedOffsets, partitionsToRetry);
}

<T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartition, T> partitionMap) {
return partitionMap.entrySet()
.stream()
.collect(Collectors.groupingBy(entry -> metadata.fetch().leaderFor(entry.getKey()),
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
/**
 * Regroups the given partition-to-value map by the current leader node of each partition.
 * Partitions whose leader is unknown in the current metadata snapshot are skipped and
 * collected into {@code partitionsToRetry} so the caller can retry them after a
 * metadata refresh, instead of failing the whole regroup.
 *
 * @param partitionMap      per-partition data to regroup
 * @param partitionsToRetry output set; partitions with no known leader are added here
 * @return per-node map of the partitions whose leader is currently known
 */
<T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(
        Map<TopicPartition, T> partitionMap,
        Set<TopicPartition> partitionsToRetry) {
    // Snapshot the cluster once so every lookup sees a consistent metadata view.
    final var cluster = metadata.fetch();
    Map<Node, Map<TopicPartition, T>> partitionsByNode = new HashMap<>();

    partitionMap.forEach((tp, value) -> {
        Node leader = cluster.leaderFor(tp);
        if (leader == null) {
            // Metadata may have changed between building partitionMap and this call;
            // defer the partition rather than NPE-ing on a null grouping key.
            log.debug("Leader for partition {} is unknown while regrouping partition map by node", tp);
            partitionsToRetry.add(tp);
        } else {
            partitionsByNode.computeIfAbsent(leader, node -> new HashMap<>()).put(tp, value);
        }
    });

    return partitionsByNode;
}

Map<TopicPartition, SubscriptionState.FetchPosition> refreshAndGetPartitionsToValidate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,16 @@ private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartitio
.setCurrentLeaderEpoch(currentLeaderEpoch));
}
}
return offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap);
Set<TopicPartition> partitionsSkippedInRegroup = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> result =
offsetFetcherUtils.regroupPartitionMapByNode(partitionDataMap, partitionsSkippedInRegroup);
if (!partitionsSkippedInRegroup.isEmpty()) {
metadata.requestUpdate(false);
listOffsetsRequestState.ifPresent(state ->
partitionsSkippedInRegroup.forEach(tp ->
state.remainingToSearch.put(tp, timestampsToSearch.get(tp))));
}
return result;
}

// Visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

public class OffsetFetcherTest {
Expand Down Expand Up @@ -351,6 +352,63 @@ public void testresetPositionsMetadataRefresh() {
assertEquals(5, subscriptions.position(tp0).offset);
}

/**
 * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition with null leader
 * (e.g. leader was known when building partitionDataMap but metadata changed before regroup),
 * we should not throw NPE; partition is skipped and retried after backoff.
 */
@Test
public void testResetPositionsMetadataRefreshWhenLeaderBecomesUnknownDuringRegroup() {
    buildFetcher();
    assignFromUser(singleton(tp0));
    // Mark tp0 as needing an offset reset so resetPositionsIfNeeded() will act on it.
    subscriptions.requestOffsetReset(tp0, AutoOffsetResetStrategy.LATEST);

    // Cluster with no partition info so leaderFor(tp0) returns null during regroup
    Cluster clusterWithNoLeader = new Cluster(
        "mockClusterId",
        Collections.singletonList(new Node(0, "localhost", 9092)),
        Collections.emptyList(),
        Collections.emptySet(),
        Collections.emptySet());

    // First fetch() (during regroup) returns cluster with no leader; subsequent calls use real metadata
    // NOTE: the consecutive-stub order matters — thenReturn(...) covers only the first call,
    // thenAnswer(...) delegates every later call back to the real ConsumerMetadata.
    ConsumerMetadata metadataSpy = spy(metadata);
    when(metadataSpy.fetch()).thenReturn(clusterWithNoLeader).thenAnswer(invocation -> invocation.callRealMethod());

    // Rebuild the fetcher around the spy so the production code path sees the stubbed metadata.
    LogContext logContext = new LogContext();
    offsetFetcher = new OffsetFetcher(logContext,
        consumerClient,
        metadataSpy,
        subscriptions,
        time,
        retryBackoffMs,
        requestTimeoutMs,
        IsolationLevel.READ_UNCOMMITTED,
        apiVersions);

    // First attempt: regroup sees the null leader and must skip tp0 instead of throwing.
    offsetFetcher.resetPositionsIfNeeded();
    consumerClient.pollNoWakeup();

    // Should not crash; partition still needs reset (skipped in regroup)
    assertTrue(subscriptions.isOffsetResetNeeded(tp0));
    assertFalse(subscriptions.hasValidPosition(tp0));

    // Metadata refresh, then retry after backoff with valid metadata and successful response
    client.prepareMetadataUpdate(initialUpdateResponse);
    consumerClient.pollNoWakeup();

    // Advance the mock clock past the retry backoff set for the skipped partition.
    time.sleep(retryBackoffMs);
    client.prepareResponse(listOffsetRequestMatcher(ListOffsetsRequest.LATEST_TIMESTAMP, validLeaderEpoch),
        listOffsetResponse(Errors.NONE, 1L, 5L));

    // Second attempt succeeds: reset completes and the position comes from the ListOffsets response.
    offsetFetcher.resetPositionsIfNeeded();
    consumerClient.pollNoWakeup();

    assertFalse(subscriptions.isOffsetResetNeeded(tp0));
    assertTrue(subscriptions.isFetchable(tp0));
    assertEquals(5L, subscriptions.position(tp0).offset);
}

@Test
public void testListOffsetNoUpdateMissingEpoch() {
buildFetcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,65 @@ public void testListOffsetsWaitingForMetadataUpdate_RetrySucceeds() throws Execu
verifySuccessfulPollAndResponseReceived(fetchOffsetsFuture, expectedOffsets);
}

/**
 * Test for KAFKA-20312: when regroupPartitionMapByNode sees a partition with null leader
 * (e.g. metadata race), it should skip that partition and add it to remainingToSearch instead
 * of throwing NullPointerException.
 */
@Test
public void testFetchOffsetsRegroupSkipsNullLeaderPartition_NoNPE() throws ExecutionException,
        InterruptedException {
    Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
    timestampsToSearch.put(TEST_PARTITION_1, ListOffsetsRequest.EARLIEST_TIMESTAMP);
    timestampsToSearch.put(TEST_PARTITION_2, ListOffsetsRequest.EARLIEST_TIMESTAMP);

    // currentLeader returns a leader for both partitions (so both enter partitionDataMap)
    when(metadata.currentLeader(TEST_PARTITION_1)).thenReturn(testLeaderEpoch(LEADER_1, Optional.empty()));
    when(metadata.currentLeader(TEST_PARTITION_2)).thenReturn(testLeaderEpoch(LEADER_2, Optional.empty()));
    when(subscriptionState.isAssigned(any(TopicPartition.class))).thenReturn(true);

    // metadata.fetch() returns a cluster where PARTITION_2 has null leader (e.g. race: leader lost)
    // This divergence between currentLeader() and fetch() is the exact condition that used to NPE.
    List<PartitionInfo> partitions = new ArrayList<>();
    partitions.add(new PartitionInfo(TEST_TOPIC, 1, LEADER_1, null, null));
    partitions.add(new PartitionInfo(TEST_TOPIC, 2, null, null, null));
    Cluster clusterWithNullLeader = new Cluster("clusterId", Collections.singletonList(LEADER_1),
        partitions, Collections.emptySet(), Collections.emptySet());
    when(metadata.fetch()).thenReturn(clusterWithNullLeader);

    // Should not throw NPE; only PARTITION_1 has a leader in regroup, so one request for LEADER_1
    CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsetsFuture =
        requestManager.fetchOffsets(timestampsToSearch, false);
    assertEquals(1, requestManager.requestsToSend());
    // requestsToRetry is populated when the in-flight request completes and remainingToSearch is non-empty, not yet
    assertEquals(0, requestManager.requestsToRetry());

    // Complete request for PARTITION_1
    NetworkClientDelegate.PollResult res = requestManager.poll(time.milliseconds());
    assertEquals(1, res.unsentRequests.size());
    NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
    ClientResponse clientResponse = buildClientResponse(unsentRequest,
        Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty())));
    clientResponse.onComplete();
    // Future stays incomplete because PARTITION_2 is still waiting in remainingToSearch.
    assertFalse(fetchOffsetsFuture.isDone());

    // Metadata update: now both partitions have leaders; retry should send request for PARTITION_2
    mockSuccessfulRequest(Map.of(TEST_PARTITION_1, LEADER_1, TEST_PARTITION_2, LEADER_2));
    requestManager.onUpdate(new ClusterResource(""));
    assertEquals(1, requestManager.requestsToSend());

    // Complete the retry request (only PARTITION_2 in this batch)
    NetworkClientDelegate.PollResult retryPoll = requestManager.poll(time.milliseconds());
    assertEquals(1, retryPoll.unsentRequests.size());
    ClientResponse retryResponse = buildClientResponse(retryPoll.unsentRequests.get(0),
        Collections.singletonMap(TEST_PARTITION_2, new OffsetAndTimestampInternal(10L, -1, Optional.empty())));
    retryResponse.onComplete();

    // Both partitions resolved: the future completes with the union of the two responses.
    Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = new HashMap<>();
    expectedOffsets.put(TEST_PARTITION_1, new OffsetAndTimestampInternal(5L, -1, Optional.empty()));
    expectedOffsets.put(TEST_PARTITION_2, new OffsetAndTimestampInternal(10L, -1, Optional.empty()));
    verifyRequestSuccessfullyCompleted(fetchOffsetsFuture, expectedOffsets);
}

@ParameterizedTest
@MethodSource("retriableErrors")
public void testRequestFailsWithRetriableError_RetrySucceeds(Errors error) throws ExecutionException, InterruptedException {
Expand Down
Loading