2020import org .apache .fluss .bucketing .BucketingFunction ;
2121import org .apache .fluss .client .metadata .MetadataUpdater ;
2222import org .apache .fluss .client .table .getter .PartitionGetter ;
23- import org .apache .fluss .exception .LeaderNotAvailableException ;
2423import org .apache .fluss .exception .PartitionNotExistException ;
25- import org .apache .fluss .exception .TableNotPartitionedException ;
2624import org .apache .fluss .metadata .DataLakeFormat ;
27- import org .apache .fluss .metadata .PhysicalTablePath ;
2825import org .apache .fluss .metadata .TableBucket ;
2926import org .apache .fluss .metadata .TableInfo ;
30- import org .apache .fluss .metadata .TablePath ;
31- import org .apache .fluss .record .DefaultValueRecordBatch ;
32- import org .apache .fluss .record .ValueRecord ;
33- import org .apache .fluss .record .ValueRecordReadContext ;
3427import org .apache .fluss .row .InternalRow ;
3528import org .apache .fluss .row .decode .RowDecoder ;
3629import org .apache .fluss .row .encode .KeyEncoder ;
3730import org .apache .fluss .row .encode .ValueDecoder ;
38- import org .apache .fluss .rpc .gateway .TabletServerGateway ;
39- import org .apache .fluss .rpc .messages .FullScanRequest ;
40- import org .apache .fluss .rpc .messages .FullScanResponse ;
41- import org .apache .fluss .rpc .protocol .Errors ;
4231import org .apache .fluss .types .DataType ;
4332import org .apache .fluss .types .RowType ;
4433
4534import javax .annotation .Nullable ;
4635
47- import java .nio .ByteBuffer ;
48- import java .util .ArrayList ;
4936import java .util .Collections ;
50- import java .util .HashSet ;
51- import java .util .List ;
52- import java .util .Optional ;
5337import java .util .concurrent .CompletableFuture ;
5438
5539import static org .apache .fluss .client .utils .ClientUtils .getPartitionId ;
5640import static org .apache .fluss .utils .Preconditions .checkArgument ;
5741
58- /**
59- * Client-side lookuper implementation for primary-key tables.
60- *
61- * <p>This class supports single-key lookups as well as bounded, point-in-time snapshot reads that
62- * return all current values of a KV table (and per-partition for partitioned tables). Snapshot
63- * reads are executed via the FULL_SCAN RPC and aggregate results across the leaders of all buckets
64- * on each target server.
65- */
42+ /** An implementation of {@link Lookuper} that looks up rows by primary key. */
6643class PrimaryKeyLookuper implements Lookuper {
6744
6845 private final TableInfo tableInfo ;
@@ -88,15 +65,6 @@ class PrimaryKeyLookuper implements Lookuper {
8865 /** Decode the lookup bytes to result row. */
8966 private final ValueDecoder kvValueDecoder ;
9067
91- /**
92- * Creates a primary-key lookuper for the given table.
93- *
94- * @param tableInfo resolved table metadata; must represent a table with a primary key
95- * @param metadataUpdater client-side metadata cache/updater used to resolve leaders and
96- * partitions
97- * @param lookupClient shared client used for key-based lookups and background I/O
98- * @throws IllegalArgumentException if the table is not a primary-key table
99- */
10068 public PrimaryKeyLookuper (
10169 TableInfo tableInfo , MetadataUpdater metadataUpdater , LookupClient lookupClient ) {
10270 checkArgument (
@@ -132,16 +100,6 @@ public PrimaryKeyLookuper(
132100 tableInfo .getRowType ().getChildren ().toArray (new DataType [0 ])));
133101 }
134102
135- /**
136- * Lookup a single row by its primary key.
137- *
138- * <p>The provided {@code lookupKey} must contain the primary-key fields in the expected order
139- * for this table. If the table is partitioned, the partition may be derived from the key (if
140- * configured) and routed accordingly.
141- *
142- * @param lookupKey the key row
143- * @return a future with the lookup result; the row may be null if not found
144- */
145103 @ Override
146104 public CompletableFuture <LookupResult > lookup (InternalRow lookupKey ) {
147105 // encoding the key row in a compacted way consistent with how the key is encoded when put
@@ -178,134 +136,4 @@ public CompletableFuture<LookupResult> lookup(InternalRow lookupKey) {
178136 return new LookupResult (row );
179137 });
180138 }
181-
182- /**
183- * Returns all current values across all buckets for a non-partitioned primary-key table using a
184- * point-in-time snapshot.
185- *
186- * @return a future with an unordered list of current rows
187- */
188- @ Override
189- public CompletableFuture <List <InternalRow >> snapshotAll () {
190- if (tableInfo .isPartitioned ()) {
191- CompletableFuture <List <InternalRow >> result = new CompletableFuture <>();
192- result .completeExceptionally (
193- new TableNotPartitionedException (
194- "Table is partitioned. Please use snapshotAllPartition(partitionName)." ));
195-
196- return result ;
197- }
198-
199- return executeFullScan (Optional .empty ());
200- }
201-
202- /**
203- * Returns all current values for the specified partition of a partitioned primary-key table
204- * using a point-in-time snapshot.
205- *
206- * @param partitionName the partition identifier (for example, a date string)
207- * @return a future with an unordered list of current rows in the partition
208- */
209- @ Override
210- public CompletableFuture <List <InternalRow >> snapshotAllPartition (String partitionName ) {
211- if (!tableInfo .isPartitioned ()) {
212- CompletableFuture <List <InternalRow >> result = new CompletableFuture <>();
213- result .completeExceptionally (
214- new TableNotPartitionedException (
215- "Table is not partitioned. Please use snapshotAll()." ));
216- return result ;
217- }
218-
219- // Resolve partition id from name
220- TablePath tablePath = tableInfo .getTablePath ();
221- PhysicalTablePath physicalTablePath = PhysicalTablePath .of (tablePath , partitionName );
222-
223- try {
224- metadataUpdater .checkAndUpdatePartitionMetadata (physicalTablePath );
225- } catch (PartitionNotExistException e ) {
226- CompletableFuture <List <InternalRow >> result = new CompletableFuture <>();
227- result .completeExceptionally (e );
228- return result ;
229- }
230-
231- Optional <Long > partitionId = metadataUpdater .getPartitionId (physicalTablePath );
232- return executeFullScan (partitionId );
233- }
234-
235- /**
236- * Decodes and aggregates FULL_SCAN RPC responses into a list of rows.
237- *
238- * <p>Throws a mapped client exception if any response contains an error.
239- *
240- * @param responseFutures futures of server responses per target leader node
241- * @return aggregated, unordered rows
242- */
243- private List <InternalRow > decodeFullScanResponses (
244- List <CompletableFuture <FullScanResponse >> responseFutures ) {
245- List <InternalRow > out = new ArrayList <>();
246- for (CompletableFuture <FullScanResponse > responseFuture : responseFutures ) {
247- FullScanResponse response = responseFuture .join ();
248-
249- if (response .hasErrorCode () && response .getErrorCode () != Errors .NONE .code ()) {
250- Errors err = Errors .forCode (response .getErrorCode ());
251- throw err .exception (
252- response .hasErrorMessage () ? response .getErrorMessage () : err .message ());
253- }
254-
255- if (response .hasRecords ()) {
256- ByteBuffer buffer = ByteBuffer .wrap (response .getRecords ());
257- DefaultValueRecordBatch values = DefaultValueRecordBatch .pointToByteBuffer (buffer );
258-
259- ValueRecordReadContext context =
260- new ValueRecordReadContext (kvValueDecoder .getRowDecoder ());
261- for (ValueRecord record : values .records (context )) {
262- out .add (record .getRow ());
263- }
264- }
265- }
266- return out ;
267- }
268-
269- private CompletableFuture <List <InternalRow >> executeFullScan (Optional <Long > partitionIdOpt ) {
270- Long partitionId = partitionIdOpt .orElse (null );
271- long tableId = tableInfo .getTableId ();
272- int numBuckets = tableInfo .getNumBuckets ();
273-
274- // Find leader tablet/servers for this table/partition
275- HashSet <Integer > leaderServers = new HashSet <>();
276- for (int bucketId = 0 ; bucketId < numBuckets ; bucketId ++) {
277- TableBucket tableBucket = new TableBucket (tableId , partitionId , bucketId );
278-
279- // ensure metadata for this bucket is present
280- metadataUpdater .checkAndUpdateMetadata (tableInfo .getTablePath (), tableBucket );
281- int leader = metadataUpdater .leaderFor (tableBucket );
282- leaderServers .add (leader );
283- }
284-
285- List <CompletableFuture <FullScanResponse >> responseFutures = new ArrayList <>();
286- for (int leader : leaderServers ) {
287- TabletServerGateway gateway = metadataUpdater .newTabletServerClientForNode (leader );
288- if (gateway == null ) {
289- CompletableFuture <List <InternalRow >> result = new CompletableFuture <>();
290- result .completeExceptionally (
291- new LeaderNotAvailableException (
292- "Leader server " + leader + " is not found in metadata cache." ));
293- return result ;
294- }
295-
296- FullScanRequest request = new FullScanRequest ();
297- request .setTableId (tableId );
298- // bucket_id is required by the protocol, even though servers currently
299- // ignore it for FULL_SCAN. Set a default bucket id to satisfy encoding.
300- request .setBucketId (0 );
301-
302- if (partitionId != null ) {
303- request .setPartitionId (partitionId );
304- }
305- responseFutures .add (gateway .fullScan (request ));
306- }
307- // Decode all responses and aggregate rows
308- return CompletableFuture .allOf (responseFutures .toArray (new CompletableFuture [0 ]))
309- .thenApply (v -> decodeFullScanResponses (responseFutures ));
310- }
311139}
0 commit comments