Skip to content

Commit e6b10f1

Browse files
committed
[client] Lookup sender need retry to send when leader not found in metadata cache
1 parent c5257fd commit e6b10f1

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,13 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
148148
// lookup the leader node
149149
TableBucket tb = lookup.tableBucket();
150150
try {
151-
// TODO this can be a re-triable operation. We should retry here instead of
152-
// throwing exception.
153151
leader = metadataUpdater.leaderFor(tb);
154152
} catch (Exception e) {
155-
lookup.future().completeExceptionally(e);
153+
if (lookup.retries() < maxRetries && !lookup.future().isDone()) {
154+
reEnqueueLookup(lookup);
155+
} else {
156+
lookup.future().completeExceptionally(e);
157+
}
156158
continue;
157159
}
158160
lookupBatchesByLeader
@@ -167,9 +169,19 @@ void sendLookups(
167169
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
168170
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
169171
if (gateway == null) {
170-
// TODO handle this exception, like retry.
171-
throw new LeaderNotAvailableException(
172-
"Server " + destination + " is not found in metadata cache.");
172+
lookupBatches.forEach(
173+
lookup -> {
174+
if (lookup.retries() < maxRetries && !lookup.future().isDone()) {
175+
reEnqueueLookup(lookup);
176+
} else {
177+
lookup.future()
178+
.completeExceptionally(
179+
new LeaderNotAvailableException(
180+
"Server "
181+
+ destination
182+
+ " is not found in metadata cache."));
183+
}
184+
});
173185
}
174186

175187
if (lookupType == LookupType.LOOKUP) {

0 commit comments

Comments
 (0)