Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchResponseData.LeaderIdAndEpoch;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;
Expand Down Expand Up @@ -326,22 +327,35 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
if (error == Errors.REPLICA_NOT_AVAILABLE) {
log.debug("Received replica not available error in fetch for partition {}", tp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue It's just a debug log, but it's different from the previous log. Is that okay?

requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.FENCED_LEADER_EPOCH) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);

LeaderIdAndEpoch currentLeader = completedFetch.partitionData.currentLeader();
if (currentLeader.leaderId() == -1 || currentLeader.leaderEpoch() == -1) {
subscriptions.awaitUpdate(tp);
}
} else if (error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
log.warn("Received unknown topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,10 @@ public synchronized void completeValidation(TopicPartition tp) {
assignedState(tp).completeValidation();
}

/**
 * Reports whether the state tracked for the given partition is currently in the
 * {@code AWAIT_UPDATE} fetch state.
 *
 * <p>NOTE(review): the lookup goes through {@code assignedState(tp)}; presumably that fails
 * for a partition that is not assigned — confirm against its definition.
 *
 * @param tp the partition whose fetch state is inspected
 * @return the value of {@code awaitingUpdate()} on the partition's state
 */
public synchronized boolean awaitingUpdate(TopicPartition tp) {
    final boolean waitingForUpdate = assignedState(tp).awaitingUpdate();
    return waitingForUpdate;
}

/**
 * Looks up the state tracked for the given partition and returns whatever its
 * {@code validPosition()} accessor yields.
 *
 * <p>NOTE(review): whether the result can be {@code null} (e.g. when the position is not
 * currently valid) depends on the state's {@code validPosition()} — confirm there.
 *
 * @param tp the partition whose position is requested
 * @return the position reported by the partition's state
 */
public synchronized FetchPosition validPosition(TopicPartition tp) {
    final FetchPosition currentPosition = assignedState(tp).validPosition();
    return currentPosition;
}
Expand Down Expand Up @@ -710,6 +714,13 @@ public void requestOffsetReset(TopicPartition partition) {
requestOffsetReset(partition, defaultResetStrategy);
}

/**
 * Asks the state tracked for the given partition to enter its "await update" mode.
 *
 * <p>If {@code assignedStateOrNull} finds no state for the partition, the call is a silent
 * no-op rather than an error.
 *
 * @param partition the partition that should wait for an update before fetching resumes
 */
public synchronized void awaitUpdate(TopicPartition partition) {
    final TopicPartitionState partitionState = assignedStateOrNull(partition);
    if (partitionState == null) {
        // Nothing is tracked for this partition, so there is nothing to transition.
        return;
    }
    partitionState.awaitUpdate();
}

public synchronized void requestOffsetResetIfPartitionAssigned(TopicPartition partition) {
final TopicPartitionState state = assignedStateOrNull(partition);
if (state != null) {
Expand Down Expand Up @@ -964,6 +975,12 @@ private void reset(OffsetResetStrategy strategy) {
});
}

/**
 * Requests a transition of this partition's fetch state to {@code AWAIT_UPDATE}.
 *
 * <p>The callback handed to {@code transitionState} clears {@code nextRetryTimeMs}.
 * NOTE(review): presumably the callback only runs when the transition is accepted — confirm
 * in {@code transitionState}.
 */
private void awaitUpdate() {
    transitionState(FetchStates.AWAIT_UPDATE, () -> this.nextRetryTimeMs = null);
}

/**
* Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
* also will update the position with the current leader and epoch.
Expand All @@ -980,7 +997,7 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
return false;
}

if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
if (position != null && (!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
validatePosition(newPosition);
preferredReadReplica = null;
Expand Down Expand Up @@ -1028,6 +1045,10 @@ private boolean awaitingValidation() {
return fetchState.equals(FetchStates.AWAIT_VALIDATION);
}

/**
 * Tells whether this partition's current fetch state is {@code AWAIT_UPDATE}.
 *
 * @return {@code true} when {@code fetchState} equals {@link FetchStates#AWAIT_UPDATE}
 */
private boolean awaitingUpdate() {
    final boolean inAwaitUpdate = fetchState.equals(FetchStates.AWAIT_UPDATE);
    return inAwaitUpdate;
}

/**
 * Tells whether a retry deadline is set and has not yet been reached.
 *
 * @param nowMs the time to compare against {@code nextRetryTimeMs}
 * @return {@code false} when no retry time is set or when {@code nowMs} has reached it;
 *         {@code true} while {@code nowMs} is still strictly before it
 */
private boolean awaitingRetryBackoff(long nowMs) {
    if (nextRetryTimeMs == null) {
        // No retry deadline recorded, so there is no backoff to wait for.
        return false;
    }
    return nextRetryTimeMs > nowMs;
}
Expand Down Expand Up @@ -1186,7 +1207,8 @@ public boolean hasValidPosition() {
FETCHING() {
@Override
public Collection<FetchState> validTransitions() {
return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET, FetchStates.AWAIT_VALIDATION);
return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET,
FetchStates.AWAIT_VALIDATION, FetchStates.AWAIT_UPDATE);
}

@Override
Expand Down Expand Up @@ -1228,6 +1250,24 @@ public boolean requiresPosition() {
return true;
}

/**
 * This fetch state never reports a valid position.
 *
 * NOTE(review): the enclosing enum constant is not visible in this hunk — confirm which
 * state this override belongs to.
 *
 * @return always {@code false}
 */
@Override
public boolean hasValidPosition() {
return false;
}
},

AWAIT_UPDATE() {
/**
 * Lists the fetch states declared as valid transitions out of {@code AWAIT_UPDATE}:
 * {@code FETCHING}, {@code AWAIT_RESET}, {@code AWAIT_VALIDATION}, and {@code AWAIT_UPDATE}
 * itself.
 *
 * @return the collection of permitted target states
 */
@Override
public Collection<FetchState> validTransitions() {
    final Collection<FetchState> permittedTargets = Arrays.asList(
        FetchStates.FETCHING,
        FetchStates.AWAIT_RESET,
        FetchStates.AWAIT_VALIDATION,
        FetchStates.AWAIT_UPDATE);
    return permittedTargets;
}

/**
 * The {@code AWAIT_UPDATE} state declares that it requires a position.
 *
 * NOTE(review): presumably this is checked when transitioning into the state — confirm
 * where {@code requiresPosition()} is consumed.
 *
 * @return always {@code true}
 */
@Override
public boolean requiresPosition() {
return true;
}

@Override
public boolean hasValidPosition() {
return false;
Expand Down
Loading