Skip to content

KAFKA-15615: Improve handling of fetching during metadata updates #15647

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchResponseData.LeaderIdAndEpoch;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.FetchResponse;
Expand Down Expand Up @@ -328,22 +329,35 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
final TopicPartition tp = completedFetch.partition;
final long fetchOffset = completedFetch.nextFetchOffset();

if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
if (error == Errors.REPLICA_NOT_AVAILABLE) {
log.debug("Received replica not available error in fetch for partition {}", tp);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kirktrue It's just a debug log, but it's different from the previous log. Is that okay?

requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.FENCED_LEADER_EPOCH) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);

LeaderIdAndEpoch currentLeader = completedFetch.partitionData.currentLeader();
if (currentLeader.leaderId() == -1 || currentLeader.leaderEpoch() == -1) {
subscriptions.awaitUpdate(tp);
}
} else if (error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.OFFSET_NOT_AVAILABLE) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.UNKNOWN_TOPIC_ID) {
log.warn("Received unknown topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.INCONSISTENT_TOPIC_ID) {
log.warn("Received inconsistent topic ID error in fetch for partition {}", tp);
requestMetadataUpdate(metadata, subscriptions, tp);
subscriptions.awaitUpdate(tp);
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,10 @@ public synchronized void completeValidation(TopicPartition tp) {
assignedState(tp).completeValidation();
}

public synchronized boolean awaitingUpdate(TopicPartition tp) {
return assignedState(tp).awaitingUpdate();
}

public synchronized FetchPosition validPosition(TopicPartition tp) {
return assignedState(tp).validPosition();
}
Expand Down Expand Up @@ -761,6 +765,13 @@ public void requestOffsetReset(TopicPartition partition) {
requestOffsetReset(partition, defaultResetStrategy);
}

public synchronized void awaitUpdate(TopicPartition partition) {
final TopicPartitionState state = assignedStateOrNull(partition);
if (state != null) {
state.awaitUpdate();
}
}

public synchronized void requestOffsetResetIfPartitionAssigned(TopicPartition partition) {
final TopicPartitionState state = assignedStateOrNull(partition);
if (state != null) {
Expand Down Expand Up @@ -1015,6 +1026,12 @@ private void reset(AutoOffsetResetStrategy strategy) {
});
}

private void awaitUpdate() {
transitionState(FetchStates.AWAIT_UPDATE, () -> {
this.nextRetryTimeMs = null;
});
}

/**
* Check if the position exists and needs to be validated. If so, enter the AWAIT_VALIDATION state. This method
* also will update the position with the current leader and epoch.
Expand All @@ -1031,7 +1048,7 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
return false;
}

if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
if (position != null && (!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
validatePosition(newPosition);
preferredReadReplica = null;
Expand Down Expand Up @@ -1079,6 +1096,10 @@ private boolean awaitingValidation() {
return fetchState.equals(FetchStates.AWAIT_VALIDATION);
}

private boolean awaitingUpdate() {
return fetchState.equals(FetchStates.AWAIT_UPDATE);
}

private boolean awaitingRetryBackoff(long nowMs) {
return nextRetryTimeMs != null && nowMs < nextRetryTimeMs;
}
Expand Down Expand Up @@ -1238,7 +1259,8 @@ public boolean hasValidPosition() {
FETCHING() {
@Override
public Collection<FetchState> validTransitions() {
return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET, FetchStates.AWAIT_VALIDATION);
return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET,
FetchStates.AWAIT_VALIDATION, FetchStates.AWAIT_UPDATE);
}

@Override
Expand Down Expand Up @@ -1280,6 +1302,24 @@ public boolean requiresPosition() {
return true;
}

@Override
public boolean hasValidPosition() {
return false;
}
},

AWAIT_UPDATE() {
@Override
public Collection<FetchState> validTransitions() {
return Arrays.asList(FetchStates.FETCHING, FetchStates.AWAIT_RESET,
FetchStates.AWAIT_VALIDATION, FetchStates.AWAIT_UPDATE);
}

@Override
public boolean requiresPosition() {
return true;
}

@Override
public boolean hasValidPosition() {
return false;
Expand Down
Loading
Loading