
Commit 048dff7

Alibaba-HZYwuchong authored and committed
[flink] Support general predicates for partition pushdown in Flink Connector (apache#420)
1 parent 4345663 commit 048dff7

File tree

17 files changed: +1728 −110 lines

LICENSE

Lines changed: 1 addition & 0 deletions

@@ -361,6 +361,7 @@ Apache Kafka
 ./fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java

 Apache Paimon
+./fluss-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/And.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java
 ./fluss-common/src/main/java/org/apache/fluss/predicate/CompoundPredicate.java

fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java

Lines changed: 5 additions & 0 deletions

@@ -19,6 +19,7 @@

 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.types.DataType;
+import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.DecimalType;
 import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;

@@ -84,6 +85,10 @@ public LeafPredicate copyWithNewIndex(int fieldIndex) {
         return new LeafPredicate(function, type, fieldIndex, fieldName, literals);
     }

+    public LeafPredicate copyWithNewLiterals(List<Object> literals) {
+        return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, fieldName, literals);
+    }
+
     @Override
     public boolean test(InternalRow row) {
         return function.test(type, get(row, fieldIndex, type), literals);
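The new copyWithNewLiterals keeps the predicate's function, field index, and field name but forces the compared type to STRING, which matches how partition values are carried. A minimal sketch of a caller (the existing LeafPredicate instance and the string-literal representation are assumptions, not part of this commit):

import java.util.Collections;

import org.apache.fluss.predicate.LeafPredicate;

// Hypothetical helper: rewrite a partition predicate so its literal is compared as a STRING.
final class PartitionPredicateExample {
    static LeafPredicate toStringLiteral(LeafPredicate partitionEqual, String partitionValue) {
        // copyWithNewLiterals preserves function/fieldIndex/fieldName and switches the type
        // to STRING; the exact literal representation expected for STRING columns is an
        // assumption in this sketch.
        return partitionEqual.copyWithNewLiterals(
                Collections.<Object>singletonList(partitionValue));
    }
}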
Lines changed: 87 additions & 0 deletions

@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.predicate;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A simple column statistics, supports the following stats.
+ *
+ * <ul>
+ *   <li>min: the minimum value of the column
+ *   <li>max: the maximum value of the column
+ *   <li>nullCount: the number of nulls
+ * </ul>
+ */
+public class SimpleColStats {
+
+    public static final SimpleColStats NONE = new SimpleColStats(null, null, null);
+
+    @Nullable private final Object min;
+    @Nullable private final Object max;
+    private final Long nullCount;
+
+    public SimpleColStats(@Nullable Object min, @Nullable Object max, @Nullable Long nullCount) {
+        this.min = min;
+        this.max = max;
+        this.nullCount = nullCount;
+    }
+
+    @Nullable
+    public Object min() {
+        return min;
+    }
+
+    @Nullable
+    public Object max() {
+        return max;
+    }
+
+    @Nullable
+    public Long nullCount() {
+        return nullCount;
+    }
+
+    public boolean isNone() {
+        return min == null && max == null && nullCount == null;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof SimpleColStats)) {
+            return false;
+        }
+        SimpleColStats that = (SimpleColStats) o;
+        return Objects.equals(min, that.min)
+                && Objects.equals(max, that.max)
+                && Objects.equals(nullCount, that.nullCount);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(min, max, nullCount);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("{%s, %s, %d}", min, max, nullCount);
+    }
+}

fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java

Lines changed: 6 additions & 0 deletions

@@ -21,10 +21,12 @@
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.DecimalType;
+import org.apache.fluss.types.LocalZonedTimestampType;
 import org.apache.fluss.types.TimestampType;

 import java.math.BigDecimal;
 import java.nio.charset.StandardCharsets;
+import java.util.TimeZone;

 /** Type related helper functions. */
 public class TypeUtils {

@@ -62,6 +64,10 @@ public static Object castFromString(String s, DataType type) {
             case TIMESTAMP_WITHOUT_TIME_ZONE:
                 TimestampType timestampType = (TimestampType) type;
                 return BinaryStringUtils.toTimestampNtz(str, timestampType.getPrecision());
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
+                return BinaryStringUtils.toTimestampLtz(
+                        str, localZonedTimestampType.getPrecision(), TimeZone.getDefault());
             default:
                 throw new UnsupportedOperationException("Unsupported type " + type);
         }
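The new branch lets string literals be cast to TIMESTAMP WITH LOCAL TIME ZONE values, interpreted in the JVM's default time zone. A hedged sketch of a call site (the precision constructor of LocalZonedTimestampType is assumed here, mirroring the other timestamp types; it is not shown in this commit):

import org.apache.fluss.types.LocalZonedTimestampType;
import org.apache.fluss.utils.TypeUtils;

final class CastExample {
    // Cast a SQL-style timestamp string to a TIMESTAMP_LTZ value; the result is
    // interpreted in TimeZone.getDefault(), per the new branch above.
    static Object castLtz(String s) {
        // precision 3 (milliseconds) is an assumed example value
        return TypeUtils.castFromString(s, new LocalZonedTimestampType(3));
    }
}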
Lines changed: 38 additions & 0 deletions

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.predicate;
+
+import org.apache.fluss.row.GenericRow;
+
+/** Utils for testing with {@link SimpleColStats}. */
+public class SimpleColStatsTestUtils {
+
+    public static boolean test(Predicate predicate, long rowCount, SimpleColStats[] fieldStats) {
+        Object[] min = new Object[fieldStats.length];
+        Object[] max = new Object[fieldStats.length];
+        Long[] nullCounts = new Long[fieldStats.length];
+        for (int i = 0; i < fieldStats.length; i++) {
+            min[i] = fieldStats[i].min();
+            max[i] = fieldStats[i].max();
+            nullCounts[i] = fieldStats[i].nullCount();
+        }
+
+        return predicate.test(rowCount, GenericRow.of(min), GenericRow.of(max), nullCounts);
+    }
+}
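For intuition, a minimal sketch of how this helper evaluates a predicate against per-column min/max/null-count statistics; the predicate itself (somePredicate below) is assumed to be built elsewhere and is not part of this commit:

import org.apache.fluss.predicate.Predicate;
import org.apache.fluss.predicate.SimpleColStats;
import org.apache.fluss.predicate.SimpleColStatsTestUtils;

final class StatsFilterExample {
    // Returns whether somePredicate may match data whose first column ranges over [1, 100]
    // with no nulls and whose second column has no statistics recorded.
    static boolean mayMatch(Predicate somePredicate) {
        SimpleColStats[] stats = {
            new SimpleColStats(1, 100, 0L), // field 0: min=1, max=100, nullCount=0
            SimpleColStats.NONE             // field 1: no statistics available
        };
        return SimpleColStatsTestUtils.test(somePredicate, 1000L, stats);
    }
}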

fluss-flink/fluss-flink-common/pom.xml

Lines changed: 16 additions & 0 deletions

@@ -34,6 +34,7 @@
     <properties>
         <flink.major.version>1.20</flink.major.version>
         <flink.minor.version>1.20.1</flink.minor.version>
+        <scala.binary.version>2.12</scala.binary.version>
     </properties>

     <dependencies>

@@ -126,6 +127,21 @@
             <scope>test</scope>
         </dependency>

+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.minor.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.fluss</groupId>
             <artifactId>fluss-server</artifactId>

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/row/FlinkAsFlussRow.java

Lines changed: 11 additions & 0 deletions

@@ -17,15 +17,18 @@

 package org.apache.fluss.flink.row;

+import org.apache.fluss.flink.utils.FlinkConversions;
 import org.apache.fluss.row.BinaryString;
 import org.apache.fluss.row.Decimal;
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.TimestampLtz;
 import org.apache.fluss.row.TimestampNtz;

 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;

 /** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
 public class FlinkAsFlussRow implements InternalRow {

@@ -132,4 +135,12 @@ public byte[] getBinary(int pos, int length) {
     public byte[] getBytes(int pos) {
         return flinkRow.getBinary(pos);
     }
+
+    public static Object fromFlinkObject(Object o, DataType type) {
+        if (o == null) {
+            return null;
+        }
+        return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
+                .getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o)));
+    }
 }
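A hedged usage sketch for the new fromFlinkObject helper, converting a single Flink-side literal (for example a partition value produced by the Flink planner) into its Fluss representation; the concrete literal and type below are illustrative only:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.StringData;

import org.apache.fluss.flink.row.FlinkAsFlussRow;

final class LiteralConversionExample {
    // Internally the object is wrapped in a single-field GenericRowData and read back
    // through FlinkAsFlussRow, yielding the Fluss-side value (e.g. BinaryString for STRING).
    static Object toFlussLiteral() {
        return FlinkAsFlussRow.fromFlinkObject(
                StringData.fromString("2024-01-01"), // an assumed example literal
                DataTypes.STRING());
    }
}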

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkSource.java

Lines changed: 7 additions & 13 deletions

@@ -30,10 +30,10 @@
 import org.apache.fluss.flink.source.split.SourceSplitSerializer;
 import org.apache.fluss.flink.source.state.FlussSourceEnumeratorStateSerializer;
 import org.apache.fluss.flink.source.state.SourceEnumeratorState;
-import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
 import org.apache.fluss.lake.source.LakeSource;
 import org.apache.fluss.lake.source.LakeSplit;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.predicate.Predicate;
 import org.apache.fluss.types.RowType;

 import org.apache.flink.api.common.typeinfo.TypeInformation;

@@ -50,10 +50,6 @@

 import javax.annotation.Nullable;

-import java.util.List;
-
-import static org.apache.fluss.utils.Preconditions.checkNotNull;
-
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
         implements Source<OUT, SourceSplitBase, SourceEnumeratorState>, ResultTypeQueryable {

@@ -69,10 +65,8 @@ public class FlinkSource<OUT>
     protected final long scanPartitionDiscoveryIntervalMs;
     private final boolean streaming;
     private final FlussDeserializationSchema<OUT> deserializationSchema;
-
-    private final List<FieldEqual> partitionFilters;
-
-    private final @Nullable LakeSource<LakeSplit> lakeSource;
+    @Nullable private final Predicate partitionFilters;
+    @Nullable private final LakeSource<LakeSplit> lakeSource;

     public FlinkSource(
             Configuration flussConf,

@@ -85,7 +79,7 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters) {
+            @Nullable Predicate partitionFilters) {
         this(
                 flussConf,
                 tablePath,

@@ -112,8 +106,8 @@ public FlinkSource(
             long scanPartitionDiscoveryIntervalMs,
             FlussDeserializationSchema<OUT> deserializationSchema,
             boolean streaming,
-            List<FieldEqual> partitionFilters,
-            LakeSource<LakeSplit> lakeSource) {
+            @Nullable Predicate partitionFilters,
+            @Nullable LakeSource<LakeSplit> lakeSource) {
         this.flussConf = flussConf;
         this.tablePath = tablePath;
         this.hasPrimaryKey = hasPrimaryKey;

@@ -124,7 +118,7 @@ public FlinkSource(
         this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
         this.deserializationSchema = deserializationSchema;
         this.streaming = streaming;
-        this.partitionFilters = checkNotNull(partitionFilters);
+        this.partitionFilters = partitionFilters;
         this.lakeSource = lakeSource;
     }
