-
Notifications
You must be signed in to change notification settings - Fork 14.3k
KAFKA-15556: Remove NetworkClientDelegate methods isUnavailable, maybeThrowAuthFailure, and tryConnect #15020
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
@philipnee, I closed the other PR since closing and opening it again doesn't trigger the CI to start. Here is the context for this PR: #14406 (comment) |
@philipnee, all builds succeed, test failures looks unrelated. PTAL, thanks. |
@philipnee @kirktrue, Please take a look if you guys have some free time. Thanks in advance. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @Phuc-Hong-Tran - thanks for taking time raising this PR. I apologize for not writing the the jira ticket well here, so I recommend to rethink how you would approach this refactor. In short - we don't need to handle isUnavailable and auth failure in the request manager, so it is ok to make the default implementation to just return false, and perform no ops for both methods. The reason is networkClientDelegate already handle these scenario, just in a slight different order to the LegacyKafkaConsumer.
@@ -372,7 +372,7 @@ Node selectReadReplica(final TopicPartition partition, final Node leaderReplica, | |||
} | |||
} | |||
|
|||
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests() { | |||
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessionRequests(boolean checkNodeAvailability) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final boolean checkNodeAvailability
@@ -385,7 +385,9 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessi | |||
// skip sending the close request. | |||
final Node fetchTarget = cluster.nodeById(fetchTargetNodeId); | |||
|
|||
if (fetchTarget == null || isUnavailable(fetchTarget)) { | |||
boolean fetchTargetAvailability = checkNodeAvailability ? (fetchTarget == null || isUnavailable(fetchTarget)) : fetchTarget == null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final boolean
- i would also call this
isFetchTargetAvailable
- let's not use inline if. this makes the code harder to read.
log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", partition, node); | ||
} else if (nodesWithPendingFetchRequests.contains(node.id())) { | ||
log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", partition, node); | ||
if (checkNodeAvailability) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to avoid nested if?
return pollInternal( | ||
prepareFetchRequests(), | ||
prepareFetchRequests(checkNodeAvailability), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just pass false. the var above is useless.
return pollInternal( | ||
prepareCloseFetchSessionRequests(), | ||
prepareCloseFetchSessionRequests(checkNodeAvailability), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
@@ -102,7 +102,9 @@ public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> | |||
* @return number of fetches sent | |||
*/ | |||
public synchronized int sendFetches() { | |||
final Map<Node, FetchSessionHandler.FetchRequestData> fetchRequests = prepareFetchRequests(); | |||
boolean checkNodeAvailability = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same as above
@@ -781,10 +781,10 @@ public void testFetchSkipsBlackedOutNodes() { | |||
Node node = initialUpdateResponse.brokers().iterator().next(); | |||
|
|||
client.backoff(node, 500); | |||
assertEquals(0, sendFetches()); | |||
assertEquals(1, sendFetches()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you changing the tests?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're not checking the node availablity using the networkClientDelegate in FetchRequestManager, the request that was made would pass through the check and made it way to unsentRequest. After the backoff the request should be sent already.
Should I not change the test and make new one instead?
@@ -403,7 +405,7 @@ protected Map<Node, FetchSessionHandler.FetchRequestData> prepareCloseFetchSessi | |||
* Create fetch requests for all nodes for which we have assigned partitions | |||
* that have no existing requests in flight. | |||
*/ | |||
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() { | |||
protected Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests(boolean checkNodeAvailability) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final boolean
@philipnee, Thanks for the comments. When you said I should rethink about the approach, are you suggesting that I should start over from the idea phase? |
hi @Phuc-Hong-Tran - yes, the abstractFetch implementation is based on the LegacyKafkaConsumer and therefore requires connection probing. We don't need that in the AsyncKafkaConsumer as it is being done right before sending out the requests. |
I understand, will get another PR out ASAP. |
…rowAuthFailure to perform no ops
@philipnee, PTAL, thanks in advance. |
@philipnee, is there anything else that I should change? |
hi @Phuc-Hong-Tran - thanks for the PR. Could you also clean up the isUnavailable and maybeThrowAuthFailure methods in the networkClientDelegate. I believe they aren't being used anywhere. |
@philipnee Will do |
@philipnee All done |
@Phuc-Hong-Tran—is this PR ready for review? |
@kirktrue, it's ready for review |
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
* Node's availability will be checked at a later stage, so we default to return false. | ||
* @param node {@link Node} to check for availability | ||
* @return | ||
*/ | ||
@Override | ||
protected boolean isUnavailable(Node node) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does seem a bit like half a refactor. Would it be possible to remove isUnavailable
and maybeThrowAuthFailure
entirely from AbstractFetch
?
This PR is being marked as stale since it has not had any activity in 90 days. If you If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
Change:
*Refactored
AbstractFetch
so onlyFetcher
will check for node status when creating fetch requests.Committer Checklist (excluded from commit message)