Skip to content

Commit 774051b

Browse files
committed
[flink] Improve the implementation for general predicates partition pushdown and fix bugs
1 parent 048dff7 commit 774051b

File tree

16 files changed

+228
-422
lines changed

16 files changed

+228
-422
lines changed

fluss-common/src/main/java/org/apache/fluss/predicate/Contains.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.predicate;
1919

20+
import org.apache.fluss.row.BinaryString;
2021
import org.apache.fluss.types.DataType;
2122

2223
import java.util.List;
@@ -37,8 +38,7 @@ private Contains() {}
3738

3839
@Override
3940
public boolean test(DataType type, Object field, Object patternLiteral) {
40-
String fieldString = field.toString();
41-
return fieldString.contains((String) patternLiteral);
41+
return ((BinaryString) field).contains((BinaryString) patternLiteral);
4242
}
4343

4444
@Override

fluss-common/src/main/java/org/apache/fluss/predicate/EndsWith.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.predicate;
1919

20+
import org.apache.fluss.row.BinaryString;
2021
import org.apache.fluss.types.DataType;
2122

2223
import java.util.List;
@@ -40,8 +41,8 @@ private EndsWith() {}
4041

4142
@Override
4243
public boolean test(DataType type, Object field, Object patternLiteral) {
43-
String fieldString = field.toString();
44-
return fieldString.endsWith((String) patternLiteral);
44+
BinaryString fieldString = (BinaryString) field;
45+
return fieldString.endsWith((BinaryString) patternLiteral);
4546
}
4647

4748
@Override

fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.apache.fluss.row.InternalRow;
2121
import org.apache.fluss.types.DataType;
22-
import org.apache.fluss.types.DataTypes;
2322
import org.apache.fluss.types.DecimalType;
2423
import org.apache.fluss.types.LocalZonedTimestampType;
2524
import org.apache.fluss.types.TimestampType;
@@ -81,14 +80,6 @@ public List<Object> literals() {
8180
return literals;
8281
}
8382

84-
public LeafPredicate copyWithNewIndex(int fieldIndex) {
85-
return new LeafPredicate(function, type, fieldIndex, fieldName, literals);
86-
}
87-
88-
public LeafPredicate copyWithNewLiterals(List<Object> literals) {
89-
return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, fieldName, literals);
90-
}
91-
9283
@Override
9384
public boolean test(InternalRow row) {
9485
return function.test(type, get(row, fieldIndex, type), literals);

fluss-common/src/main/java/org/apache/fluss/predicate/PartitionPredicateVisitor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
2424
* additional information regarding copyright ownership. */
2525

26-
/** Visit the predicate and check if it only contains partition key's predicate. */
26+
/**
27+
* Visit the predicate and check if it only contains partition key's predicate. Returns false if it
28+
* contains any predicates that filters on non-partition fields.
29+
*/
2730
public class PartitionPredicateVisitor implements PredicateVisitor<Boolean> {
2831

2932
private final List<String> partitionKeys;

fluss-common/src/main/java/org/apache/fluss/predicate/StartsWith.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.fluss.predicate;
1919

20+
import org.apache.fluss.row.BinaryString;
2021
import org.apache.fluss.types.DataType;
2122

2223
import java.util.List;
@@ -40,8 +41,8 @@ private StartsWith() {}
4041

4142
@Override
4243
public boolean test(DataType type, Object field, Object patternLiteral) {
43-
String fieldString = field.toString();
44-
return fieldString.startsWith((String) patternLiteral);
44+
BinaryString fieldString = (BinaryString) field;
45+
return fieldString.startsWith((BinaryString) patternLiteral);
4546
}
4647

4748
@Override

fluss-common/src/test/java/org/apache/fluss/predicate/PredicateTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ public void testNotInNull() {
325325
@Test
326326
public void testEndsWith() {
327327
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
328-
Predicate predicate = builder.endsWith(0, ("bcc"));
328+
Predicate predicate = builder.endsWith(0, fromString("bcc"));
329329
GenericRow row = GenericRow.of(fromString("aabbcc"));
330330
assertThat(predicate.test(row)).isEqualTo(true);
331331

@@ -339,7 +339,7 @@ public void testEndsWith() {
339339
@Test
340340
public void testStartWith() {
341341
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
342-
Predicate predicate = builder.startsWith(0, ("aab"));
342+
Predicate predicate = builder.startsWith(0, fromString("aab"));
343343
GenericRow row = GenericRow.of(fromString("aabbcc"));
344344
assertThat(predicate.test(row)).isEqualTo(true);
345345

@@ -357,7 +357,7 @@ public void testStartWith() {
357357
@Test
358358
public void testContainsWith() {
359359
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
360-
Predicate predicate = builder.contains(0, ("def"));
360+
Predicate predicate = builder.contains(0, fromString("def"));
361361
GenericRow row1 = GenericRow.of(fromString("aabbdefcc"));
362362
GenericRow row2 = GenericRow.of(fromString("aabbdcefcc"));
363363
assertThat(predicate.test(row1)).isEqualTo(true);

fluss-flink/fluss-flink-common/pom.xml

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
<properties>
3535
<flink.major.version>1.20</flink.major.version>
3636
<flink.minor.version>1.20.1</flink.minor.version>
37-
<scala.binary.version>2.12</scala.binary.version>
3837
</properties>
3938

4039
<dependencies>
@@ -127,21 +126,6 @@
127126
<scope>test</scope>
128127
</dependency>
129128

130-
<dependency>
131-
<groupId>org.apache.flink</groupId>
132-
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
133-
<version>${flink.minor.version}</version>
134-
<scope>test</scope>
135-
<type>test-jar</type>
136-
</dependency>
137-
138-
<dependency>
139-
<groupId>org.apache.flink</groupId>
140-
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
141-
<version>${flink.minor.version}</version>
142-
<scope>test</scope>
143-
</dependency>
144-
145129
<dependency>
146130
<groupId>org.apache.fluss</groupId>
147131
<artifactId>fluss-server</artifactId>

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,15 @@
1717

1818
package org.apache.fluss.flink.row;
1919

20-
import org.apache.fluss.flink.utils.FlinkConversions;
2120
import org.apache.fluss.row.BinaryString;
2221
import org.apache.fluss.row.Decimal;
2322
import org.apache.fluss.row.InternalRow;
2423
import org.apache.fluss.row.TimestampLtz;
2524
import org.apache.fluss.row.TimestampNtz;
2625

2726
import org.apache.flink.table.data.DecimalData;
28-
import org.apache.flink.table.data.GenericRowData;
2927
import org.apache.flink.table.data.RowData;
3028
import org.apache.flink.table.data.TimestampData;
31-
import org.apache.flink.table.types.DataType;
3229

3330
/** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
3431
public class FlinkAsFlussRow implements InternalRow {
@@ -135,12 +132,4 @@ public byte[] getBinary(int pos, int length) {
135132
public byte[] getBytes(int pos) {
136133
return flinkRow.getBinary(pos);
137134
}
138-
139-
public static Object fromFlinkObject(Object o, DataType type) {
140-
if (o == null) {
141-
return null;
142-
}
143-
return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
144-
.getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o)));
145-
}
146135
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 15 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.fluss.flink.source.lookup.LookupNormalizer;
2727
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
2828
import org.apache.fluss.flink.utils.FlinkConversions;
29-
import org.apache.fluss.flink.utils.PredicateConverter;
3029
import org.apache.fluss.flink.utils.PushdownUtils;
3130
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3231
import org.apache.fluss.lake.source.LakeSource;
@@ -39,11 +38,9 @@
3938
import org.apache.fluss.predicate.Predicate;
4039
import org.apache.fluss.predicate.PredicateBuilder;
4140
import org.apache.fluss.predicate.PredicateVisitor;
42-
import org.apache.fluss.row.BinaryString;
4341
import org.apache.fluss.row.TimestampLtz;
4442
import org.apache.fluss.types.DataTypes;
4543
import org.apache.fluss.types.RowType;
46-
import org.apache.fluss.utils.PartitionUtils;
4744

4845
import org.apache.flink.annotation.VisibleForTesting;
4946
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -91,11 +88,12 @@
9188
import java.util.List;
9289
import java.util.Map;
9390
import java.util.Optional;
94-
import java.util.stream.Collectors;
9591

9692
import static org.apache.fluss.flink.utils.LakeSourceUtils.createLakeSource;
93+
import static org.apache.fluss.flink.utils.PredicateConverter.convertToFlussPredicate;
9794
import static org.apache.fluss.flink.utils.PushdownUtils.ValueConversion.FLINK_INTERNAL_VALUE;
9895
import static org.apache.fluss.flink.utils.PushdownUtils.extractFieldEquals;
96+
import static org.apache.fluss.flink.utils.StringifyPredicateVisitor.stringifyPartitionPredicate;
9997
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
10098
import static org.apache.fluss.utils.Preconditions.checkNotNull;
10199
import static org.apache.fluss.utils.Preconditions.checkState;
@@ -506,43 +504,32 @@ && hasPrimaryKey()
506504
}
507505
singleRowFilter = lookupRow;
508506
return Result.of(acceptedFilters, remainingFilters);
509-
} else if (isPartitioned()
510-
&& !RowLevelModificationType.UPDATE.equals(modificationScanType)) {
507+
} else if (isPartitioned()) {
511508
// apply partition filter pushdown
512509
List<Predicate> converted = new ArrayList<>();
513510

514-
List<String> fieldNames = tableOutputType.getFieldNames();
515-
List<String> partitionKeys =
516-
Arrays.stream(partitionKeyIndexes)
517-
.mapToObj(fieldNames::get)
518-
.collect(Collectors.toList());
511+
RowType partitionRowType =
512+
FlinkConversions.toFlussRowType(tableOutputType).project(partitionKeyIndexes);
513+
PredicateVisitor<Boolean> checksOnlyPartitionKeys =
514+
new PartitionPredicateVisitor(partitionRowType.getFieldNames());
519515

520-
PredicateVisitor<Boolean> partitionPredicateVisitor =
521-
new PartitionPredicateVisitor(partitionKeys);
522-
523-
LogicalType[] partitionKeyTypes =
524-
Arrays.stream(partitionKeyIndexes)
525-
.mapToObj(producedDataType.getChildren()::get)
526-
.toArray(LogicalType[]::new);
527516
for (ResolvedExpression filter : filters) {
528517

529518
Optional<Predicate> predicateOptional =
530-
PredicateConverter.convert(
531-
org.apache.flink.table.types.logical.RowType.of(
532-
partitionKeyTypes, partitionKeys.toArray(new String[0])),
533-
filter);
519+
convertToFlussPredicate(partitionRowType, filter);
534520

535521
if (predicateOptional.isPresent()) {
536522
Predicate p = predicateOptional.get();
537-
if (!p.visit(partitionPredicateVisitor)) {
523+
// partition pushdown can only guarantee to filter out partitions matches the
524+
// predicate, but can't guarantee to filter out all data matches to
525+
// non-partition filter in the partition
526+
if (!p.visit(checksOnlyPartitionKeys)) {
538527
remainingFilters.add(filter);
539528
} else {
540529
acceptedFilters.add(filter);
541530
}
542-
// Convert literals in the predicate to string using
543-
// PartitionUtils.convertValueOfType
544-
p = stringifyPredicate(p);
545-
converted.add(p);
531+
// Convert literals in the predicate to partition string
532+
converted.add(stringifyPartitionPredicate(p));
546533
} else {
547534
remainingFilters.add(filter);
548535
}
@@ -553,7 +540,7 @@ && hasPrimaryKey()
553540
List<Predicate> lakePredicates = new ArrayList<>();
554541
for (ResolvedExpression filter : filters) {
555542
Optional<Predicate> predicateOptional =
556-
PredicateConverter.convert(tableOutputType, filter);
543+
convertToFlussPredicate(tableOutputType, filter);
557544
predicateOptional.ifPresent(lakePredicates::add);
558545
}
559546

@@ -631,24 +618,6 @@ private Map<Integer, LogicalType> getPrimaryKeyTypes() {
631618
return pkTypes;
632619
}
633620

634-
private Map<Integer, LogicalType> getPartitionKeyTypes() {
635-
Map<Integer, LogicalType> partitionKeyTypes = new HashMap<>();
636-
for (int index : partitionKeyIndexes) {
637-
partitionKeyTypes.put(index, tableOutputType.getTypeAt(index));
638-
}
639-
return partitionKeyTypes;
640-
}
641-
642-
private List<FieldEqual> stringifyFieldEquals(List<FieldEqual> fieldEquals) {
643-
List<FieldEqual> serialize = new ArrayList<>();
644-
for (FieldEqual fieldEqual : fieldEquals) {
645-
// revisit this again when we support more data types for partition key
646-
serialize.add(
647-
new FieldEqual(fieldEqual.fieldIndex, (fieldEqual.equalValue).toString()));
648-
}
649-
return serialize;
650-
}
651-
652621
// projection from pk_field_index to index_in_pk
653622
private int[] getKeyRowProjection() {
654623
int[] projection = new int[tableOutputType.getFieldCount()];
@@ -673,34 +642,4 @@ public int[] getPrimaryKeyIndexes() {
673642
public int[] getBucketKeyIndexes() {
674643
return bucketKeyIndexes;
675644
}
676-
677-
@VisibleForTesting
678-
public int[] getPartitionKeyIndexes() {
679-
return partitionKeyIndexes;
680-
}
681-
682-
/**
683-
* Converts literals in LeafPredicate to string representation using
684-
* PartitionUtils.convertValueOfType. This is necessary because partition metadata is stored as
685-
* string.
686-
*/
687-
private Predicate stringifyPredicate(Predicate predicate) {
688-
if (predicate instanceof LeafPredicate) {
689-
// Convert literals to string using PartitionUtils.convertValueOfType
690-
List<Object> convertedLiterals = new ArrayList<>();
691-
for (Object literal : ((LeafPredicate) predicate).literals()) {
692-
if (literal != null) {
693-
String stringValue =
694-
PartitionUtils.convertValueOfType(
695-
literal, ((LeafPredicate) predicate).type().getTypeRoot());
696-
convertedLiterals.add(BinaryString.fromString(stringValue));
697-
} else {
698-
convertedLiterals.add(null);
699-
}
700-
}
701-
return ((LeafPredicate) predicate).copyWithNewLiterals(convertedLiterals);
702-
}
703-
704-
return predicate;
705-
}
706645
}

0 commit comments

Comments
 (0)