Skip to content

Commit 99894b0

Browse files
committed
KAFKA-15615: Improve handling of fetching during metadata updates
- Move await a metadata update logic from AbstractFetch to FetchCollector
1 parent 8b0b3f8 commit 99894b0

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,6 @@ protected void handleFetchSuccess(final Node fetchTarget,
200200
if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
201201
partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
202202
Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
203-
} else {
204-
requestMetadataUpdate(metadata, subscriptions, partition);
205-
subscriptions.awaitUpdate(partition);
206203
}
207204
}
208205

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,10 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
333333
error == Errors.FENCED_LEADER_EPOCH) {
334334
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
335335
requestMetadataUpdate(metadata, subscriptions, tp);
336+
if (completedFetch.partitionData.currentLeader().leaderId() == -1 ||
337+
completedFetch.partitionData.currentLeader().leaderEpoch() == -1) {
338+
subscriptions.awaitUpdate(tp);
339+
}
336340
} else if (error == Errors.KAFKA_STORAGE_ERROR ||
337341
error == Errors.OFFSET_NOT_AVAILABLE) {
338342
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());

0 commit comments

Comments
 (0)