1919package org .apache .paimon .flink .lookup ;
2020
2121import org .apache .paimon .annotation .VisibleForTesting ;
22+ import org .apache .paimon .codegen .CodeGenUtils ;
23+ import org .apache .paimon .codegen .Projection ;
2224import org .apache .paimon .data .BinaryRow ;
2325import org .apache .paimon .data .InternalRow ;
2426import org .apache .paimon .disk .IOManagerImpl ;
2527import org .apache .paimon .flink .query .RemoteTableQuery ;
2628import org .apache .paimon .io .DataFileMeta ;
2729import org .apache .paimon .predicate .Predicate ;
30+ import org .apache .paimon .schema .TableSchema ;
2831import org .apache .paimon .table .BucketMode ;
2932import org .apache .paimon .table .FileStoreTable ;
3033import org .apache .paimon .table .query .LocalTableQuery ;
34+ import org .apache .paimon .table .sink .KeyAndBucketExtractor ;
3135import org .apache .paimon .table .source .DataSplit ;
3236import org .apache .paimon .table .source .Split ;
3337import org .apache .paimon .table .source .StreamTableScan ;
@@ -62,8 +66,8 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
6266 @ Nullable private Filter <InternalRow > cacheRowFilter ;
6367 private QueryExecutor queryExecutor ;
6468
65- private final PartitionFromPkExtractor partExtractor ;
66- private final FixedBucketFromPkExtractor bucketExtractor ;
69+ private final Projection partitionFromPk ;
70+ private final Projection bucketKeyFromPk ;
6771
6872 private PrimaryKeyPartialLookupTable (
6973 QueryExecutorFactory executorFactory , FileStoreTable table , List <String > joinKey ) {
@@ -73,8 +77,19 @@ private PrimaryKeyPartialLookupTable(
7377 "Unsupported mode for partial lookup: " + table .bucketMode ());
7478 }
7579
76- this .partExtractor = new PartitionFromPkExtractor (table .schema ());
77- this .bucketExtractor = new FixedBucketFromPkExtractor (table .schema ());
80+ TableSchema schema = table .schema ();
81+ this .partitionFromPk =
82+ CodeGenUtils .newProjection (
83+ schema .logicalPrimaryKeysType (),
84+ schema .partitionKeys ().stream ()
85+ .mapToInt (schema .primaryKeys ()::indexOf )
86+ .toArray ());
87+ this .bucketKeyFromPk =
88+ CodeGenUtils .newProjection (
89+ schema .logicalPrimaryKeysType (),
90+ schema .bucketKeys ().stream ()
91+ .mapToInt (schema .primaryKeys ()::indexOf )
92+ .toArray ());
7893
7994 ProjectedRow keyRearrange = null ;
8095 if (!table .primaryKeys ().equals (joinKey )) {
@@ -87,7 +102,7 @@ private PrimaryKeyPartialLookupTable(
87102 }
88103 this .keyRearrange = keyRearrange ;
89104
90- List <String > trimmedPrimaryKeys = table . schema () .trimmedPrimaryKeys ();
105+ List <String > trimmedPrimaryKeys = schema .trimmedPrimaryKeys ();
91106 ProjectedRow trimmedKeyRearrange = null ;
92107 if (!trimmedPrimaryKeys .equals (joinKey )) {
93108 trimmedKeyRearrange =
@@ -123,13 +138,13 @@ public List<InternalRow> get(InternalRow key) throws IOException {
123138 adjustedKey = keyRearrange .replaceRow (adjustedKey );
124139 }
125140
126- BinaryRow partition = partExtractor . partition (adjustedKey );
141+ BinaryRow partition = partitionFromPk . apply (adjustedKey );
127142 Integer numBuckets = queryExecutor .numBuckets (partition );
128143 if (numBuckets == null ) {
129144 // no data, just return none
130145 return Collections .emptyList ();
131146 }
132- int bucket = bucketExtractor . bucket (numBuckets , adjustedKey );
147+ int bucket = bucket (numBuckets , adjustedKey );
133148
134149 InternalRow trimmedKey = key ;
135150 if (trimmedKeyRearrange != null ) {
@@ -144,6 +159,12 @@ public List<InternalRow> get(InternalRow key) throws IOException {
144159 }
145160 }
146161
/**
 * Computes the bucket for a primary-key row.
 *
 * <p>The row is first projected with {@code bucketKeyFromPk}; the projected row is hashed with
 * {@code KeyAndBucketExtractor.bucketKeyHashCode}, and that hash is passed together with
 * {@code numBuckets} to {@code KeyAndBucketExtractor.bucket}.
 *
 * @param numBuckets bucket count forwarded to {@code KeyAndBucketExtractor.bucket}; the caller
 *     in {@code get} obtains it from {@code queryExecutor.numBuckets(partition)}
 * @param primaryKey row that {@code bucketKeyFromPk} is applied to; presumably laid out in
 *     primary-key field order, since the projection is built over
 *     {@code schema.logicalPrimaryKeysType()} (confirm against callers)
 * @return the value returned by {@code KeyAndBucketExtractor.bucket}
 */
162+ private int bucket (int numBuckets , InternalRow primaryKey ) {
// bucketKeyFromPk is created in the constructor from the positions of schema.bucketKeys()
// within schema.primaryKeys().
163+ BinaryRow bucketKey = bucketKeyFromPk .apply (primaryKey );
164+ return KeyAndBucketExtractor .bucket (
165+ KeyAndBucketExtractor .bucketKeyHashCode (bucketKey ), numBuckets );
166+ }
167+
147168 @ Override
148169 public void refresh () {
149170 queryExecutor .refresh ();
0 commit comments