KAFKA-15615: Improve handling of fetching during metadata updates #15647
base: trunk
Thanks for the PR!
@johnnychhsu This patch is designed to make the consumer wait, without sending fetch requests, while metadata is being updated.
@kirktrue, PTAL, thanks in advance.
@appchemist, thanks for the PR, and sorry for the delay in response! I've taken a first pass but am still working through the unit test changes.
@kirktrue Thanks for the heads-up!
Thanks for the PR, @appchemist!
Just a few comments/questions.
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.FENCED_LEADER_EPOCH) {
    log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
    requestMetadataUpdate(metadata, subscriptions, tp);
} else if (error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
error == Errors.OFFSET_NOT_AVAILABLE) {
    log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
    requestMetadataUpdate(metadata, subscriptions, tp);
    subscriptions.awaitUpdate(tp);
With this change, if the replica is not available, we will flag the partition as awaiting a metadata update. Is this a key part of this change? Why don't we want the first if block to also await an update?
Thank you!
I missed that.
For this case, there is no need to await a metadata update.
I think simply initializing the PreferredReadReplica should be enough.
Since FetchUtils.requestMetadataUpdate() is already being called, that initialization should already be happening.
changed
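For readers following this thread, here is a minimal standalone sketch of one reading of where it ended up: REPLICA_NOT_AVAILABLE only requests a metadata update, while the other errors in the quoted else-if branch also flag the partition as awaiting the update. The FetchError, Reaction, and FetchErrorPolicy names are hypothetical stand-ins, not Kafka types, and the sketch does not claim to match the merged code.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not Kafka's own types.
enum FetchError {
    REPLICA_NOT_AVAILABLE, KAFKA_STORAGE_ERROR, FENCED_LEADER_EPOCH, OFFSET_NOT_AVAILABLE, OTHER
}

enum Reaction { REFRESH_ONLY, REFRESH_AND_AWAIT, NONE }

final class FetchErrorPolicy {
    // Errors that, in the quoted diff, request a metadata update and then call awaitUpdate(tp).
    private static final Set<FetchError> AWAIT = EnumSet.of(
            FetchError.KAFKA_STORAGE_ERROR,
            FetchError.FENCED_LEADER_EPOCH,
            FetchError.OFFSET_NOT_AVAILABLE);

    private FetchErrorPolicy() { }

    static Reaction reactionFor(FetchError error) {
        if (error == FetchError.REPLICA_NOT_AVAILABLE) {
            // Per the review discussion: a metadata refresh is enough here.
            return Reaction.REFRESH_ONLY;
        }
        if (AWAIT.contains(error)) {
            return Reaction.REFRESH_AND_AWAIT;
        }
        // Every other error is outside the scope of this sketch.
        return Reaction.NONE;
    }
}
```

Keeping the await set in one place makes it easy to see which errors pause fetching for a partition and which merely trigger a refresh.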
} else {
    requestMetadataUpdate(metadata, subscriptions, partition);
    subscriptions.awaitUpdate(partition);
With this change, we first request a metadata update, then flag our partition as awaiting the metadata update whenever we encounter a NOT_LEADER_OR_FOLLOWER or FENCED_LEADER_EPOCH. However, in the FetchCollector.handleInitializeErrors() method, we only request the metadata update, but don't flag the partition. Is that seeming inconsistency intentional?
If the FetchStates is FETCHING as per KIP-951, the FetchCollector.handleInitializeErrors() method is called.
I thought that in this case, it should not be changed to AWAIT_UPDATE.
Additionally, if it's AWAIT_UPDATE, it will be filtered out by the following code inside the FetchCollector.initialize() method and will not go through FetchCollector.handleInitializeErrors():
if (!subscriptions.hasValidPosition(tp)) {
// this can happen when a rebalance happened while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
return null;
}
As another alternative, it could change the status to AWAIT_UPDATE in FetchCollector.handleInitializeErrors() only when the response doesn't include leader info.
Upon further thought, it seems possible to differentiate based on the following condition:
completedFetch.partitionData.currentLeader().leaderId() != -1 && completedFetch.partitionData.currentLeader().leaderEpoch() != -1
changed
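The leader-info condition quoted in this thread can be isolated into a small predicate. The sketch below is only an illustration: LeaderInfoCheck and its method names are hypothetical, and the plain int parameters stand in for the leaderId() and leaderEpoch() values from the fetch response. It assumes, as the condition suggests, that -1 marks a missing value.

```java
// Hypothetical helper for illustration; not part of the Kafka code base.
final class LeaderInfoCheck {
    // Sentinel assumed from the quoted condition: -1 means "not provided".
    static final int UNKNOWN = -1;

    private LeaderInfoCheck() { }

    // True when the response carries both a leader id and a leader epoch.
    static boolean hasLeaderInfo(int leaderId, int leaderEpoch) {
        return leaderId != UNKNOWN && leaderEpoch != UNKNOWN;
    }

    // The alternative discussed above: only wait for a metadata update
    // when the response did not already tell us who the new leader is.
    static boolean shouldAwaitUpdate(int leaderId, int leaderEpoch) {
        return !hasLeaderInfo(leaderId, leaderEpoch);
    }
}
```

For example, shouldAwaitUpdate(3, 7) is false because the response already names a leader, while shouldAwaitUpdate(-1, -1) is true.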
Thanks for the review, @kirktrue!
- add 'AWAIT_UPDATE' state in FetchStates
- AWAIT_UPDATE transitions to the next state if the Leader information in the updated metadata is the same as the existing one.
…als/SubscriptionState.java LGTM Co-authored-by: Kirk True <[email protected]>
- Just requestMetadataUpdate in REPLICA_NOT_AVAILABLE error
- Move await a metadata update logic from AbstractFetch to FetchCollector
- Add test cases about REPLICA_NOT_AVAILABLE error
- Fix some comments
- Simplify if statements
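The commit notes above describe the new AWAIT_UPDATE state only briefly, so here is a rough standalone sketch of the idea. It is a simplified model under stated assumptions, not the SubscriptionState implementation: SketchFetchState and PartitionSketch are hypothetical names, the sketch assumes AWAIT_UPDATE is entered only from FETCHING, and it assumes a changed leader leads to AWAIT_VALIDATION while an unchanged leader returns to FETCHING.

```java
// Simplified, hypothetical model of the fetch states; not Kafka's FetchStates.
enum SketchFetchState {
    FETCHING(true),
    AWAIT_VALIDATION(false),
    AWAIT_UPDATE(false); // the state this PR adds: not fetchable

    private final boolean fetchable;

    SketchFetchState(boolean fetchable) {
        this.fetchable = fetchable;
    }

    boolean isFetchable() {
        return fetchable;
    }
}

final class PartitionSketch {
    private SketchFetchState state = SketchFetchState.FETCHING;

    SketchFetchState state() {
        return state;
    }

    boolean isFetchable() {
        return state.isFetchable();
    }

    // Called after a fetch error that needs fresh metadata.
    // Assumption: only a FETCHING partition can start awaiting an update.
    void awaitUpdate() {
        if (state == SketchFetchState.FETCHING) {
            state = SketchFetchState.AWAIT_UPDATE;
        }
    }

    // Called when new metadata arrives.
    // Assumption: a changed leader needs validation; an unchanged one resumes fetching.
    void onMetadataUpdate(boolean leaderChanged) {
        if (state != SketchFetchState.AWAIT_UPDATE) {
            return;
        }
        state = leaderChanged ? SketchFetchState.AWAIT_VALIDATION : SketchFetchState.FETCHING;
    }
}
```

While a partition sits in AWAIT_UPDATE, isFetchable() is false, so a fetcher that filters on it would skip the partition until onMetadataUpdate runs.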
@lianetm & @philipnee, would you be able to review this PR? Thanks!
@lianetm & @philipnee If you have a moment, please take a look
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
if (error == Errors.REPLICA_NOT_AVAILABLE) {
    log.debug("Received replica not available error in fetch for partition {}", tp);
@kirktrue It's just a debug log, but it's different from the previous log. Is that okay?
This PR is being marked as stale since it has not had any activity in 90 days. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
Hi @kirktrue
Problem
As stated in KAFKA-15615:
When a fetch response receives an error about partition leadership, fencing, etc., a metadata refresh is triggered.
Until the metadata is updated, the consumer continues to attempt fetching data for the partition, resulting in failure responses due to outdated information.
Approach
In cases where a partition requires metadata updates due to Fetch response, a temporary "Pause" approach is applied.
- AWAIT_UPDATE (ADD) state.
- The AWAIT_UPDATE state is an unfetchable state (TopicPartitionState.isFetchable() is false), so the partition is not returned by AbstractFetch.fetchablePartitions.
- Transition states, in the order listed: AWAIT_UPDATE, FETCHING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION, AWAIT_UPDATE.
- In the AWAIT_UPDATE state, the partition transitions to the next state via SubscriptionState.maybeValidatePositionForCurrentLeader:
  - FETCHING
  - AWAIT_VALIDATION
- requiresPosition: true
- hasValidPosition: false

Committer Checklist (excluded from commit message)