Skip to content

Commit 329f24f

Browse files
committed
[Client] Fix LookupSender not updating bucket metadata when it receives a bucket-level NotLeaderOrFollowerException
1 parent dfc274c commit 329f24f

File tree

4 files changed

+259
-26
lines changed

4 files changed

+259
-26
lines changed

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,15 @@
1818
package org.apache.fluss.client.lookup;
1919

2020
import org.apache.fluss.annotation.Internal;
21+
import org.apache.fluss.annotation.VisibleForTesting;
2122
import org.apache.fluss.client.metadata.MetadataUpdater;
23+
import org.apache.fluss.exception.ApiException;
2224
import org.apache.fluss.exception.FlussRuntimeException;
25+
import org.apache.fluss.exception.InvalidMetadataException;
2326
import org.apache.fluss.exception.LeaderNotAvailableException;
27+
import org.apache.fluss.metadata.PhysicalTablePath;
2428
import org.apache.fluss.metadata.TableBucket;
29+
import org.apache.fluss.metadata.TablePartition;
2530
import org.apache.fluss.rpc.gateway.TabletServerGateway;
2631
import org.apache.fluss.rpc.messages.LookupRequest;
2732
import org.apache.fluss.rpc.messages.LookupResponse;
@@ -36,10 +41,14 @@
3641
import org.slf4j.Logger;
3742
import org.slf4j.LoggerFactory;
3843

44+
import javax.annotation.Nullable;
45+
3946
import java.util.ArrayList;
47+
import java.util.Collections;
4048
import java.util.HashMap;
4149
import java.util.List;
4250
import java.util.Map;
51+
import java.util.Set;
4352
import java.util.concurrent.Semaphore;
4453
import java.util.stream.Collectors;
4554

@@ -145,7 +154,8 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
145154
return lookupBatchesByLeader;
146155
}
147156

148-
private void sendLookups(
157+
@VisibleForTesting
158+
void sendLookups(
149159
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
150160
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
151161
if (gateway == null) {
@@ -155,16 +165,16 @@ private void sendLookups(
155165
}
156166

157167
if (lookupType == LookupType.LOOKUP) {
158-
sendLookupRequest(gateway, lookupBatches);
168+
sendLookupRequest(destination, gateway, lookupBatches);
159169
} else if (lookupType == LookupType.PREFIX_LOOKUP) {
160-
sendPrefixLookupRequest(gateway, lookupBatches);
170+
sendPrefixLookupRequest(destination, gateway, lookupBatches);
161171
} else {
162172
throw new IllegalArgumentException("Unsupported lookup type: " + lookupType);
163173
}
164174
}
165175

166176
private void sendLookupRequest(
167-
TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) {
177+
int destination, TabletServerGateway gateway, List<AbstractLookupQuery<?>> lookups) {
168178
// table id -> (bucket -> lookups)
169179
Map<Long, Map<TableBucket, LookupBatch>> lookupByTableId = new HashMap<>();
170180
for (AbstractLookupQuery<?> abstractLookupQuery : lookups) {
@@ -180,14 +190,17 @@ private void sendLookupRequest(
180190
lookupByTableId.forEach(
181191
(tableId, lookupsByBucket) ->
182192
sendLookupRequestAndHandleResponse(
193+
destination,
183194
gateway,
184195
makeLookupRequest(tableId, lookupsByBucket.values()),
185196
tableId,
186197
lookupsByBucket));
187198
}
188199

189200
private void sendPrefixLookupRequest(
190-
TabletServerGateway gateway, List<AbstractLookupQuery<?>> prefixLookups) {
201+
int destination,
202+
TabletServerGateway gateway,
203+
List<AbstractLookupQuery<?>> prefixLookups) {
191204
// table id -> (bucket -> lookups)
192205
Map<Long, Map<TableBucket, PrefixLookupBatch>> lookupByTableId = new HashMap<>();
193206
for (AbstractLookupQuery<?> abstractLookupQuery : prefixLookups) {
@@ -203,13 +216,15 @@ private void sendPrefixLookupRequest(
203216
lookupByTableId.forEach(
204217
(tableId, prefixLookupBatch) ->
205218
sendPrefixLookupRequestAndHandleResponse(
219+
destination,
206220
gateway,
207221
makePrefixLookupRequest(tableId, prefixLookupBatch.values()),
208222
tableId,
209223
prefixLookupBatch));
210224
}
211225

212226
private void sendLookupRequestAndHandleResponse(
227+
int destination,
213228
TabletServerGateway gateway,
214229
LookupRequest lookupRequest,
215230
long tableId,
@@ -224,15 +239,16 @@ private void sendLookupRequestAndHandleResponse(
224239
.thenAccept(
225240
lookupResponse -> {
226241
try {
227-
handleLookupResponse(tableId, lookupResponse, lookupsByBucket);
242+
handleLookupResponse(
243+
tableId, destination, lookupResponse, lookupsByBucket);
228244
} finally {
229245
maxInFlightReuqestsSemaphore.release();
230246
}
231247
})
232248
.exceptionally(
233249
e -> {
234250
try {
235-
handleLookupRequestException(e, lookupsByBucket);
251+
handleLookupRequestException(e, destination, lookupsByBucket);
236252
return null;
237253
} finally {
238254
maxInFlightReuqestsSemaphore.release();
@@ -241,6 +257,7 @@ private void sendLookupRequestAndHandleResponse(
241257
}
242258

243259
private void sendPrefixLookupRequestAndHandleResponse(
260+
int destination,
244261
TabletServerGateway gateway,
245262
PrefixLookupRequest prefixLookupRequest,
246263
long tableId,
@@ -256,15 +273,18 @@ private void sendPrefixLookupRequestAndHandleResponse(
256273
prefixLookupResponse -> {
257274
try {
258275
handlePrefixLookupResponse(
259-
tableId, prefixLookupResponse, lookupsByBucket);
276+
tableId,
277+
destination,
278+
prefixLookupResponse,
279+
lookupsByBucket);
260280
} finally {
261281
maxInFlightReuqestsSemaphore.release();
262282
}
263283
})
264284
.exceptionally(
265285
e -> {
266286
try {
267-
handlePrefixLookupException(e, lookupsByBucket);
287+
handlePrefixLookupException(e, destination, lookupsByBucket);
268288
return null;
269289
} finally {
270290
maxInFlightReuqestsSemaphore.release();
@@ -274,6 +294,7 @@ private void sendPrefixLookupRequestAndHandleResponse(
274294

275295
private void handleLookupResponse(
276296
long tableId,
297+
int destination,
277298
LookupResponse lookupResponse,
278299
Map<TableBucket, LookupBatch> lookupsByBucket) {
279300
for (PbLookupRespForBucket pbLookupRespForBucket : lookupResponse.getBucketsRespsList()) {
@@ -288,10 +309,7 @@ private void handleLookupResponse(
288309
if (pbLookupRespForBucket.hasErrorCode()) {
289310
// TODO for re-triable error, we should retry here instead of throwing exception.
290311
ApiError error = ApiError.fromErrorMessage(pbLookupRespForBucket);
291-
LOG.warn(
292-
"Get error lookup response on table bucket {}, fail. Error: {}",
293-
tableBucket,
294-
error.formatErrMsg());
312+
handleLookupExceptionForBucket(tableBucket, destination, error, "lookup");
295313
lookupBatch.completeExceptionally(error.exception());
296314
} else {
297315
List<byte[]> byteValues =
@@ -312,6 +330,7 @@ private void handleLookupResponse(
312330

313331
private void handlePrefixLookupResponse(
314332
long tableId,
333+
int destination,
315334
PrefixLookupResponse prefixLookupResponse,
316335
Map<TableBucket, PrefixLookupBatch> prefixLookupsByBucket) {
317336
for (PbPrefixLookupRespForBucket pbRespForBucket :
@@ -328,10 +347,7 @@ private void handlePrefixLookupResponse(
328347
if (pbRespForBucket.hasErrorCode()) {
329348
// TODO for re-triable error, we should retry here instead of throwing exception.
330349
ApiError error = ApiError.fromErrorMessage(pbRespForBucket);
331-
LOG.warn(
332-
"Get error prefix lookup response on table bucket {}, fail. Error: {}",
333-
tableBucket,
334-
error.formatErrMsg());
350+
handleLookupExceptionForBucket(tableBucket, destination, error, "prefixLookup");
335351
prefixLookupBatch.completeExceptionally(error.exception());
336352
} else {
337353
List<List<byte[]>> result = new ArrayList<>(pbRespForBucket.getValueListsCount());
@@ -349,24 +365,22 @@ private void handlePrefixLookupResponse(
349365
}
350366

351367
private void handleLookupRequestException(
352-
Throwable t, Map<TableBucket, LookupBatch> lookupsByBucket) {
368+
Throwable t, int destination, Map<TableBucket, LookupBatch> lookupsByBucket) {
353369
ApiError error = ApiError.fromThrowable(t);
354370
for (LookupBatch lookupBatch : lookupsByBucket.values()) {
355371
// TODO for re-triable error, we should retry here instead of throwing exception.
356-
LOG.warn(
357-
"Get error lookup response on table bucket {}, fail. Error: {}",
358-
lookupBatch.tableBucket(),
359-
error.formatErrMsg());
372+
handleLookupExceptionForBucket(lookupBatch.tableBucket(), destination, error, "lookup");
360373
lookupBatch.completeExceptionally(error.exception());
361374
}
362375
}
363376

364377
private void handlePrefixLookupException(
365-
Throwable t, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
378+
Throwable t, int destination, Map<TableBucket, PrefixLookupBatch> lookupsByBucket) {
366379
ApiError error = ApiError.fromThrowable(t);
367380
// TODO If error, we need to retry send the request instead of throw exception.
368-
LOG.warn("Get error prefix lookup response. Error: {}", error.formatErrMsg());
369381
for (PrefixLookupBatch lookupBatch : lookupsByBucket.values()) {
382+
handleLookupExceptionForBucket(
383+
lookupBatch.tableBucket(), destination, error, "prefixLookup");
370384
lookupBatch.completeExceptionally(error.exception());
371385
}
372386
}
@@ -382,4 +396,48 @@ void initiateClose() {
382396
lookupQueue.close();
383397
running = false;
384398
}
399+
400+
private void handleLookupExceptionForBucket(
401+
TableBucket tb, int destination, ApiError error, String lookupType) {
402+
ApiException exception = error.error().exception();
403+
LOG.error(
404+
"Failed to {} from node {} for bucket {}", lookupType, destination, tb, exception);
405+
if (exception instanceof InvalidMetadataException) {
406+
LOG.warn(
407+
"Invalid metadata error in {} request. Going to request metadata update.",
408+
lookupType,
409+
exception);
410+
long tableId = tb.getTableId();
411+
TableOrPartitions tableOrPartitions;
412+
if (tb.getPartitionId() == null) {
413+
tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
414+
} else {
415+
tableOrPartitions =
416+
new TableOrPartitions(
417+
null,
418+
Collections.singleton(
419+
new TablePartition(tableId, tb.getPartitionId())));
420+
}
421+
invalidTableOrPartitions(tableOrPartitions);
422+
}
423+
}
424+
425+
/** A helper class to hold table ids or table partitions. */
426+
private static class TableOrPartitions {
427+
private final @Nullable Set<Long> tableIds;
428+
private final @Nullable Set<TablePartition> tablePartitions;
429+
430+
TableOrPartitions(
431+
@Nullable Set<Long> tableIds, @Nullable Set<TablePartition> tablePartitions) {
432+
this.tableIds = tableIds;
433+
this.tablePartitions = tablePartitions;
434+
}
435+
}
436+
437+
private void invalidTableOrPartitions(TableOrPartitions tableOrPartitions) {
438+
Set<PhysicalTablePath> physicalTablePaths =
439+
metadataUpdater.getPhysicalTablePathByIds(
440+
tableOrPartitions.tableIds, tableOrPartitions.tablePartitions);
441+
metadataUpdater.invalidPhysicalTableBucketMeta(physicalTablePaths);
442+
}
385443
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.lookup;
19+
20+
import org.apache.fluss.client.metadata.TestingMetadataUpdater;
21+
import org.apache.fluss.cluster.BucketLocation;
22+
import org.apache.fluss.config.ConfigOptions;
23+
import org.apache.fluss.config.Configuration;
24+
import org.apache.fluss.exception.NotLeaderOrFollowerException;
25+
import org.apache.fluss.metadata.PhysicalTablePath;
26+
import org.apache.fluss.metadata.TableBucket;
27+
28+
import org.junit.jupiter.api.BeforeEach;
29+
import org.junit.jupiter.api.Test;
30+
31+
import java.util.Collections;
32+
import java.util.List;
33+
import java.util.concurrent.CompletableFuture;
34+
35+
import static org.apache.fluss.record.TestData.DATA1_TABLE_ID_PK;
36+
import static org.apache.fluss.record.TestData.DATA1_TABLE_INFO_PK;
37+
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH_PK;
38+
import static org.assertj.core.api.Assertions.assertThat;
39+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
40+
41+
/** Tests for {@link LookupSender}. */
42+
public class LookupSenderTest {
43+
44+
private final TableBucket tb1 = new TableBucket(DATA1_TABLE_ID_PK, 0);
45+
46+
private TestingMetadataUpdater metadataUpdater;
47+
private LookupSender lookupSender;
48+
49+
@BeforeEach
50+
public void setup() {
51+
metadataUpdater = initializeMetadataUpdater();
52+
Configuration conf = new Configuration();
53+
conf.set(ConfigOptions.CLIENT_LOOKUP_QUEUE_SIZE, 5);
54+
conf.set(ConfigOptions.CLIENT_LOOKUP_MAX_BATCH_SIZE, 10);
55+
lookupSender = new LookupSender(metadataUpdater, new LookupQueue(conf), 5);
56+
}
57+
58+
@Test
59+
void testSendLookupRequestWithNotLeaderOrFollowerException() {
60+
assertThat(metadataUpdater.getBucketLocation(tb1))
61+
.hasValue(
62+
new BucketLocation(
63+
PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
64+
tb1,
65+
1,
66+
new int[] {1, 2, 3}));
67+
68+
// send LookupRequest to serverId 1, which will respond with NotLeaderOrFollowerException
69+
// as responseLogicId=1 do.
70+
metadataUpdater.setResponseLogicId(1, 1);
71+
LookupQuery lookupQuery = new LookupQuery(tb1, new byte[0]);
72+
CompletableFuture<byte[]> result = lookupQuery.future();
73+
assertThat(result).isNotDone();
74+
lookupSender.sendLookups(1, LookupType.LOOKUP, Collections.singletonList(lookupQuery));
75+
76+
assertThat(result.isCompletedExceptionally()).isTrue();
77+
assertThatThrownBy(result::get)
78+
.rootCause()
79+
.isInstanceOf(NotLeaderOrFollowerException.class)
80+
.hasMessage("mock not leader or follower exception.");
81+
// When NotLeaderOrFollowerException is received, the bucketLocation will be removed from
82+
// metadata updater to trigger get the latest bucketLocation in next lookup round.
83+
assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
84+
}
85+
86+
@Test
87+
void testSendPrefixLookupRequestWithNotLeaderOrFollowerException() {
88+
assertThat(metadataUpdater.getBucketLocation(tb1))
89+
.hasValue(
90+
new BucketLocation(
91+
PhysicalTablePath.of(DATA1_TABLE_PATH_PK),
92+
tb1,
93+
1,
94+
new int[] {1, 2, 3}));
95+
96+
// send PrefixLookupRequest to serverId 1, which will respond with
97+
// NotLeaderOrFollowerException as responseLogicId=1 do.
98+
metadataUpdater.setResponseLogicId(1, 1);
99+
PrefixLookupQuery prefixLookupQuery = new PrefixLookupQuery(tb1, new byte[0]);
100+
CompletableFuture<List<byte[]>> future = prefixLookupQuery.future();
101+
assertThat(future).isNotDone();
102+
lookupSender.sendLookups(
103+
1, LookupType.PREFIX_LOOKUP, Collections.singletonList(prefixLookupQuery));
104+
105+
assertThat(future.isCompletedExceptionally()).isTrue();
106+
assertThatThrownBy(future::get)
107+
.rootCause()
108+
.isInstanceOf(NotLeaderOrFollowerException.class)
109+
.hasMessage("mock not leader or follower exception.");
110+
// When NotLeaderOrFollowerException is received, the bucketLocation will be removed from
111+
// metadata updater to trigger get the latest bucketLocation in next lookup round.
112+
assertThat(metadataUpdater.getBucketLocation(tb1)).isNotPresent();
113+
}
114+
115+
private TestingMetadataUpdater initializeMetadataUpdater() {
116+
return new TestingMetadataUpdater(
117+
Collections.singletonMap(DATA1_TABLE_PATH_PK, DATA1_TABLE_INFO_PK));
118+
}
119+
}

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ public void updateCluster(Cluster cluster) {
7878
this.cluster = cluster;
7979
}
8080

81+
public void setResponseLogicId(int serverId, int responseLogicId) {
82+
tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
83+
}
84+
8185
@Override
8286
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
8387
Set<TablePath> needUpdateTablePaths =

0 commit comments

Comments
 (0)