Skip to content

Commit 25d563c

Browse files
committed
[flink]add index validate
1 parent 0676902 commit 25d563c

File tree

4 files changed

+89
-10
lines changed

4 files changed

+89
-10
lines changed

fluss-common/src/main/java/com/alibaba/fluss/predicate/PredicateBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,18 +121,21 @@ public Predicate contains(int idx, Object patternLiteral) {
121121
}
122122

123123
public Predicate leaf(NullFalseLeafBinaryFunction function, int idx, Object literal) {
124+
validateIndex(idx);
124125
DataField field = rowType.getFields().get(idx);
125126
return new LeafPredicate(
126127
function, field.getType(), idx, field.getName(), singletonList(literal));
127128
}
128129

129130
public Predicate leaf(LeafUnaryFunction function, int idx) {
131+
validateIndex(idx);
130132
DataField field = rowType.getFields().get(idx);
131133
return new LeafPredicate(
132134
function, field.getType(), idx, field.getName(), Collections.emptyList());
133135
}
134136

135137
public Predicate in(int idx, List<Object> literals) {
138+
validateIndex(idx);
136139
// In the IN predicate, 20 literals are critical for performance.
137140
// If there are more than 20 literals, the performance will decrease.
138141
if (literals.size() > 20) {
@@ -233,6 +236,12 @@ private static void splitCompound(
233236
}
234237
}
235238

239+
private void validateIndex(int idx) {
240+
if (idx < 0 || idx >= rowType.getFieldCount()) {
241+
throw new UnsupportedExpression("idx is not valid");
242+
}
243+
}
244+
236245
public static Object convertJavaObject(DataType literalType, Object o) {
237246
if (o == null) {
238247
return null;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.predicate;
19+
20+
/**
 * Signals that an unsupported expression was encountered; the caller can choose to ignore this
 * filter branch.
 */
public class UnsupportedExpression extends RuntimeException {

    // Exceptions are Serializable; pin the version instead of relying on the computed default.
    private static final long serialVersionUID = 1L;

    /** Creates the exception without a detail message. */
    public UnsupportedExpression() {
        super();
    }

    /**
     * Creates the exception with a detail message.
     *
     * @param message description of why the expression is not supported
     */
    public UnsupportedExpression(String message) {
        super(message);
    }
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/PredicateConverter.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.alibaba.fluss.flink.utils.FlinkConversions;
2323
import com.alibaba.fluss.predicate.Predicate;
2424
import com.alibaba.fluss.predicate.PredicateBuilder;
25+
import com.alibaba.fluss.predicate.UnsupportedExpression;
2526
import com.alibaba.fluss.utils.TypeUtils;
2627

2728
import org.apache.flink.table.expressions.CallExpression;
@@ -119,11 +120,12 @@ public Predicate visit(CallExpression call) {
119120
FieldReferenceExpression fieldRefExpr =
120121
extractFieldReference(children.get(0)).orElseThrow(UnsupportedExpression::new);
121122
if (fieldRefExpr
122-
.getOutputDataType()
123-
.getLogicalType()
124-
.getTypeRoot()
125-
.getFamilies()
126-
.contains(LogicalTypeFamily.CHARACTER_STRING)) {
123+
.getOutputDataType()
124+
.getLogicalType()
125+
.getTypeRoot()
126+
.getFamilies()
127+
.contains(LogicalTypeFamily.CHARACTER_STRING)
128+
&& builder.indexOf(fieldRefExpr.getName()) != -1) {
127129
String sqlPattern =
128130
Objects.requireNonNull(
129131
extractLiteral(
@@ -294,7 +296,4 @@ public static Optional<Predicate> convert(RowType rowType, ResolvedExpression fi
294296
return Optional.empty();
295297
}
296298
}
297-
298-
/** Encounter an unsupported expression, the caller can choose to ignore this filter branch. */
299-
public static class UnsupportedExpression extends RuntimeException {}
300299
}

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,8 +1185,9 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<Str
11851185
values[1] = keyValuePairs[1].split("=")[1];
11861186

11871187
for (int i = 0; i < 10; i++) {
1188-
rows.add(row(i, "v1", values[0], values[1]));
1189-
expectedRowValues.add(String.format("+I[%d, v1, %s, %s]", i, values[0], values[1]));
1188+
rows.add(row(i, "v" + i, values[0], values[1]));
1189+
expectedRowValues.add(
1190+
String.format("+I[%d, v%d, %s, %s]", i, i, values[0], values[1]));
11901191
}
11911192
}
11921193

@@ -1354,6 +1355,47 @@ void testStreamingReadPartitionPushDownWithLikeExpr() throws Exception {
13541355
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
13551356
}
13561357

1358+
@Test
1359+
void testStreamingReadPartitionComplexPushDown() throws Exception {
1360+
1361+
tEnv.executeSql(
1362+
"create table partitioned_table_complex"
1363+
+ " (a int not null, b varchar, c string,d string, primary key (a, c, d) NOT ENFORCED) partitioned by (c,d) ");
1364+
TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table_complex");
1365+
tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=1)");
1366+
tEnv.executeSql("alter table partitioned_table_complex add partition (c=2025,d=2)");
1367+
tEnv.executeSql("alter table partitioned_table_complex add partition (c=2026,d=1)");
1368+
1369+
List<String> allData =
1370+
writeRowsToTwoPartition(
1371+
tablePath, Arrays.asList("c=2025,d=1", "c=2025,d=2", "c=2026,d=1"));
1372+
List<String> expectedRowValues =
1373+
allData.stream()
1374+
.filter(s -> s.contains("v3") && !s.contains("2025, 2"))
1375+
.collect(Collectors.toList());
1376+
waitUtilAllBucketFinishSnapshot(
1377+
admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
1378+
1379+
String plan =
1380+
tEnv.explainSql(
1381+
"select * from partitioned_table_complex where a = 3\n"
1382+
+ " and (c in ('2026') or d like '%1%') "
1383+
+ " and b like '%v3%'");
1384+
assertThat(plan)
1385+
.contains(
1386+
"Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND LIKE(b, '%v3%'))])\n"
1387+
+ "+- TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, _UTF-16LE'%1%'))]]], fields=[a, b, c, d])");
1388+
1389+
org.apache.flink.util.CloseableIterator<Row> rowIter =
1390+
tEnv.executeSql(
1391+
"select * from partitioned_table_complex where a = 3\n"
1392+
+ " and (c in ('2026') or d like '%1%') "
1393+
+ " and b like '%v3%'")
1394+
.collect();
1395+
1396+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1397+
}
1398+
13571399
private enum Caching {
13581400
ENABLE_CACHE,
13591401
DISABLE_CACHE

0 commit comments

Comments
 (0)