Skip to content

Commit e47748b

Browse files
authored
[Client] Fix LogFetcher doesn't update bucket metadata when receive NotLeaderOrFollowerException in bucket level (#1885)
1 parent 356004c commit e47748b

File tree

7 files changed

+578
-381
lines changed

7 files changed

+578
-381
lines changed

fluss-client/src/main/java/org/apache/fluss/client/metadata/ClientSchemaGetter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class ClientSchemaGetter implements SchemaGetter {
4242
private static final Logger LOG = LoggerFactory.getLogger(ClientSchemaGetter.class);
4343

4444
private final TablePath tablePath;
45-
private final Map<Integer, Schema> schemasById;
45+
protected final Map<Integer, Schema> schemasById;
4646
private final Admin admin;
4747
private volatile SchemaInfo latestSchemaInfo;
4848

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,10 @@
2626
import org.apache.fluss.cluster.BucketLocation;
2727
import org.apache.fluss.config.ConfigOptions;
2828
import org.apache.fluss.config.Configuration;
29+
import org.apache.fluss.exception.ApiException;
2930
import org.apache.fluss.exception.InvalidMetadataException;
3031
import org.apache.fluss.exception.LeaderNotAvailableException;
32+
import org.apache.fluss.exception.PartitionNotExistException;
3133
import org.apache.fluss.fs.FsPath;
3234
import org.apache.fluss.metadata.PhysicalTablePath;
3335
import org.apache.fluss.metadata.SchemaGetter;
@@ -48,6 +50,7 @@
4850
import org.apache.fluss.rpc.messages.PbFetchLogReqForTable;
4951
import org.apache.fluss.rpc.messages.PbFetchLogRespForBucket;
5052
import org.apache.fluss.rpc.messages.PbFetchLogRespForTable;
53+
import org.apache.fluss.rpc.protocol.ApiError;
5154
import org.apache.fluss.rpc.protocol.Errors;
5255
import org.apache.fluss.utils.IOUtils;
5356
import org.apache.fluss.utils.Projection;
@@ -217,14 +220,27 @@ private void checkAndUpdateMetadata(List<TableBucket> tableBuckets) {
217220
}
218221
}
219222

220-
if (isPartitioned && !partitionIds.isEmpty()) {
221-
metadataUpdater.updateMetadata(Collections.singleton(tablePath), null, partitionIds);
222-
} else if (needUpdate) {
223-
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
223+
try {
224+
if (isPartitioned && !partitionIds.isEmpty()) {
225+
metadataUpdater.updateMetadata(
226+
Collections.singleton(tablePath), null, partitionIds);
227+
} else if (needUpdate) {
228+
metadataUpdater.updateTableOrPartitionMetadata(tablePath, null);
229+
}
230+
} catch (Exception e) {
231+
if (e instanceof PartitionNotExistException) {
232+
// Ignore this exception: it probably happens because the partition was deleted.
233+
// The fetcher can still work fine. A caller such as Flink can remove the partition
234+
// from its fetch list when it receives the exception.
235+
LOG.warn("Receive PartitionNotExistException when update metadata, ignore it", e);
236+
} else {
237+
throw e;
238+
}
224239
}
225240
}
226241

227-
private void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
242+
@VisibleForTesting
243+
void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest) {
228244
TableOrPartitions tableOrPartitionsInFetchRequest =
229245
getTableOrPartitionsInFetchRequest(fetchLogRequest);
230246
// TODO cache the tablet server gateway.
@@ -345,6 +361,14 @@ private synchronized void handleFetchLogResponse(
345361
respForBucket.getBucketId());
346362
FetchLogResultForBucket fetchResultForBucket =
347363
getFetchLogResultForBucket(tb, tablePath, respForBucket);
364+
365+
// if error code is not NONE, it means the fetch log request failed, we need to
366+
// clear table bucket meta for InvalidMetadataException.
367+
if (fetchResultForBucket.getErrorCode() != Errors.NONE.code()) {
368+
ApiError error = ApiError.fromErrorMessage(respForBucket);
369+
handleFetchLogExceptionForBucket(tb, destination, error);
370+
}
371+
348372
Long fetchOffset = logScannerStatus.getBucketOffset(tb);
349373
// if the offset is null, it means the bucket has been unsubscribed,
350374
// we just set a Long.MAX_VALUE as the next fetch offset
@@ -387,6 +411,29 @@ private synchronized void handleFetchLogResponse(
387411
}
388412
}
389413

414+
/**
 * Handles a bucket-level error carried in a fetch log response.
 *
 * <p>The error is always logged. If its exception is an {@link InvalidMetadataException}, a
 * {@link TableOrPartitions} covering only the affected table (when the bucket has no partition
 * id) or only the affected table partition is built and handed to {@code
 * invalidTableOrPartitions}. Any other exception type is only logged by this method.
 *
 * @param tb the bucket whose fetch result carried the error
 * @param destination the node identifier reported in the log message
 * @param error the error reported for the bucket
 */
private void handleFetchLogExceptionForBucket(TableBucket tb, int destination, ApiError error) {
415+
ApiException exception = error.error().exception();
416+
LOG.error("Failed to fetch log from node {} for bucket {}", destination, tb, exception);
// NOTE(review): for InvalidMetadataException the same exception is logged twice, once at
// ERROR above and once at WARN below -- confirm this duplication is intended.
417+
if (exception instanceof InvalidMetadataException) {
418+
LOG.warn(
419+
"Invalid metadata error in fetch log request. "
420+
+ "Going to request metadata update.",
421+
exception);
422+
long tableId = tb.getTableId();
423+
TableOrPartitions tableOrPartitions;
// A null partition id selects the table-level form: only the first constructor argument
// is set. Otherwise only the second argument is set, with a single TablePartition.
// (Presumably the arguments are "table ids" and "table partitions" -- the constructor is
// declared elsewhere; verify.)
424+
if (tb.getPartitionId() == null) {
425+
tableOrPartitions = new TableOrPartitions(Collections.singleton(tableId), null);
426+
} else {
427+
tableOrPartitions =
428+
new TableOrPartitions(
429+
null,
430+
Collections.singleton(
431+
new TablePartition(tableId, tb.getPartitionId())));
432+
}
// invalidTableOrPartitions is defined outside this view; presumably it invalidates the
// cached metadata for the given table/partition so that it gets refreshed -- TODO confirm.
433+
invalidTableOrPartitions(tableOrPartitions);
434+
}
435+
}
436+
390437
private void pendRemoteFetches(
391438
RemoteLogFetchInfo remoteLogFetchInfo, long firstFetchOffset, long highWatermark) {
392439
checkNotNull(remoteLogFetchInfo);
@@ -417,7 +464,8 @@ private void pendRemoteFetches(
417464
}
418465
}
419466

420-
private Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
467+
@VisibleForTesting
468+
Map<Integer, FetchLogRequest> prepareFetchLogRequests() {
421469
Map<Integer, List<PbFetchLogReqForBucket>> fetchLogReqForBuckets = new HashMap<>();
422470
int readyForFetchCount = 0;
423471
Long tableId = null;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.client.metadata;
19+
20+
import org.apache.fluss.client.admin.FlussAdmin;
21+
import org.apache.fluss.config.Configuration;
22+
import org.apache.fluss.metadata.Schema;
23+
import org.apache.fluss.metadata.SchemaInfo;
24+
import org.apache.fluss.metadata.TablePath;
25+
import org.apache.fluss.rpc.RpcClient;
26+
import org.apache.fluss.rpc.metrics.TestingClientMetricGroup;
27+
28+
import java.util.concurrent.CompletableFuture;
29+
30+
/** Testing class for {@link ClientSchemaGetter}. */
31+
public class TestingClientSchemaGetter extends ClientSchemaGetter {
// Both overridden lookups below read the inherited schemasById map directly. How that map is
// populated is decided by the superclass constructor, which is not visible here.

// Creates the getter for the given table path and latest schema info. The admin passed to the
// superclass is a FlussAdmin built from an RpcClient (created with a default Configuration and
// a testing metric group) and the supplied testing metadata updater.
32+
public TestingClientSchemaGetter(
33+
TablePath tablePath,
34+
SchemaInfo latestSchemaInfo,
35+
TestingMetadataUpdater metadataUpdater) {
36+
super(
37+
tablePath,
38+
latestSchemaInfo,
39+
new FlussAdmin(
40+
RpcClient.create(
41+
new Configuration(), TestingClientMetricGroup.newInstance(), false),
42+
metadataUpdater));
43+
}
44+
// Returns whatever schemasById holds for the id; the result is null when the id is absent.
45+
@Override
46+
public Schema getSchema(int schemaId) {
47+
return schemasById.get(schemaId);
48+
}
49+
// Returns an already-completed future, so the caller never waits on it.
// NOTE(review): when the id is absent from schemasById, a null schema is passed to the
// SchemaInfo constructor -- whether that is accepted depends on SchemaInfo; verify.
50+
@Override
51+
public CompletableFuture<SchemaInfo> getSchemaInfoAsync(int schemaId) {
52+
return CompletableFuture.completedFuture(
53+
new SchemaInfo(schemasById.get(schemaId), schemaId));
54+
}
55+
}

fluss-client/src/test/java/org/apache/fluss/client/metadata/TestingMetadataUpdater.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@
4242

4343
/** Testing class for metadata updater. */
4444
public class TestingMetadataUpdater extends MetadataUpdater {
45-
private static final ServerNode COORDINATOR =
45+
public static final ServerNode COORDINATOR =
4646
new ServerNode(0, "localhost", 90, ServerType.COORDINATOR);
47-
private static final ServerNode NODE1 =
47+
public static final ServerNode NODE1 =
4848
new ServerNode(1, "localhost", 90, ServerType.TABLET_SERVER, "rack1");
49-
private static final ServerNode NODE2 =
49+
public static final ServerNode NODE2 =
5050
new ServerNode(2, "localhost", 91, ServerType.TABLET_SERVER, "rack2");
51-
private static final ServerNode NODE3 =
51+
public static final ServerNode NODE3 =
5252
new ServerNode(3, "localhost", 92, ServerType.TABLET_SERVER, "rack3");
5353

5454
private final TestCoordinatorGateway coordinatorGateway;
@@ -63,7 +63,7 @@ public TestingMetadataUpdater(Map<TablePath, TableInfo> tableInfos) {
6363
new Configuration());
6464
}
6565

66-
private TestingMetadataUpdater(
66+
public TestingMetadataUpdater(
6767
ServerNode coordinatorServer,
6868
List<ServerNode> tabletServers,
6969
Map<TablePath, TableInfo> tableInfos,
@@ -137,10 +137,6 @@ public void updateCluster(Cluster cluster) {
137137
this.cluster = cluster;
138138
}
139139

140-
public void setResponseLogicId(int serverId, int responseLogicId) {
141-
tabletServerGatewayMap.get(serverId).setResponseLogicId(responseLogicId);
142-
}
143-
144140
@Override
145141
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {
146142
Set<TablePath> needUpdateTablePaths =

0 commit comments

Comments
 (0)