Skip to content

Commit 3a69c10

Browse files
swuferhongloserwang1024
authored andcommitted
[client] Lookup sender need retry to send when leader not found in metadata cache
1 parent a475aee commit 3a69c10

File tree

2 files changed

+50
-27
lines changed

2 files changed

+50
-27
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,21 @@ private void runOnce(boolean drainAll) throws Exception {
126126
sendLookups(lookups);
127127
}
128128

129-
private void sendLookups(List<AbstractLookupQuery<?>> lookups) {
129+
private void sendLookups(List<AbstractLookupQuery<?>> lookups) throws Exception {
130130
if (lookups.isEmpty()) {
131131
return;
132132
}
133133
// group by <leader, lookup type> to lookup batches
134134
Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> lookupBatches =
135135
groupByLeaderAndType(lookups);
136+
137+
// if no lookup batches, sleep a bit to avoid busy loop. This case will happen when there is
138+
// no leader for all the lookup request in queue.
139+
if (lookupBatches.isEmpty()) {
140+
// TODO Hard code sleep time.
141+
Thread.sleep(100);
142+
}
143+
136144
// now, send the batches
137145
lookupBatches.forEach(
138146
(destAndType, batch) -> sendLookups(destAndType.f0, destAndType.f1, batch));
@@ -148,11 +156,10 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
148156
// lookup the leader node
149157
TableBucket tb = lookup.tableBucket();
150158
try {
151-
// TODO this can be a re-triable operation. We should retry here instead of
152-
// throwing exception.
153159
leader = metadataUpdater.leaderFor(tb);
154160
} catch (Exception e) {
155-
lookup.future().completeExceptionally(e);
161+
// if leader is not found, re-enqueue the lookup to send again.
162+
reEnqueueLookup(lookup);
156163
continue;
157164
}
158165
lookupBatchesByLeader
@@ -165,24 +172,16 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
165172
@VisibleForTesting
166173
void sendLookups(
167174
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
168-
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
169-
if (gateway == null) {
170-
// TODO handle this exception, like retry.
171-
throw new LeaderNotAvailableException(
172-
"Server " + destination + " is not found in metadata cache.");
173-
}
174-
175175
if (lookupType == LookupType.LOOKUP) {
176-
sendLookupRequest(destination, gateway, lookupBatches);
176+
sendLookupRequest(destination, lookupBatches);
177177
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
178-
sendPrefixLookupRequest(destination, gateway, lookupBatches);
178+
sendPrefixLookupRequest(destination, lookupBatches);
179179
} else {
180180
throw new IllegalArgumentException("Unsupported lookup type: " + lookupType);
181181
}
182182
}
183183

184-
private void sendLookupRequest(
185-
int destination, TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) {
184+
private void sendLookupRequest(int destination, List<AbstractLookupQuery<?>> lookups) {
186185
// table id -> (bucket -> lookups)
187186
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();
188187
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -195,6 +194,19 @@ private void sendLookupRequest(
195194
.addLookup(lookup);
196195
}
197196

197+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
198+
if (gateway == null) {
199+
lookupByTableId.forEach(
200+
(tableId, lookupsByBucket) ->
201+
handleLookupRequestException(
202+
new LeaderNotAvailableException(
203+
"Server "
204+
+ destination
205+
+ " is not found in metadata cache."),
206+
destination,
207+
lookupsByBucket));
208+
}
209+
198210
lookupByTableId.forEach(
199211
(tableId, lookupsByBucket) ->
200212
sendLookupRequestAndHandleResponse(
@@ -206,9 +218,7 @@ private void sendLookupRequest(
206218
}
207219

208220
private void sendPrefixLookupRequest(
209-
int destination,
210-
TabletServerGateway gateway,
211-
List<AbstractLookupQuery<?>> prefixLookups) {
221+
int destination, List<AbstractLookupQuery<?>> prefixLookups) {
212222
// table id -> (bucket -> lookups)
213223
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new HashMap<>();
214224
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -221,6 +231,19 @@ private void sendPrefixLookupRequest(
221231
.addLookup(prefixLookup);
222232
}
223233

234+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
235+
if (gateway == null) {
236+
lookupByTableId.forEach(
237+
(tableId, lookupsByBucket) ->
238+
handlePrefixLookupException(
239+
new LeaderNotAvailableException(
240+
"Server "
241+
+ destination
242+
+ " is not found in metadata cache."),
243+
destination,
244+
lookupsByBucket));
245+
}
246+
224247
lookupByTableId.forEach(
225248
(tableId, prefixLookupBatch) ->
226249
sendPrefixLookupRequestAndHandleResponse(
@@ -396,7 +419,6 @@ private void handlePrefixLookupException(
396419
}
397420

398421
private void reEnqueueLookup(AbstractLookupQuery<?> lookup) {
399-
lookup.incrementRetries();
400422
lookupQueue.appendLookup(lookup);
401423
}
402424

@@ -455,6 +477,7 @@ private void handleLookupError(
455477
tableBucket,
456478
maxRetries - lookup.retries(),
457479
error.formatErrMsg());
480+
lookup.incrementRetries();
458481
reEnqueueLookup(lookup);
459482
} else {
460483
LOG.warn(

fluss-client/src/test/java/org/apache/fluss/client/lookup/LookupSenderTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ void testSendLookupRequestWithNotLeaderOrFollowerException() throws Exception {
136136
assertThat(result).isNotDone();
137137
lookupQueue.appendLookup(lookupQuery);
138138

139-
// Wait for all retries to complete and verify it eventually fails
140-
assertThatThrownBy(() -> result.get(5, TimeUnit.SECONDS))
141-
.isInstanceOf(ExecutionException.class)
142-
.hasMessageContaining("Leader not found after retry");
139+
// Wait for all retries to complete and verify it eventually fails. This case will be failed
140+
// after timeout.
141+
assertThatThrownBy(() -> result.get(2, TimeUnit.SECONDS))
142+
.isInstanceOf(java.util.concurrent.TimeoutException.class);
143143

144144
// Verify that retries happened (should be 1, because server meta invalidated)
145145
assertThat(lookupQuery.retries()).isEqualTo(1);
@@ -174,10 +174,10 @@ void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() throws Except
174174
assertThat(future).isNotDone();
175175
lookupQueue.appendLookup(prefixLookupQuery);
176176

177-
// Wait for all retries to complete and verify it eventually fails
178-
assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
179-
.isInstanceOf(ExecutionException.class)
180-
.hasMessageContaining("Leader not found after retry");
177+
// Wait for all retries to complete and verify it eventually fails. This case will be failed
178+
// after timeout.
179+
assertThatThrownBy(() -> future.get(2, TimeUnit.SECONDS))
180+
.isInstanceOf(java.util.concurrent.TimeoutException.class);
181181

182182
// Verify that retries happened (should be 1, because server meta invalidated)
183183
assertThat(prefixLookupQuery.retries()).isEqualTo(1);

0 commit comments

Comments
 (0)