Skip to content

Commit 9489f6a

Browse files
committed
[flink]support more partition type
1 parent 085ca99 commit 9489f6a

File tree

9 files changed

+1052
-11
lines changed

9 files changed

+1052
-11
lines changed

fluss-common/src/main/java/org/apache/fluss/predicate/LeafPredicate.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.fluss.row.InternalRow;
2121
import org.apache.fluss.types.DataType;
22+
import org.apache.fluss.types.DataTypes;
2223
import org.apache.fluss.types.DecimalType;
2324
import org.apache.fluss.types.LocalZonedTimestampType;
2425
import org.apache.fluss.types.TimestampType;
@@ -84,6 +85,10 @@ public LeafPredicate copyWithNewIndex(int fieldIndex) {
8485
return new LeafPredicate(function, type, fieldIndex, fieldName, literals);
8586
}
8687

88+
public LeafPredicate copyWithNewLiterals(List<Object> literals) {
89+
return new LeafPredicate(function, DataTypes.STRING(), fieldIndex, fieldName, literals);
90+
}
91+
8792
@Override
8893
public boolean test(InternalRow row) {
8994
return function.test(type, get(row, fieldIndex, type), literals);
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.predicate;
20+
21+
import javax.annotation.Nullable;
22+
23+
import java.util.Objects;
24+
25+
/**
26+
* A simple column statistics, supports the following stats.
27+
*
28+
* <ul>
29+
* <li>min: the minimum value of the column
30+
* <li>max: the maximum value of the column
31+
* <li>nullCount: the number of nulls
32+
* </ul>
33+
*/
34+
public class SimpleColStats {
35+
36+
public static final SimpleColStats NONE = new SimpleColStats(null, null, null);
37+
38+
@Nullable private final Object min;
39+
@Nullable private final Object max;
40+
private final Long nullCount;
41+
42+
public SimpleColStats(@Nullable Object min, @Nullable Object max, @Nullable Long nullCount) {
43+
this.min = min;
44+
this.max = max;
45+
this.nullCount = nullCount;
46+
}
47+
48+
@Nullable
49+
public Object min() {
50+
return min;
51+
}
52+
53+
@Nullable
54+
public Object max() {
55+
return max;
56+
}
57+
58+
@Nullable
59+
public Long nullCount() {
60+
return nullCount;
61+
}
62+
63+
public boolean isNone() {
64+
return min == null && max == null && nullCount == null;
65+
}
66+
67+
@Override
68+
public boolean equals(Object o) {
69+
if (!(o instanceof SimpleColStats)) {
70+
return false;
71+
}
72+
SimpleColStats that = (SimpleColStats) o;
73+
return Objects.equals(min, that.min)
74+
&& Objects.equals(max, that.max)
75+
&& Objects.equals(nullCount, that.nullCount);
76+
}
77+
78+
@Override
79+
public int hashCode() {
80+
return Objects.hash(min, max, nullCount);
81+
}
82+
83+
@Override
84+
public String toString() {
85+
return String.format("{%s, %s, %d}", min, max, nullCount);
86+
}
87+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.predicate;
20+
21+
import org.apache.fluss.row.GenericRow;
22+
23+
/** Utils for testing with {@link SimpleColStats}. */
24+
public class SimpleColStatsTestUtils {
25+
26+
public static boolean test(Predicate predicate, long rowCount, SimpleColStats[] fieldStats) {
27+
Object[] min = new Object[fieldStats.length];
28+
Object[] max = new Object[fieldStats.length];
29+
Long[] nullCounts = new Long[fieldStats.length];
30+
for (int i = 0; i < fieldStats.length; i++) {
31+
min[i] = fieldStats[i].min();
32+
max[i] = fieldStats[i].max();
33+
nullCounts[i] = fieldStats[i].nullCount();
34+
}
35+
36+
return predicate.test(rowCount, GenericRow.of(min), GenericRow.of(max), nullCounts);
37+
}
38+
}

fluss-flink/fluss-flink-common/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
<properties>
3535
<flink.major.version>1.20</flink.major.version>
3636
<flink.minor.version>1.20.1</flink.minor.version>
37+
<scala.binary.version>2.12</scala.binary.version>
3738
</properties>
3839

3940
<dependencies>
@@ -126,6 +127,21 @@
126127
<scope>test</scope>
127128
</dependency>
128129

130+
<dependency>
131+
<groupId>org.apache.flink</groupId>
132+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
133+
<version>${flink.minor.version}</version>
134+
<scope>test</scope>
135+
<type>test-jar</type>
136+
</dependency>
137+
138+
<dependency>
139+
<groupId>org.apache.flink</groupId>
140+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
141+
<version>${flink.minor.version}</version>
142+
<scope>test</scope>
143+
</dependency>
144+
129145
<dependency>
130146
<groupId>org.apache.fluss</groupId>
131147
<artifactId>fluss-server</artifactId>

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@
3939
import org.apache.fluss.predicate.Predicate;
4040
import org.apache.fluss.predicate.PredicateBuilder;
4141
import org.apache.fluss.predicate.PredicateVisitor;
42+
import org.apache.fluss.row.BinaryString;
4243
import org.apache.fluss.row.TimestampLtz;
4344
import org.apache.fluss.types.DataTypes;
4445
import org.apache.fluss.types.RowType;
46+
import org.apache.fluss.utils.PartitionUtils;
4547

4648
import org.apache.flink.annotation.VisibleForTesting;
4749
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -75,7 +77,6 @@
7577
import org.apache.flink.table.functions.LookupFunction;
7678
import org.apache.flink.table.types.DataType;
7779
import org.apache.flink.table.types.logical.LogicalType;
78-
import org.apache.flink.table.types.logical.VarCharType;
7980
import org.slf4j.Logger;
8081
import org.slf4j.LoggerFactory;
8182

@@ -519,11 +520,9 @@ && hasPrimaryKey()
519520
PredicateVisitor<Boolean> partitionPredicateVisitor =
520521
new PartitionPredicateVisitor(partitionKeys);
521522

522-
// TODO after https://github.com/alibaba/fluss/pull/979
523-
// replace string type with the real type
524523
LogicalType[] partitionKeyTypes =
525-
partitionKeys.stream()
526-
.map(key -> VarCharType.STRING_TYPE)
524+
Arrays.stream(partitionKeyIndexes)
525+
.mapToObj(producedDataType.getChildren()::get)
527526
.toArray(LogicalType[]::new);
528527
for (ResolvedExpression filter : filters) {
529528

@@ -540,6 +539,9 @@ && hasPrimaryKey()
540539
} else {
541540
acceptedFilters.add(filter);
542541
}
542+
// Convert literals in the predicate to string using
543+
// PartitionUtils.convertValueOfType
544+
p = convertPredicateLiteralsToString(p);
543545
converted.add(p);
544546
} else {
545547
remainingFilters.add(filter);
@@ -676,4 +678,29 @@ public int[] getBucketKeyIndexes() {
676678
public int[] getPartitionKeyIndexes() {
677679
return partitionKeyIndexes;
678680
}
681+
682+
/**
683+
* Converts literals in LeafPredicate to string representation using
684+
* PartitionUtils.convertValueOfType. This is necessary because partition metadata is stored as
685+
* string.
686+
*/
687+
private Predicate convertPredicateLiteralsToString(Predicate predicate) {
688+
if (predicate instanceof LeafPredicate) {
689+
// Convert literals to string using PartitionUtils.convertValueOfType
690+
List<Object> convertedLiterals = new ArrayList<>();
691+
for (Object literal : ((LeafPredicate) predicate).literals()) {
692+
if (literal != null) {
693+
String stringValue =
694+
PartitionUtils.convertValueOfType(
695+
literal, ((LeafPredicate) predicate).type().getTypeRoot());
696+
convertedLiterals.add(BinaryString.fromString(stringValue));
697+
} else {
698+
convertedLiterals.add(null);
699+
}
700+
}
701+
return ((LeafPredicate) predicate).copyWithNewLiterals(convertedLiterals);
702+
}
703+
704+
return predicate;
705+
}
679706
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.predicate.UnsupportedExpression;
2424
import org.apache.fluss.utils.TypeUtils;
2525

26+
import org.apache.flink.table.data.conversion.DataStructureConverters;
2627
import org.apache.flink.table.expressions.CallExpression;
2728
import org.apache.flink.table.expressions.Expression;
2829
import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -33,6 +34,7 @@
3334
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
3435
import org.apache.flink.table.functions.FunctionDefinition;
3536
import org.apache.flink.table.types.DataType;
37+
import org.apache.flink.table.types.logical.BooleanType;
3638
import org.apache.flink.table.types.logical.LogicalType;
3739
import org.apache.flink.table.types.logical.LogicalTypeFamily;
3840
import org.apache.flink.table.types.logical.RowType;
@@ -83,6 +85,8 @@ public Predicate visit(CallExpression call) {
8385
return PredicateBuilder.and(children.get(0).accept(this), children.get(1).accept(this));
8486
} else if (func == BuiltInFunctionDefinitions.OR) {
8587
return PredicateBuilder.or(children.get(0).accept(this), children.get(1).accept(this));
88+
} else if (func == BuiltInFunctionDefinitions.NOT) {
89+
return visitNotFunction(children, builder::equal, builder::equal);
8690
} else if (func == BuiltInFunctionDefinitions.EQUALS) {
8791
return visitBiFunction(children, builder::equal, builder::equal);
8892
} else if (func == BuiltInFunctionDefinitions.NOT_EQUALS) {
@@ -115,6 +119,11 @@ public Predicate visit(CallExpression call) {
115119
.map(builder::indexOf)
116120
.map(builder::isNotNull)
117121
.orElseThrow(UnsupportedExpression::new);
122+
} else if (func == BuiltInFunctionDefinitions.BETWEEN) {
123+
FieldReferenceExpression fieldRefExpr =
124+
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
125+
return builder.between(
126+
builder.indexOf(fieldRefExpr.getName()), children.get(1), children.get(2));
118127
} else if (func == BuiltInFunctionDefinitions.LIKE) {
119128
FieldReferenceExpression fieldRefExpr =
120129
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
@@ -183,6 +192,24 @@ private Predicate visitBiFunction(
183192
throw new UnsupportedExpression();
184193
}
185194

195+
private Predicate visitNotFunction(
196+
List<Expression> children,
197+
BiFunction<Integer, Object, Predicate> visit1,
198+
BiFunction<Integer, Object, Predicate> visit2) {
199+
Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
200+
if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) {
201+
202+
return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), false);
203+
} else {
204+
fieldRefExpr = extractFieldReference(children.get(1));
205+
if (fieldRefExpr.isPresent()) {
206+
return visit2.apply(builder.indexOf(fieldRefExpr.get().getName()), false);
207+
}
208+
}
209+
210+
throw new UnsupportedExpression();
211+
}
212+
186213
private Optional<FieldReferenceExpression> extractFieldReference(Expression expression) {
187214
if (expression instanceof FieldReferenceExpression) {
188215
return Optional.of((FieldReferenceExpression) expression);
@@ -207,9 +234,11 @@ private Object extractLiteral(DataType expectedType, Expression expression) {
207234
Optional<?> valueOpt = valueExpression.getValueAs(actualType.getConversionClass());
208235
if (valueOpt.isPresent()) {
209236
Object value = valueOpt.get();
210-
if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())
211-
&& !isStringType(expectedLogicalType)) {
212-
return FlinkAsFlussRow.fromFlinkObject(value, expectedType);
237+
if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())) {
238+
return FlinkAsFlussRow.fromFlinkObject(
239+
DataStructureConverters.getConverter(expectedType)
240+
.toInternalOrNull(value),
241+
expectedType);
213242
} else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {
214243
try {
215244
return TypeUtils.castFromString(
@@ -267,6 +296,9 @@ public Predicate visit(ValueLiteralExpression valueLiteralExpression) {
267296

268297
@Override
269298
public Predicate visit(FieldReferenceExpression fieldReferenceExpression) {
299+
if (fieldReferenceExpression.getOutputDataType().getLogicalType() instanceof BooleanType) {
300+
return builder.equal(builder.indexOf(fieldReferenceExpression.getName()), true);
301+
}
270302
throw new UnsupportedExpression();
271303
}
272304

0 commit comments

Comments
 (0)