Skip to content

Commit 8635169

Browse files
authored
[client] Lookup sender need retry to send when leader not found in metadata cache (#2052)
1 parent a339158 commit 8635169

File tree

2 files changed

+51
-27
lines changed

2 files changed

+51
-27
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,22 @@ private void runOnce(boolean drainAll) throws Exception {
126126
sendLookups(lookups);
127127
}
128128

129-
private void sendLookups(List<AbstractLookupQuery<?>> lookups) {
129+
private void sendLookups(List<AbstractLookupQuery<?>> lookups) throws Exception {
130130
if (lookups.isEmpty()) {
131131
return;
132132
}
133133
// group by <leader, lookup type> to lookup batches
134134
Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> lookupBatches =
135135
groupByLeaderAndType(lookups);
136+
137+
// if there are no lookup batches, sleep briefly to avoid a busy loop. This happens when
138+
// no leader is available for any of the lookup requests in the queue.
139+
if (lookupBatches.isEmpty() && !lookupQueue.hasUnDrained()) {
140+
// TODO: consider a wait/notify mechanism instead of an active sleep, and a dynamic sleep
141+
// time based on how long the requests have been waiting.
142+
Thread.sleep(100);
143+
}
144+
136145
// now, send the batches
137146
lookupBatches.forEach(
138147
(destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch));
@@ -148,11 +157,10 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
148157
// lookup the leader node
149158
TableBucket tb = lookup.tableBucket();
150159
try {
151-
// TODO this can be a re-triable operation. We should retry here instead of
152-
// throwing exception.
153160
leader = metadataUpdater.leaderFor(tb);
154161
} catch (Exception e) {
155-
lookup.future().completeExceptionally(e);
162+
// if leader is not found, re-enqueue the lookup to send again.
163+
reEnqueueLookup(lookup);
156164
continue;
157165
}
158166
lookupBatchesByLeader
@@ -165,24 +173,16 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
165173
@VisibleForTesting
166174
void sendLookups(
167175
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
168-
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
169-
if (gateway == null) {
170-
// TODO handle this exception, like retry.
171-
throw new LeaderNotAvailableException(
172-
"Server " + destination + " is not found in metadata cache.");
173-
}
174-
175176
if (lookupType == LookupType.LOOKUP) {
176-
sendLookupRequest(destination, gateway, lookupBatches);
177+
sendLookupRequest(destination, lookupBatches);
177178
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
178-
sendPrefixLookupRequest(destination, gateway, lookupBatches);
179+
sendPrefixLookupRequest(destination, lookupBatches);
179180
} else {
180181
throw new IllegalArgumentException("Unsupported lookup type: " + lookupType);
181182
}
182183
}
183184

184-
private void sendLookupRequest(
185-
int destination, TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) {
185+
private void sendLookupRequest(int destination, List<AbstractLookupQuery<?>> lookups) {
186186
// table id -> (bucket -> lookups)
187187
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();
188188
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -195,6 +195,19 @@ private void sendLookupRequest(
195195
.addLookup(lookup);
196196
}
197197

198+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
199+
if (gateway == null) {
200+
lookupByTableId.forEach(
201+
(tableId, lookupsByBucket) ->
202+
handleLookupRequestException(
203+
new LeaderNotAvailableException(
204+
"Server "
205+
+ destination
206+
+ " is not found in metadata cache."),
207+
destination,
208+
lookupsByBucket));
209+
}
210+
198211
lookupByTableId.forEach(
199212
(tableId, lookupsByBucket) ->
200213
sendLookupRequestAndHandleResponse(
@@ -206,9 +219,7 @@ private void sendLookupRequest(
206219
}
207220

208221
private void sendPrefixLookupRequest(
209-
int destination,
210-
TabletServerGateway gateway,
211-
List<AbstractLookupQuery<?>> prefixLookups) {
222+
int destination, List<AbstractLookupQuery<?>> prefixLookups) {
212223
// table id -> (bucket -> lookups)
213224
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new HashMap<>();
214225
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -221,6 +232,19 @@ private void sendPrefixLookupRequest(
221232
.addLookup(prefixLookup);
222233
}
223234

235+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
236+
if (gateway == null) {
237+
lookupByTableId.forEach(
238+
(tableId, lookupsByBucket) ->
239+
handlePrefixLookupException(
240+
new LeaderNotAvailableException(
241+
"Server "
242+
+ destination
243+
+ " is not found in metadata cache."),
244+
destination,
245+
lookupsByBucket));
246+
}
247+
224248
lookupByTableId.forEach(
225249
(tableId, prefixLookupBatch) ->
226250
sendPrefixLookupRequestAndHandleResponse(
@@ -396,7 +420,6 @@ private void handlePrefixLookupException(
396420
}
397421

398422
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
399-
lookup.incrementRetries();
400423
lookupQueue.appendLookup(lookup);
401424
}
402425

@@ -455,6 +478,7 @@ private void handleLookupError(
455478
tableBucket,
456479
maxRetries - lookup.retries(),
457480
error.formatErrMsg());
481+
lookup.incrementRetries();
458482
reEnqueueLookup(lookup);
459483
} else {
460484
LOG.warn(

fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,10 @@ void testSendLookupRequestWithNotLeaderOrFollowerException() throws Exception {
135135
assertThat(result).isNotDone();
136136
lookupQueue.appendLookup(lookupQuery);
137137

138-
// Wait for all retries to complete and verify it eventually fails
139-
assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
140-
.isInstanceOf(ExecutionException.class)
141-
.hasMessageContaining("Leader not found after retry");
138+
// Wait for the retries to run. The lookup future is not expected to complete within the
139+
// timeout, so get() should throw a TimeoutException.
140+
assertThatThrownBy(() -> result.get(2, TimeUnit.SECONDS))
141+
.isInstanceOf(java.util.concurrent.TimeoutException.class);
142142

143143
// Verify that retries happened (should be 1, because server meta invalidated)
144144
assertThat(lookupQuery.retries()).isEqualTo(1);
@@ -173,10 +173,10 @@ void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws Except
173173
assertThat(future).isNotDone();
174174
lookupQueue.appendLookup(prefixLookupQuery);
175175

176-
// Wait for all retries to complete and verify it eventually fails
177-
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
178-
.isInstanceOf(ExecutionException.class)
179-
.hasMessageContaining("Leader not found after retry");
176+
// Wait for the retries to run. The lookup future is not expected to complete within the
177+
// timeout, so get() should throw a TimeoutException.
178+
assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
179+
.isInstanceOf(java.util.concurrent.TimeoutException.class);
180180

181181
// Verify that retries happened (should be 1, because server meta invalidated)
182182
assertThat(prefixLookupQuery.retries()).isEqualTo(1);

0 commit comments

Comments
 (0)