Skip to content

Commit 0195e86

Browse files
committed
KAFKA-15615: Improve handling of fetching during metadata updates
- Simplify if statements
1 parent f569d09 commit 0195e86

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.common.errors.RecordTooLargeException;
2525
import org.apache.kafka.common.errors.TopicAuthorizationException;
2626
import org.apache.kafka.common.message.FetchResponseData;
27+
import org.apache.kafka.common.message.FetchResponseData.LeaderIdAndEpoch;
2728
import org.apache.kafka.common.protocol.Errors;
2829
import org.apache.kafka.common.record.RecordBatch;
2930
import org.apache.kafka.common.requests.FetchResponse;
@@ -333,8 +334,9 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
333334
error == Errors.FENCED_LEADER_EPOCH) {
334335
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
335336
requestMetadataUpdate(metadata, subscriptions, tp);
336-
if (completedFetch.partitionData.currentLeader().leaderId() == -1 ||
337-
completedFetch.partitionData.currentLeader().leaderEpoch() == -1) {
337+
338+
LeaderIdAndEpoch currentLeader = completedFetch.partitionData.currentLeader();
339+
if (currentLeader.leaderId() == -1 || currentLeader.leaderEpoch() == -1) {
338340
subscriptions.awaitUpdate(tp);
339341
}
340342
} else if (error == Errors.KAFKA_STORAGE_ERROR ||

0 commit comments

Comments
 (0)