|
28 | 28 | import org.apache.fluss.row.InternalRow; |
29 | 29 | import org.apache.fluss.row.encode.KeyEncoder; |
30 | 30 | import org.apache.fluss.types.RowType; |
| 31 | +import org.apache.fluss.utils.ExceptionUtils; |
| 32 | +import org.apache.fluss.utils.concurrent.FutureUtils; |
31 | 33 |
|
32 | 34 | import javax.annotation.Nullable; |
33 | 35 | import javax.annotation.concurrent.NotThreadSafe; |
34 | 36 |
|
35 | 37 | import java.util.Collections; |
36 | 38 | import java.util.concurrent.CompletableFuture; |
37 | 39 |
|
38 | | -import static org.apache.fluss.client.utils.ClientUtils.getPartitionId; |
| 40 | +import static org.apache.fluss.client.utils.ClientUtils.getPartitionIdAsync; |
39 | 41 | import static org.apache.fluss.utils.Preconditions.checkArgument; |
40 | 42 |
|
41 | 43 | /** An implementation of {@link Lookuper} that lookups by primary key. */ |
@@ -89,44 +91,68 @@ public PrimaryKeyLookuper( |
89 | 91 |
|
90 | 92 | @Override |
91 | 93 | public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) { |
92 | | - // encoding the key row using a compacted way consisted with how the key is encoded when put |
93 | | - // a row |
94 | | - byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); |
95 | | - byte[] bkBytes = |
96 | | - bucketKeyEncoder == primaryKeyEncoder |
97 | | - ? pkBytes |
98 | | - : bucketKeyEncoder.encodeKey(lookupKey); |
99 | | - Long partitionId = null; |
100 | | - if (partitionGetter != null) { |
101 | | - try { |
102 | | - partitionId = |
103 | | - getPartitionId( |
| 94 | + try { |
| 95 | + // encoding the key row using a compacted way consisted with how the key is encoded when |
| 96 | + // put a row |
| 97 | + byte[] pkBytes = primaryKeyEncoder.encodeKey(lookupKey); |
| 98 | + byte[] bkBytes = |
| 99 | + bucketKeyEncoder == primaryKeyEncoder |
| 100 | + ? pkBytes |
| 101 | + : bucketKeyEncoder.encodeKey(lookupKey); |
| 102 | + |
| 103 | + // If partition getter is present, we need to get partition ID asynchronously |
| 104 | + if (partitionGetter != null) { |
| 105 | + // Use async version to avoid blocking Netty IO threads |
| 106 | + return getPartitionIdAsync( |
104 | 107 | lookupKey, |
105 | 108 | partitionGetter, |
106 | 109 | tableInfo.getTablePath(), |
107 | | - metadataUpdater); |
108 | | - } catch (PartitionNotExistException e) { |
109 | | - return CompletableFuture.completedFuture(new LookupResult(Collections.emptyList())); |
| 110 | + metadataUpdater) |
| 111 | + .thenCompose(partitionId -> performLookup(partitionId, bkBytes, pkBytes)) |
| 112 | + .exceptionally( |
| 113 | + throwable -> { |
| 114 | + // Handle partition not exist exception by returning null result |
| 115 | + if (ExceptionUtils.findThrowable( |
| 116 | + throwable, PartitionNotExistException.class) |
| 117 | + .isPresent()) { |
| 118 | + return new LookupResult((InternalRow) null); |
| 119 | + } |
| 120 | + // Re-throw other exceptions |
| 121 | + throw new RuntimeException(throwable); |
| 122 | + }); |
| 123 | + } else { |
| 124 | + // No partition, directly perform lookup |
| 125 | + return performLookup(null, bkBytes, pkBytes); |
110 | 126 | } |
| 127 | + } catch (Exception e) { |
| 128 | + return FutureUtils.failedCompletableFuture(e); |
111 | 129 | } |
| 130 | + } |
112 | 131 |
|
| 132 | + /** |
| 133 | + * Perform the actual lookup operation and process the result. |
| 134 | + * |
| 135 | + * @param partitionId the partition ID, or null if the table is not partitioned |
| 136 | + * @param bkBytes the encoded bucket key bytes |
| 137 | + * @param pkBytes the encoded primary key bytes |
| 138 | + * @return a CompletableFuture containing the lookup result |
| 139 | + */ |
| 140 | + private CompletableFuture<LookupResult> performLookup( |
| 141 | + @Nullable Long partitionId, byte[] bkBytes, byte[] pkBytes) { |
113 | 142 | int bucketId = bucketingFunction.bucketing(bkBytes, numBuckets); |
114 | 143 | TableBucket tableBucket = new TableBucket(tableInfo.getTableId(), partitionId, bucketId); |
115 | | - CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>(); |
116 | | - lookupClient |
| 144 | + return lookupClient |
117 | 145 | .lookup(tableInfo.getTablePath(), tableBucket, pkBytes) |
118 | | - .whenComplete( |
119 | | - (result, error) -> { |
120 | | - if (error != null) { |
121 | | - lookupFuture.completeExceptionally(error); |
122 | | - } else { |
123 | | - handleLookupResponse( |
124 | | - result == null |
125 | | - ? Collections.emptyList() |
126 | | - : Collections.singletonList(result), |
127 | | - lookupFuture); |
128 | | - } |
| 146 | + .thenCompose( |
| 147 | + result -> { |
| 148 | + CompletableFuture<LookupResult> resultFuture = |
| 149 | + new CompletableFuture<>(); |
| 150 | + handleLookupResponse( |
| 151 | + result == null |
| 152 | + ? Collections.emptyList() |
| 153 | + : Collections.singletonList(result), |
| 154 | + resultFuture); |
| 155 | + return resultFuture; |
129 | 156 | }); |
130 | | - return lookupFuture; |
131 | 157 | } |
132 | 158 | } |
0 commit comments