Skip to content

Commit 88c0286

Browse files
committed
[flink] Fix Flink Lookup Join for Postpone bucket table
1 parent 8d65ce3 commit 88c0286

File tree

6 files changed

+229
-64
lines changed

6 files changed

+229
-64
lines changed

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ private boolean buildForPostponeBucketCompaction(
259259
InternalRowPartitionComputer partitionComputer =
260260
new InternalRowPartitionComputer(
261261
fileStoreTable.coreOptions().partitionDefaultName(),
262-
fileStoreTable.rowType(),
262+
fileStoreTable.store().partitionType(),
263263
fileStoreTable.partitionKeys().toArray(new String[0]),
264264
fileStoreTable.coreOptions().legacyPartitionName());
265265
String commitUser = CoreOptions.createCommitUser(options);

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java

Lines changed: 6 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,36 +18,23 @@
1818

1919
package org.apache.paimon.flink.lookup;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.codegen.CodeGenUtils;
2322
import org.apache.paimon.codegen.Projection;
2423
import org.apache.paimon.data.BinaryRow;
2524
import org.apache.paimon.data.InternalRow;
2625
import org.apache.paimon.schema.TableSchema;
2726
import org.apache.paimon.table.sink.KeyAndBucketExtractor;
2827

29-
import static org.apache.paimon.utils.Preconditions.checkArgument;
30-
3128
/** Extractor to extract bucket from the primary key. */
32-
public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<InternalRow> {
33-
34-
private transient InternalRow primaryKey;
29+
public class FixedBucketFromPkExtractor {
3530

3631
private final boolean sameBucketKeyAndTrimmedPrimaryKey;
3732

38-
private final int numBuckets;
39-
4033
private final Projection bucketKeyProjection;
4134

4235
private final Projection trimmedPrimaryKeyProjection;
4336

44-
private final Projection partitionProjection;
45-
46-
private final Projection logPrimaryKeyProjection;
47-
4837
public FixedBucketFromPkExtractor(TableSchema schema) {
49-
this.numBuckets = new CoreOptions(schema.options()).bucket();
50-
checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
5138
this.sameBucketKeyAndTrimmedPrimaryKey =
5239
schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
5340
this.bucketKeyProjection =
@@ -62,49 +49,16 @@ public FixedBucketFromPkExtractor(TableSchema schema) {
6249
schema.trimmedPrimaryKeys().stream()
6350
.mapToInt(schema.primaryKeys()::indexOf)
6451
.toArray());
65-
this.partitionProjection =
66-
CodeGenUtils.newProjection(
67-
schema.logicalPrimaryKeysType(),
68-
schema.partitionKeys().stream()
69-
.mapToInt(schema.primaryKeys()::indexOf)
70-
.toArray());
71-
this.logPrimaryKeyProjection =
72-
CodeGenUtils.newProjection(
73-
schema.logicalRowType(), schema.projection(schema.primaryKeys()));
7452
}
7553

76-
@Override
77-
public void setRecord(InternalRow record) {
78-
this.primaryKey = record;
79-
}
80-
81-
@Override
82-
public BinaryRow partition() {
83-
return partitionProjection.apply(primaryKey);
84-
}
85-
86-
private BinaryRow bucketKey() {
54+
public int bucket(int numBuckets, InternalRow primaryKey) {
55+
BinaryRow bucketKey;
8756
if (sameBucketKeyAndTrimmedPrimaryKey) {
88-
return trimmedPrimaryKey();
57+
bucketKey = trimmedPrimaryKeyProjection.apply(primaryKey);
58+
} else {
59+
bucketKey = bucketKeyProjection.apply(primaryKey);
8960
}
90-
91-
return bucketKeyProjection.apply(primaryKey);
92-
}
93-
94-
@Override
95-
public int bucket() {
96-
BinaryRow bucketKey = bucketKey();
9761
return KeyAndBucketExtractor.bucket(
9862
KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
9963
}
100-
101-
@Override
102-
public BinaryRow trimmedPrimaryKey() {
103-
return trimmedPrimaryKeyProjection.apply(primaryKey);
104-
}
105-
106-
@Override
107-
public BinaryRow logPrimaryKey() {
108-
return logPrimaryKeyProjection.apply(primaryKey);
109-
}
11064
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.lookup;
20+
21+
import org.apache.paimon.codegen.CodeGenUtils;
22+
import org.apache.paimon.codegen.Projection;
23+
import org.apache.paimon.data.BinaryRow;
24+
import org.apache.paimon.data.InternalRow;
25+
import org.apache.paimon.schema.TableSchema;
26+
27+
/** Extractor to extract partition from the primary key. */
28+
public class PartitionFromPkExtractor {
29+
30+
private final Projection partitionProjection;
31+
32+
public PartitionFromPkExtractor(TableSchema schema) {
33+
this.partitionProjection =
34+
CodeGenUtils.newProjection(
35+
schema.logicalPrimaryKeysType(),
36+
schema.partitionKeys().stream()
37+
.mapToInt(schema.primaryKeys()::indexOf)
38+
.toArray());
39+
}
40+
41+
public BinaryRow partition(InternalRow primaryKey) {
42+
return partitionProjection.apply(primaryKey);
43+
}
44+
}

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,31 +43,37 @@
4343
import java.io.File;
4444
import java.io.IOException;
4545
import java.util.Collections;
46+
import java.util.HashMap;
4647
import java.util.List;
48+
import java.util.Map;
4749
import java.util.Set;
4850

51+
import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
52+
4953
/** Lookup table for primary key which supports to read the LSM tree directly. */
5054
public class PrimaryKeyPartialLookupTable implements LookupTable {
5155

5256
private final QueryExecutorFactory executorFactory;
53-
private final FixedBucketFromPkExtractor extractor;
5457
@Nullable private final ProjectedRow keyRearrange;
5558
@Nullable private final ProjectedRow trimmedKeyRearrange;
5659

5760
private Predicate specificPartition;
5861
@Nullable private Filter<InternalRow> cacheRowFilter;
5962
private QueryExecutor queryExecutor;
6063

64+
private final PartitionFromPkExtractor partExtractor;
65+
private final FixedBucketFromPkExtractor bucketExtractor;
66+
6167
private PrimaryKeyPartialLookupTable(
6268
QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
6369
this.executorFactory = executorFactory;
64-
6570
if (table.bucketMode() != BucketMode.HASH_FIXED) {
6671
throw new UnsupportedOperationException(
6772
"Unsupported mode for partial lookup: " + table.bucketMode());
6873
}
6974

70-
this.extractor = new FixedBucketFromPkExtractor(table.schema());
75+
this.partExtractor = new PartitionFromPkExtractor(table.schema());
76+
this.bucketExtractor = new FixedBucketFromPkExtractor(table.schema());
7177

7278
ProjectedRow keyRearrange = null;
7379
if (!table.primaryKeys().equals(joinKey)) {
@@ -115,9 +121,14 @@ public List<InternalRow> get(InternalRow key) throws IOException {
115121
if (keyRearrange != null) {
116122
adjustedKey = keyRearrange.replaceRow(adjustedKey);
117123
}
118-
extractor.setRecord(adjustedKey);
119-
int bucket = extractor.bucket();
120-
BinaryRow partition = extractor.partition();
124+
125+
BinaryRow partition = partExtractor.partition(adjustedKey);
126+
Integer numBuckets = queryExecutor.numBuckets(partition);
127+
if (numBuckets == null) {
128+
// no data, just return none
129+
return Collections.emptyList();
130+
}
131+
int bucket = bucketExtractor.bucket(numBuckets, adjustedKey);
121132

122133
InternalRow trimmedKey = key;
123134
if (trimmedKeyRearrange != null) {
@@ -182,6 +193,9 @@ interface QueryExecutorFactory {
182193

183194
interface QueryExecutor extends Closeable {
184195

196+
@Nullable
197+
Integer numBuckets(BinaryRow partition);
198+
185199
InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
186200

187201
void refresh();
@@ -195,6 +209,8 @@ static class LocalQueryExecutor implements QueryExecutor {
195209
private final StreamTableScan scan;
196210
private final String tableName;
197211

212+
private final Map<BinaryRow, Integer> totalBuckets;
213+
198214
private LocalQueryExecutor(
199215
FileStoreTable table,
200216
int[] projection,
@@ -222,6 +238,13 @@ private LocalQueryExecutor(
222238
.newStreamScan();
223239

224240
this.tableName = table.name();
241+
this.totalBuckets = new HashMap<>();
242+
}
243+
244+
@Override
245+
@Nullable
246+
public Integer numBuckets(BinaryRow partition) {
247+
return totalBuckets.get(partition);
225248
}
226249

227250
@Override
@@ -241,12 +264,14 @@ public void refresh() {
241264
}
242265

243266
for (Split split : splits) {
244-
BinaryRow partition = ((DataSplit) split).partition();
245-
int bucket = ((DataSplit) split).bucket();
246-
List<DataFileMeta> before = ((DataSplit) split).beforeFiles();
247-
List<DataFileMeta> after = ((DataSplit) split).dataFiles();
267+
DataSplit dataSplit = (DataSplit) split;
268+
BinaryRow partition = dataSplit.partition();
269+
int bucket = dataSplit.bucket();
270+
List<DataFileMeta> before = dataSplit.beforeFiles();
271+
List<DataFileMeta> after = dataSplit.dataFiles();
248272

249273
tableQuery.refreshFiles(partition, bucket, before, after);
274+
totalBuckets.put(partition, dataSplit.totalBuckets());
250275
}
251276
}
252277
}
@@ -273,9 +298,22 @@ private void log(List<Split> splits) {
273298
static class RemoteQueryExecutor implements QueryExecutor {
274299

275300
private final RemoteTableQuery tableQuery;
301+
private final Integer numBuckets;
276302

277303
private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
278304
this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
305+
int numBuckets = table.bucketSpec().getNumBuckets();
306+
if (numBuckets == POSTPONE_BUCKET) {
307+
throw new UnsupportedOperationException(
308+
"Remote query does not support POSTPONE_BUCKET.");
309+
}
310+
this.numBuckets = numBuckets;
311+
}
312+
313+
@Override
314+
@Nullable
315+
public Integer numBuckets(BinaryRow partition) {
316+
return numBuckets;
279317
}
280318

281319
@Override

paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.paimon.predicate.Predicate;
3939
import org.apache.paimon.schema.TableSchema;
4040
import org.apache.paimon.table.BucketMode;
41+
import org.apache.paimon.table.BucketSpec;
4142
import org.apache.paimon.table.DataTable;
4243
import org.apache.paimon.table.FileStoreTable;
4344
import org.apache.paimon.table.Table;
@@ -86,6 +87,7 @@
8687
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
8788
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
8889
import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
90+
import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
8991
import static org.apache.paimon.utils.Preconditions.checkNotNull;
9092

9193
/**
@@ -409,7 +411,9 @@ public Optional<InputDataPartitioner> getPartitioner() {
409411

410412
private boolean supportBucketShufflePartitioner(
411413
List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
412-
return BucketMode.HASH_FIXED.equals(((FileStoreTable) table).bucketMode())
414+
BucketSpec bucketSpec = ((FileStoreTable) table).bucketSpec();
415+
return bucketSpec.getBucketMode() == BucketMode.HASH_FIXED
416+
&& bucketSpec.getNumBuckets() != POSTPONE_BUCKET
413417
&& new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);
414418
}
415419
}

0 commit comments

Comments (0)