Skip to content

Commit 19073f8

Browse files
authored
[client] Fix the bug where the loss of writeBatch was caused by the origin batchLocation forget to update ServerNode info (#1258)
1 parent eb3b5f8 commit 19073f8

File tree

19 files changed

+190
-168
lines changed

19 files changed

+190
-168
lines changed

fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
2525
import com.alibaba.fluss.cluster.Cluster;
2626
import com.alibaba.fluss.cluster.ServerNode;
27+
import com.alibaba.fluss.exception.LeaderNotAvailableException;
2728
import com.alibaba.fluss.metadata.DatabaseDescriptor;
2829
import com.alibaba.fluss.metadata.DatabaseInfo;
2930
import com.alibaba.fluss.metadata.PartitionInfo;
@@ -102,7 +103,6 @@ public class FlussAdmin implements Admin {
102103
private final AdminGateway gateway;
103104
private final AdminReadOnlyGateway readOnlyGateway;
104105
private final MetadataUpdater metadataUpdater;
105-
private final RpcClient client;
106106

107107
public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
108108
this.gateway =
@@ -112,7 +112,6 @@ public FlussAdmin(RpcClient client, MetadataUpdater metadataUpdater) {
112112
GatewayClientProxy.createGatewayProxy(
113113
metadataUpdater::getRandomTabletServer, client, AdminGateway.class);
114114
this.metadataUpdater = metadataUpdater;
115-
this.client = client;
116115
}
117116

118117
@Override
@@ -407,7 +406,7 @@ private ListOffsetsResult listOffsets(
407406
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
408407
}
409408

410-
sendListOffsetsRequest(metadataUpdater, client, requestMap, bucketToOffsetMap);
409+
sendListOffsetsRequest(metadataUpdater, requestMap, bucketToOffsetMap);
411410
return new ListOffsetsResult(bucketToOffsetMap);
412411
}
413412

@@ -493,34 +492,35 @@ private static Map<Integer, ListOffsetsRequest> prepareListOffsetsRequests(
493492

494493
private static void sendListOffsetsRequest(
495494
MetadataUpdater metadataUpdater,
496-
RpcClient client,
497495
Map<Integer, ListOffsetsRequest> leaderToRequestMap,
498496
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap) {
499497
leaderToRequestMap.forEach(
500498
(leader, request) -> {
501499
TabletServerGateway gateway =
502-
GatewayClientProxy.createGatewayProxy(
503-
() -> metadataUpdater.getTabletServer(leader),
504-
client,
505-
TabletServerGateway.class);
506-
gateway.listOffsets(request)
507-
.thenAccept(
508-
r -> {
509-
for (PbListOffsetsRespForBucket resp :
510-
r.getBucketsRespsList()) {
511-
if (resp.hasErrorCode()) {
512-
bucketToOffsetMap
513-
.get(resp.getBucketId())
514-
.completeExceptionally(
515-
ApiError.fromErrorMessage(resp)
516-
.exception());
517-
} else {
518-
bucketToOffsetMap
519-
.get(resp.getBucketId())
520-
.complete(resp.getOffset());
500+
metadataUpdater.newTabletServerClientForNode(leader);
501+
if (gateway == null) {
502+
throw new LeaderNotAvailableException(
503+
"Server " + leader + " is not found in metadata cache.");
504+
} else {
505+
gateway.listOffsets(request)
506+
.thenAccept(
507+
r -> {
508+
for (PbListOffsetsRespForBucket resp :
509+
r.getBucketsRespsList()) {
510+
if (resp.hasErrorCode()) {
511+
bucketToOffsetMap
512+
.get(resp.getBucketId())
513+
.completeExceptionally(
514+
ApiError.fromErrorMessage(resp)
515+
.exception());
516+
} else {
517+
bucketToOffsetMap
518+
.get(resp.getBucketId())
519+
.complete(resp.getOffset());
520+
}
521521
}
522-
}
523-
});
522+
});
523+
}
524524
});
525525
}
526526
}

fluss-client/src/main/java/com/alibaba/fluss/client/lookup/LookupSender.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.alibaba.fluss.annotation.Internal;
2121
import com.alibaba.fluss.client.metadata.MetadataUpdater;
2222
import com.alibaba.fluss.exception.FlussRuntimeException;
23+
import com.alibaba.fluss.exception.LeaderNotAvailableException;
2324
import com.alibaba.fluss.metadata.TableBucket;
2425
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
2526
import com.alibaba.fluss.rpc.messages.LookupRequest;
@@ -147,6 +148,11 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
147148
private void sendLookups(
148149
int destination, LookupType lookupType, List<AbstractLookupQuery<?>> lookupBatches) {
149150
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
151+
if (gateway == null) {
152+
// TODO handle this exception, like retry.
153+
throw new LeaderNotAvailableException(
154+
"Server " + destination + " is not found in metadata cache.");
155+
}
150156

151157
if (lookupType == LookupType.LOOKUP) {
152158
sendLookupRequest(gateway, lookupBatches);

fluss-client/src/main/java/com/alibaba/fluss/client/metadata/MetadataUpdater.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ public TableInfo getTableInfoOrElseThrow(long tableId) {
116116
}
117117

118118
public int leaderFor(TableBucket tableBucket) {
119-
ServerNode serverNode = cluster.leaderFor(tableBucket);
119+
Integer serverNode = cluster.leaderFor(tableBucket);
120120
if (serverNode == null) {
121121
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
122122
TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId());
@@ -144,10 +144,10 @@ public int leaderFor(TableBucket tableBucket) {
144144
}
145145
}
146146

147-
return serverNode.id();
147+
return serverNode;
148148
}
149149

150-
public @Nullable ServerNode getTabletServer(int id) {
150+
private @Nullable ServerNode getTabletServer(int id) {
151151
return cluster.getTabletServer(id);
152152
}
153153

@@ -165,10 +165,14 @@ public TabletServerGateway newRandomTabletServerClient() {
165165
this::getRandomTabletServer, rpcClient, TabletServerGateway.class);
166166
}
167167

168-
public TabletServerGateway newTabletServerClientForNode(int serverId) {
169-
final ServerNode serverNode = getTabletServer(serverId);
170-
return GatewayClientProxy.createGatewayProxy(
171-
() -> serverNode, rpcClient, TabletServerGateway.class);
168+
public @Nullable TabletServerGateway newTabletServerClientForNode(int serverId) {
169+
@Nullable final ServerNode serverNode = getTabletServer(serverId);
170+
if (serverNode == null) {
171+
return null;
172+
} else {
173+
return GatewayClientProxy.createGatewayProxy(
174+
() -> serverNode, rpcClient, TabletServerGateway.class);
175+
}
172176
}
173177

174178
public void checkAndUpdateTableMetadata(Set<TablePath> tablePaths) {

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/TableScan.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ public LogScanner createLogScanner() {
9494
return new LogScannerImpl(
9595
conn.getConfiguration(),
9696
tableInfo,
97-
conn.getRpcClient(),
9897
conn.getMetadataUpdater(),
9998
conn.getClientMetricGroup(),
10099
conn.getOrCreateRemoteFileDownloader(),

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/batch/LimitBatchScanner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.alibaba.fluss.client.table.scanner.batch;
1919

2020
import com.alibaba.fluss.client.metadata.MetadataUpdater;
21+
import com.alibaba.fluss.exception.LeaderNotAvailableException;
2122
import com.alibaba.fluss.metadata.TableBucket;
2223
import com.alibaba.fluss.metadata.TableInfo;
2324
import com.alibaba.fluss.record.DefaultValueRecordBatch;
@@ -94,7 +95,11 @@ public LimitBatchScanner(
9495
// because that rocksdb is not suitable to projection, thus do it in client.
9596
int leader = metadataUpdater.leaderFor(tableBucket);
9697
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
97-
98+
if (gateway == null) {
99+
// TODO handle this exception, like retry.
100+
throw new LeaderNotAvailableException(
101+
"Server " + leader + " is not found in metadata cache.");
102+
}
98103
this.scanFuture = gateway.limitScan(limitScanRequest);
99104

100105
this.kvValueDecoder =

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.alibaba.fluss.client.table.scanner.RemoteFileDownloader;
2525
import com.alibaba.fluss.client.table.scanner.ScanRecord;
2626
import com.alibaba.fluss.cluster.BucketLocation;
27-
import com.alibaba.fluss.cluster.ServerNode;
2827
import com.alibaba.fluss.config.ConfigOptions;
2928
import com.alibaba.fluss.config.Configuration;
3029
import com.alibaba.fluss.exception.InvalidMetadataException;
@@ -40,8 +39,6 @@
4039
import com.alibaba.fluss.record.MemoryLogRecords;
4140
import com.alibaba.fluss.remote.RemoteLogFetchInfo;
4241
import com.alibaba.fluss.remote.RemoteLogSegment;
43-
import com.alibaba.fluss.rpc.GatewayClientProxy;
44-
import com.alibaba.fluss.rpc.RpcClient;
4542
import com.alibaba.fluss.rpc.entity.FetchLogResultForBucket;
4643
import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
4744
import com.alibaba.fluss.rpc.messages.FetchLogRequest;
@@ -91,7 +88,6 @@ public class LogFetcher implements Closeable {
9188
// bytes from remote file.
9289
private final LogRecordReadContext remoteReadContext;
9390
@Nullable private final Projection projection;
94-
private final RpcClient rpcClient;
9591
private final int maxFetchBytes;
9692
private final int maxBucketFetchBytes;
9793
private final int minFetchBytes;
@@ -114,7 +110,6 @@ public class LogFetcher implements Closeable {
114110
public LogFetcher(
115111
TableInfo tableInfo,
116112
@Nullable Projection projection,
117-
RpcClient rpcClient,
118113
LogScannerStatus logScannerStatus,
119114
Configuration conf,
120115
MetadataUpdater metadataUpdater,
@@ -126,7 +121,6 @@ public LogFetcher(
126121
this.remoteReadContext =
127122
LogRecordReadContext.createReadContext(tableInfo, true, projection);
128123
this.projection = projection;
129-
this.rpcClient = rpcClient;
130124
this.logScannerStatus = logScannerStatus;
131125
this.maxFetchBytes =
132126
(int) conf.get(ConfigOptions.CLIENT_SCANNER_LOG_FETCH_MAX_BYTES).getBytes();
@@ -199,18 +193,14 @@ private void sendFetchRequest(int destination, FetchLogRequest fetchLogRequest)
199193
TableOrPartitions tableOrPartitionsInFetchRequest =
200194
getTableOrPartitionsInFetchRequest(fetchLogRequest);
201195
// TODO cache the tablet server gateway.
202-
ServerNode destinationNode = metadataUpdater.getTabletServer(destination);
203-
if (destinationNode == null) {
196+
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(destination);
197+
if (gateway == null) {
204198
handleFetchLogException(
205199
destination,
206200
tableOrPartitionsInFetchRequest,
207201
new LeaderNotAvailableException(
208202
"Server " + destination + " is not found in metadata cache."));
209203
} else {
210-
TabletServerGateway gateway =
211-
GatewayClientProxy.createGatewayProxy(
212-
() -> destinationNode, rpcClient, TabletServerGateway.class);
213-
214204
final long requestStartTime = System.currentTimeMillis();
215205
scannerMetricGroup.fetchRequestCount().inc();
216206

@@ -485,7 +475,7 @@ private Integer getTableBucketLeader(TableBucket tableBucket) {
485475
if (metadataUpdater.getBucketLocation(tableBucket).isPresent()) {
486476
BucketLocation bucketLocation = metadataUpdater.getBucketLocation(tableBucket).get();
487477
if (bucketLocation.getLeader() != null) {
488-
return bucketLocation.getLeader().id();
478+
return bucketLocation.getLeader();
489479
}
490480
}
491481

fluss-client/src/main/java/com/alibaba/fluss/client/table/scanner/log/LogScannerImpl.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.alibaba.fluss.metadata.TableBucket;
2828
import com.alibaba.fluss.metadata.TableInfo;
2929
import com.alibaba.fluss.metadata.TablePath;
30-
import com.alibaba.fluss.rpc.RpcClient;
3130
import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
3231
import com.alibaba.fluss.types.RowType;
3332
import com.alibaba.fluss.utils.Projection;
@@ -80,7 +79,6 @@ public class LogScannerImpl implements LogScanner {
8079
public LogScannerImpl(
8180
Configuration conf,
8281
TableInfo tableInfo,
83-
RpcClient rpcClient,
8482
MetadataUpdater metadataUpdater,
8583
ClientMetricGroup clientMetricGroup,
8684
RemoteFileDownloader remoteFileDownloader,
@@ -98,7 +96,6 @@ public LogScannerImpl(
9896
new LogFetcher(
9997
tableInfo,
10098
projection,
101-
rpcClient,
10299
logScannerStatus,
103100
conf,
104101
metadataUpdater,

fluss-client/src/main/java/com/alibaba/fluss/client/utils/MetadataUtils.java

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
123123
Map<PhysicalTablePath, Long> newPartitionIdByPath;
124124

125125
NewTableMetadata newTableMetadata =
126-
getTableMetadataToUpdate(
127-
originCluster, response, newAliveTabletServers);
126+
getTableMetadataToUpdate(originCluster, response);
128127

129128
if (partialUpdate) {
130129
// If partial update, we will clear the to be updated table out ot
@@ -167,9 +166,7 @@ public static Cluster sendMetadataRequestAndRebuildCluster(
167166
}
168167

169168
private static NewTableMetadata getTableMetadataToUpdate(
170-
Cluster cluster,
171-
MetadataResponse metadataResponse,
172-
Map<Integer, ServerNode> newAliveTableServers) {
169+
Cluster cluster, MetadataResponse metadataResponse) {
173170
Map<TablePath, Long> newTablePathToTableId = new HashMap<>();
174171
Map<TablePath, TableInfo> newTablePathToTableInfo = new HashMap<>();
175172
Map<PhysicalTablePath, List<BucketLocation>> newBucketLocations = new HashMap<>();
@@ -205,12 +202,7 @@ private static NewTableMetadata getTableMetadataToUpdate(
205202
newBucketLocations.put(
206203
PhysicalTablePath.of(tablePath),
207204
toBucketLocations(
208-
tablePath,
209-
tableId,
210-
null,
211-
null,
212-
pbBucketMetadataList,
213-
newAliveTableServers));
205+
tablePath, tableId, null, null, pbBucketMetadataList));
214206
});
215207

216208
List<PbPartitionMetadata> pbPartitionMetadataList =
@@ -233,8 +225,7 @@ private static NewTableMetadata getTableMetadataToUpdate(
233225
tableId,
234226
pbPartitionMetadata.getPartitionId(),
235227
pbPartitionMetadata.getPartitionName(),
236-
pbPartitionMetadata.getBucketMetadatasList(),
237-
newAliveTableServers));
228+
pbPartitionMetadata.getBucketMetadatasList()));
238229
});
239230

240231
return new NewTableMetadata(
@@ -309,19 +300,18 @@ private static List<BucketLocation> toBucketLocations(
309300
long tableId,
310301
@Nullable Long partitionId,
311302
@Nullable String partitionName,
312-
List<PbBucketMetadata> pbBucketMetadataList,
313-
Map<Integer, ServerNode> newAliveTableServers) {
303+
List<PbBucketMetadata> pbBucketMetadataList) {
314304
List<BucketLocation> bucketLocations = new ArrayList<>();
315305
for (PbBucketMetadata pbBucketMetadata : pbBucketMetadataList) {
316306
int bucketId = pbBucketMetadata.getBucketId();
317307
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
318-
ServerNode[] replicas = new ServerNode[pbBucketMetadata.getReplicaIdsCount()];
308+
int[] replicas = new int[pbBucketMetadata.getReplicaIdsCount()];
319309
for (int i = 0; i < replicas.length; i++) {
320-
replicas[i] = newAliveTableServers.get(pbBucketMetadata.getReplicaIdAt(i));
310+
replicas[i] = pbBucketMetadata.getReplicaIdAt(i);
321311
}
322-
ServerNode leader = null;
312+
Integer leader = null;
323313
if (pbBucketMetadata.hasLeaderId()) {
324-
leader = newAliveTableServers.get(pbBucketMetadata.getLeaderId());
314+
leader = pbBucketMetadata.getLeaderId();
325315
}
326316
PhysicalTablePath physicalTablePath = PhysicalTablePath.of(tablePath, partitionName);
327317

0 commit comments

Comments
 (0)