Skip to content

Commit bfc37aa

Browse files
fix client deadlock
1 parent e5d10ac commit bfc37aa

File tree

9 files changed

+1531
-153
lines changed

9 files changed

+1531
-153
lines changed

fluss-client/src/main/java/org/apache/fluss/client/FlussConnection.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -214,6 +214,9 @@ public void close() throws Exception {
214214
securityTokenManager.stop();
215215
}
216216

217+
// Close metadata updater to release scheduler and pending requests
218+
metadataUpdater.close();
219+
217220
clientMetricGroup.close();
218221
rpcClient.close();
219222
metricRegistry.closeAsync().get();

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 56 additions & 30 deletions
Original file line number | Diff line number | Diff line change
@@ -28,14 +28,16 @@
2828
import org.apache.fluss.row.InternalRow;
2929
import org.apache.fluss.row.encode.KeyEncoder;
3030
import org.apache.fluss.types.RowType;
31+
import org.apache.fluss.utils.ExceptionUtils;
32+
import org.apache.fluss.utils.concurrent.FutureUtils;
3133

3234
import javax.annotation.Nullable;
3335
import javax.annotation.concurrent.NotThreadSafe;
3436

3537
import java.util.Collections;
3638
import java.util.concurrent.CompletableFuture;
3739

38-
import static org.apache.fluss.client.utils.ClientUtils.getPartitionId;
40+
import static org.apache.fluss.client.utils.ClientUtils.getPartitionIdAsync;
3941
import static org.apache.fluss.utils.Preconditions.checkArgument;
4042

4143
/** An implementation of {@link Lookuper} that lookups by primary key. */
@@ -89,44 +91,68 @@ public PrimaryKeyLookuper(
8991

9092
@Override
9193
public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
92-
// encoding the key row using a compacted way consisted with how the key is encoded when put
93-
// a row
94-
byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey);
95-
byte[] bkBytes =
96-
bucketKeyEncoder == primaryKeyEncoder
97-
? pkBytes
98-
: bucketKeyEncoder.encodeKey(lookupKey);
99-
Long partitionId = null;
100-
if (partitionGetter != null) {
101-
try {
102-
partitionId =
103-
getPartitionId(
94+
try {
95+
// encoding the key row using a compacted way consisted with how the key is encoded when
96+
// put a row
97+
byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey);
98+
byte[] bkBytes =
99+
bucketKeyEncoder == primaryKeyEncoder
100+
? pkBytes
101+
: bucketKeyEncoder.encodeKey(lookupKey);
102+
103+
// If partition getter is present, we need to get partition ID asynchronously
104+
if (partitionGetter != null) {
105+
// Use async version to avoid blocking Netty IO threads
106+
return getPartitionIdAsync(
104107
lookupKey,
105108
partitionGetter,
106109
tableInfo.getTablePath(),
107-
metadataUpdater);
108-
} catch (PartitionNotExistException e) {
109-
return CompletableFuture.completedFuture(new LookupResult(Collections.emptyList()));
110+
metadataUpdater)
111+
.thenCompose(partitionId -> performLookup(partitionId, bkBytes, pkBytes))
112+
.exceptionally(
113+
throwable -> {
114+
// Handle partition not exist exception by returning null result
115+
if (ExceptionUtils.findThrowable(
116+
throwable, PartitionNotExistException.class)
117+
.isPresent()) {
118+
return new LookupResult((InternalRow) null);
119+
}
120+
// Re-throw other exceptions
121+
throw new RuntimeException(throwable);
122+
});
123+
} else {
124+
// No partition, directly perform lookup
125+
return performLookup(null, bkBytes, pkBytes);
110126
}
127+
} catch (Exception e) {
128+
return FutureUtils.failedCompletableFuture(e);
111129
}
130+
}
112131

132+
/**
133+
* Perform the actual lookup operation and process the result.
134+
*
135+
* @param partitionId the partition ID, or null if the table is not partitioned
136+
* @param bkBytes the encoded bucket key bytes
137+
* @param pkBytes the encoded primary key bytes
138+
* @return a CompletableFuture containing the lookup result
139+
*/
140+
private CompletableFuture<LookupResult> performLookup(
141+
@Nullable Long partitionId, byte[] bkBytes, byte[] pkBytes) {
113142
int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets);
114143
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
115-
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
116-
lookupClient
144+
return lookupClient
117145
.lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
118-
.whenComplete(
119-
(result, error) -> {
120-
if (error != null) {
121-
lookupFuture.completeExceptionally(error);
122-
} else {
123-
handleLookupResponse(
124-
result == null
125-
? Collections.emptyList()
126-
: Collections.singletonList(result),
127-
lookupFuture);
128-
}
146+
.thenCompose(
147+
result -> {
148+
CompletableFuture<LookupResult> resultFuture =
149+
new CompletableFuture<>();
150+
handleLookupResponse(
151+
result == null
152+
? Collections.emptyList()
153+
: Collections.singletonList(result),
154+
resultFuture);
155+
return resultFuture;
129156
});
130-
return lookupFuture;
131157
}
132158
}
Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,56 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.fluss.client.metadata;

import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway;

import javax.annotation.Nullable;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
 * An abstraction for fetching metadata and rebuilding cluster. This interface allows dependency
 * injection for testing purposes.
 *
 * <p>Implementations issue the metadata request through the supplied gateway and either build a
 * fresh {@link Cluster} or merge the response into {@code originCluster} when a partial update is
 * requested.
 */
@FunctionalInterface
public interface ClusterMetadataFetcher {

    /**
     * Fetch metadata and rebuild cluster asynchronously.
     *
     * <p>The three request collections are all nullable; a null collection means "do not request
     * that kind of metadata" rather than "request everything".
     *
     * @param gateway the gateway to send request
     * @param partialUpdate whether to perform partial update (merge with existing cluster)
     * @param originCluster the original cluster to merge with (if partial update)
     * @param tablePaths tables to request metadata for
     * @param tablePartitions partitions to request metadata for
     * @param tablePartitionIds partition ids to request metadata for
     * @return a future that completes with the new cluster
     */
    CompletableFuture<Cluster> fetch(
            AdminReadOnlyGateway gateway,
            boolean partialUpdate,
            Cluster originCluster,
            @Nullable Set<TablePath> tablePaths,
            @Nullable Collection<PhysicalTablePath> tablePartitions,
            @Nullable Collection<Long> tablePartitionIds);
}

0 commit comments

Comments (0)