Skip to content

Commit 9c412c9

Browse files
authored
[client] LookupSender get table info from LookupQuery (#2093)
1 parent 08efef4 commit 9c412c9

File tree

14 files changed

+71
-43
lines changed

14 files changed

+71
-43
lines changed

fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,12 @@ private ListOffsetsResult listOffsets(
419419
}
420420
Map<Integer, ListOffsetsRequest> requestMap =
421421
prepareListOffsetsRequests(
422-
metadataUpdater, tableInfo.getTableId(), partitionId, buckets, offsetSpec);
422+
metadataUpdater,
423+
tableInfo.getTableId(),
424+
partitionId,
425+
buckets,
426+
offsetSpec,
427+
tableInfo.getTablePath());
423428
Map<Integer, CompletableFuture<Long>> bucketToOffsetMap = MapUtils.newConcurrentHashMap();
424429
for (int bucket : buckets) {
425430
bucketToOffsetMap.put(bucket, new CompletableFuture<>());
@@ -536,10 +541,13 @@ private static Map<Integer, ListOffsetsRequest> prepareListOffsetsRequests(
536541
long tableId,
537542
@Nullable Long partitionId,
538543
Collection<Integer> buckets,
539-
OffsetSpec offsetSpec) {
544+
OffsetSpec offsetSpec,
545+
TablePath tablePath) {
540546
Map<Integer, List<Integer>> nodeForBucketList = new HashMap<>();
541547
for (Integer bucketId : buckets) {
542-
int leader = metadataUpdater.leaderFor(new TableBucket(tableId, partitionId, bucketId));
548+
int leader =
549+
metadataUpdater.leaderFor(
550+
tablePath, new TableBucket(tableId, partitionId, bucketId));
543551
nodeForBucketList.computeIfAbsent(leader, k -> new ArrayList<>()).add(bucketId);
544552
}
545553

fluss-client/src/main/java/org/apache/fluss/client/lookup/AbstractLookupQuery.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,21 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TablePath;
2223

2324
import java.util.concurrent.CompletableFuture;
2425

2526
/** Abstract Class to represent a lookup operation. */
2627
@Internal
2728
public abstract class AbstractLookupQuery<T> {
2829

30+
private final TablePath tablePath;
2931
private final TableBucket tableBucket;
3032
private final byte[] key;
3133
private int retries;
3234

33-
public AbstractLookupQuery(TableBucket tableBucket, byte[] key) {
35+
public AbstractLookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
36+
this.tablePath = tablePath;
3437
this.tableBucket = tableBucket;
3538
this.key = key;
3639
this.retries = 0;
@@ -40,6 +43,10 @@ public byte[] key() {
4043
return key;
4144
}
4245

46+
public TablePath tablePath() {
47+
return tablePath;
48+
}
49+
4350
public TableBucket tableBucket() {
4451
return tableBucket;
4552
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupClient.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.config.ConfigOptions;
2323
import org.apache.fluss.config.Configuration;
2424
import org.apache.fluss.metadata.TableBucket;
25+
import org.apache.fluss.metadata.TablePath;
2526
import org.apache.fluss.utils.concurrent.ExecutorThreadFactory;
2627

2728
import org.slf4j.Logger;
@@ -43,9 +44,9 @@
4344
* that is responsible for turning these lookup operations into network requests and transmitting
4445
* them to the cluster.
4546
*
46-
* <p>The {@link #lookup(TableBucket, byte[])} method is asynchronous, when called, it adds the
47-
* lookup operation to a queue of pending lookup operations and immediately returns. This allows the
48-
* lookup operations to batch together individual lookup operations for efficiency.
47+
* <p>The {@link #lookup(TablePath, TableBucket, byte[])} method is asynchronous, when called, it
48+
* adds the lookup operation to a queue of pending lookup operations and immediately returns. This
49+
* allows the lookup operations to batch together individual lookup operations for efficiency.
4950
*/
5051
@ThreadSafe
5152
@Internal
@@ -78,14 +79,16 @@ private ExecutorService createThreadPool() {
7879
return Executors.newFixedThreadPool(1, new ExecutorThreadFactory(LOOKUP_THREAD_PREFIX));
7980
}
8081

81-
public CompletableFuture<byte[]> lookup(TableBucket tableBucket, byte[] keyBytes) {
82-
LookupQuery lookup = new LookupQuery(tableBucket, keyBytes);
82+
public CompletableFuture<byte[]> lookup(
83+
TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
84+
LookupQuery lookup = new LookupQuery(tablePath, tableBucket, keyBytes);
8385
lookupQueue.appendLookup(lookup);
8486
return lookup.future();
8587
}
8688

87-
public CompletableFuture<List<byte[]>> prefixLookup(TableBucket tableBucket, byte[] keyBytes) {
88-
PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tableBucket, keyBytes);
89+
public CompletableFuture<List<byte[]>> prefixLookup(
90+
TablePath tablePath, TableBucket tableBucket, byte[] keyBytes) {
91+
PrefixLookupQuery prefixLookup = new PrefixLookupQuery(tablePath, tableBucket, keyBytes);
8992
lookupQueue.appendLookup(prefixLookup);
9093
return prefixLookup.future();
9194
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupQuery.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TablePath;
2223

2324
import java.util.concurrent.CompletableFuture;
2425

@@ -31,8 +32,8 @@ public class LookupQuery extends AbstractLookupQuery<byte[]> {
3132

3233
private final CompletableFuture<byte[]> future;
3334

34-
LookupQuery(TableBucket tableBucket, byte[] key) {
35-
super(tableBucket, key);
35+
LookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] key) {
36+
super(tablePath, tableBucket, key);
3637
this.future = new CompletableFuture<>();
3738
}
3839

fluss-client/src/main/java/org/apache/fluss/client/lookup/LookupSender.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,10 @@ private Map<Tuple2<Integer, LookupType>, List<AbstractLookupQuery<?>>> groupByLe
157157
// lookup the leader node
158158
TableBucket tb = lookup.tableBucket();
159159
try {
160-
leader = metadataUpdater.leaderFor(tb);
160+
leader = metadataUpdater.leaderFor(lookup.tablePath(), tb);
161161
} catch (Exception e) {
162162
// if leader is not found, re-enqueue the lookup to send again.
163+
LOG.warn("Failed to lookup the leader for {} when lookup", tb, e);
163164
reEnqueueLookup(lookup);
164165
continue;
165166
}

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow prefixKey) {
151151
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
152152
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
153153
lookupClient
154-
.prefixLookup(tableBucket, bucketKeyBytes)
154+
.prefixLookup(tableInfo.getTablePath(), tableBucket, bucketKeyBytes)
155155
.whenComplete(
156156
(result, error) -> {
157157
if (error != null) {

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrefixLookupQuery.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.annotation.Internal;
2121
import org.apache.fluss.metadata.TableBucket;
22+
import org.apache.fluss.metadata.TablePath;
2223

2324
import java.util.List;
2425
import java.util.concurrent.CompletableFuture;
@@ -31,8 +32,8 @@
3132
public class PrefixLookupQuery extends AbstractLookupQuery<List<byte[]>> {
3233
private final CompletableFuture<List<byte[]>> future;
3334

34-
PrefixLookupQuery(TableBucket tableBucket, byte[] prefixKey) {
35-
super(tableBucket, prefixKey);
35+
PrefixLookupQuery(TablePath tablePath, TableBucket tableBucket, byte[] prefixKey) {
36+
super(tablePath, tableBucket, prefixKey);
3637
this.future = new CompletableFuture<>();
3738
}
3839

fluss-client/src/main/java/org/apache/fluss/client/lookup/PrimaryKeyLookuper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
114114
TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId);
115115
CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
116116
lookupClient
117-
.lookup(tableBucket, pkBytes)
117+
.lookup(tableInfo.getTablePath(), tableBucket, pkBytes)
118118
.whenComplete(
119119
(result, error) -> {
120120
if (error != null) {

fluss-client/src/main/java/org/apache/fluss/client/metadata/MetadataUpdater.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,10 @@ public Optional<BucketLocation> getBucketLocation(TableBucket tableBucket) {
9898
return cluster.getBucketLocation(tableBucket);
9999
}
100100

101-
public int leaderFor(TableBucket tableBucket) {
101+
public int leaderFor(TablePath tablePath, TableBucket tableBucket) {
102102
Integer serverNode = cluster.leaderFor(tableBucket);
103103
if (serverNode == null) {
104104
for (int i = 0; i < MAX_RETRY_TIMES; i++) {
105-
TablePath tablePath = cluster.getTablePathOrElseThrow(tableBucket.getTableId());
106105
// check if bucket is for a partition
107106
if (tableBucket.getPartitionId() != null) {
108107
updateMetadata(

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public LimitBatchScanner(
105105
}
106106

107107
// because that rocksdb is not suitable to projection, thus do it in client.
108-
int leader = metadataUpdater.leaderFor(tableBucket);
108+
int leader = metadataUpdater.leaderFor(tableInfo.getTablePath(), tableBucket);
109109
TabletServerGateway gateway = metadataUpdater.newTabletServerClientForNode(leader);
110110
if (gateway == null) {
111111
// TODO handle this exception, like retry.

0 commit comments

Comments
 (0)