Skip to content

Commit 463e7d0

Browse files
committed
WIP
1 parent 9e750c1 commit 463e7d0

File tree

5 files changed

+42
-58
lines changed

5 files changed

+42
-58
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ Apache Kafka
359359
./fluss-server/src/main/java/org/apache/fluss/server/utils/timer/TimingWheel.java
360360

361361
Apache Paimon
362+
./fluss-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java
362363
./fluss-common/src/main/java/org/apache/fluss/predicate/And.java
363364
./fluss-common/src/main/java/org/apache/fluss/predicate/CompareUtils.java
364365
./fluss-common/src/main/java/org/apache/fluss/predicate/CompoundPredicate.java

fluss-common/src/main/java/org/apache/fluss/utils/TypeUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.apache.fluss.row.Decimal;
2222
import org.apache.fluss.types.DataType;
2323
import org.apache.fluss.types.DecimalType;
24+
import org.apache.fluss.types.LocalZonedTimestampType;
2425
import org.apache.fluss.types.TimestampType;
2526

2627
import java.math.BigDecimal;
2728
import java.nio.charset.StandardCharsets;
29+
import java.util.TimeZone;
2830

2931
/** Type related helper functions. */
3032
public class TypeUtils {
@@ -62,6 +64,10 @@ public static Object castFromString(String s, DataType type) {
6264
case TIMESTAMP_WITHOUT_TIME_ZONE:
6365
TimestampType timestampType = (TimestampType) type;
6466
return BinaryStringUtils.toTimestampNtz(str, timestampType.getPrecision());
67+
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
68+
LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
69+
return BinaryStringUtils.toTimestampLtz(
70+
str, localZonedTimestampType.getPrecision(), TimeZone.getDefault());
6571
default:
6672
throw new UnsupportedOperationException("Unsupported type " + type);
6773
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.fluss.flink.source.lookup.LookupNormalizer;
2727
import org.apache.fluss.flink.utils.FlinkConnectorOptionsUtils;
2828
import org.apache.fluss.flink.utils.FlinkConversions;
29+
import org.apache.fluss.flink.utils.PredicateConverter;
2930
import org.apache.fluss.flink.utils.PushdownUtils;
3031
import org.apache.fluss.flink.utils.PushdownUtils.FieldEqual;
3132
import org.apache.fluss.lake.source.LakeSource;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/PredicateConverter.java renamed to fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/PredicateConverter.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
/*
2-
* Licensed to the Apache Software Foundation (ASF) under one
3-
* or more contributor license agreements. See the NOTICE file
4-
* distributed with this work for additional information
5-
* regarding copyright ownership. The ASF licenses this file
6-
* to you under the Apache License, Version 2.0 (the
7-
* "License"); you may not use this file except in compliance
8-
* with the License. You may obtain a copy of the License at
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
98
*
109
* http://www.apache.org/licenses/LICENSE-2.0
1110
*
@@ -16,10 +15,9 @@
1615
* limitations under the License.
1716
*/
1817

19-
package org.apache.fluss.flink.source;
18+
package org.apache.fluss.flink.utils;
2019

2120
import org.apache.fluss.flink.row.FlinkAsFlussRow;
22-
import org.apache.fluss.flink.utils.FlinkConversions;
2321
import org.apache.fluss.predicate.Predicate;
2422
import org.apache.fluss.predicate.PredicateBuilder;
2523
import org.apache.fluss.predicate.UnsupportedExpression;
@@ -51,8 +49,9 @@
5149
/* This file is based on source code of Apache Paimon Project (https://paimon.apache.org/), licensed by the Apache
5250
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
5351
* additional information regarding copyright ownership. */
52+
5453
/**
55-
* Convert {@link Expression} to {@link Predicate}.
54+
* Convert Flink {@link Expression} to Fluss {@link Predicate}.
5655
*
5756
* <p>For {@link FieldReferenceExpression}, please use name instead of index, if the project
5857
* pushdown is before and the filter pushdown is after, the index of the filter will be projected.

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 25 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,27 +1191,21 @@ void testStreamingReadPartitionPushDownWithInExpr() throws Exception {
11911191
.collect(Collectors.toList());
11921192
waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
11931193

1194-
String plan =
1195-
tEnv.explainSql("select * from partitioned_table_in where c in ('2025','2026')");
1194+
String query1 = "select * from partitioned_table_in where c in ('2025','2026')";
1195+
String plan = tEnv.explainSql(query1);
11961196
assertThat(plan)
11971197
.contains(
11981198
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))]]], fields=[a, b, c])");
11991199

1200-
org.apache.flink.util.CloseableIterator<Row> rowIter =
1201-
tEnv.executeSql("select * from partitioned_table_in where c in ('2025','2026')")
1202-
.collect();
1203-
1200+
org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect();
12041201
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
12051202

1206-
plan = tEnv.explainSql("select * from partitioned_table_in where c ='2025' or c ='2026'");
1203+
String query2 = "select * from partitioned_table_in where c ='2025' or c ='2026'";
1204+
plan = tEnv.explainSql(query2);
12071205
assertThat(plan)
12081206
.contains(
12091207
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), =(c, _UTF-16LE'2026':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])");
1210-
1211-
rowIter =
1212-
tEnv.executeSql("select * from partitioned_table_in where c ='2025' or c ='2026'")
1213-
.collect();
1214-
1208+
rowIter = tEnv.executeSql(query2).collect();
12151209
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
12161210
}
12171211

@@ -1240,7 +1234,6 @@ void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception {
12401234
expectedRowValues.add(String.format("+I[%d, 2026, %d]", i, i * 100));
12411235
}
12421236
}
1243-
writeRows(conn, tablePath, rows, false);
12441237

12451238
for (int i = 0; i < 10; i++) {
12461239
rows.add(row(i, "v" + i, "2027", i * 100));
@@ -1249,20 +1242,15 @@ void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception {
12491242
writeRows(conn, tablePath, rows, false);
12501243
waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
12511244

1252-
String plan =
1253-
tEnv.explainSql(
1254-
"select a,c,d from combined_filters_table_in where c in ('2025','2026') and d % 200 = 0");
1245+
String query1 =
1246+
"select a,c,d from combined_filters_table_in where c in ('2025','2026') and d % 200 = 0";
1247+
String plan = tEnv.explainSql(query1);
12551248
assertThat(plan)
12561249
.contains(
12571250
"TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])");
12581251

12591252
// test column filter, partition filter and flink runtime filter
1260-
org.apache.flink.util.CloseableIterator<Row> rowIter =
1261-
tEnv.executeSql(
1262-
"select a,c,d from combined_filters_table_in where c in ('2025','2026') "
1263-
+ "and d % 200 = 0")
1264-
.collect();
1265-
1253+
org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect();
12661254
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
12671255

12681256
rowIter =
@@ -1293,41 +1281,35 @@ void testStreamingReadPartitionPushDownWithLikeExpr() throws Exception {
12931281
.collect(Collectors.toList());
12941282
waitUntilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "3026"));
12951283

1296-
String plan = tEnv.explainSql("select * from partitioned_table_like where c like '202%'");
1284+
String query1 = "select * from partitioned_table_like where c like '202%'";
1285+
String plan = tEnv.explainSql(query1);
12971286
assertThat(plan)
12981287
.contains(
12991288
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'202%')]]], fields=[a, b, c])");
13001289

1301-
org.apache.flink.util.CloseableIterator<Row> rowIter =
1302-
tEnv.executeSql("select * from partitioned_table_like where c like '202%'")
1303-
.collect();
1290+
org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query1).collect();
13041291

13051292
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
13061293
expectedRowValues =
13071294
allData.stream()
13081295
.filter(s -> s.contains("2026") || s.contains("3026"))
13091296
.collect(Collectors.toList());
1310-
plan = tEnv.explainSql("select * from partitioned_table_like where c like '%026'");
1297+
String query2 = "select * from partitioned_table_like where c like '%026'";
1298+
plan = tEnv.explainSql(query2);
13111299
assertThat(plan)
13121300
.contains(
13131301
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%026')]]], fields=[a, b, c])");
1314-
1315-
rowIter =
1316-
tEnv.executeSql("select * from partitioned_table_like where c like '%026'")
1317-
.collect();
1318-
1302+
rowIter = tEnv.executeSql(query2).collect();
13191303
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
13201304

13211305
expectedRowValues =
13221306
allData.stream().filter(s -> s.contains("3026")).collect(Collectors.toList());
1323-
plan = tEnv.explainSql("select * from partitioned_table_like where c like '%3026%'");
1307+
String query3 = "select * from partitioned_table_like where c like '%3026%'";
1308+
plan = tEnv.explainSql(query3);
13241309
assertThat(plan)
13251310
.contains(
13261311
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_like, filter=[LIKE(c, _UTF-16LE'%3026%')]]], fields=[a, b, c])");
1327-
1328-
rowIter =
1329-
tEnv.executeSql("select * from partitioned_table_like where c like '%3026%'")
1330-
.collect();
1312+
rowIter = tEnv.executeSql(query3).collect();
13311313

13321314
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
13331315
}
@@ -1353,22 +1335,17 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
13531335
waitUntilAllBucketFinishSnapshot(
13541336
admin, tablePath, Arrays.asList("2025$1", "2025$2", "2026$1"));
13551337

1356-
String plan =
1357-
tEnv.explainSql(
1358-
"select * from partitioned_table_complex where a = 3\n"
1359-
+ " and (c in ('2026') or d like '%1%') "
1360-
+ " and b like '%v3%'");
1338+
String query =
1339+
"select * from partitioned_table_complex where a = 3\n"
1340+
+ " and (c in ('2026') or d like '%1%') "
1341+
+ " and b like '%v3%'";
1342+
String plan = tEnv.explainSql(query);
13611343
assertThat(plan)
13621344
.contains(
13631345
"Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND LIKE(b, '%v3%'))])\n"
13641346
+ "+- TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, _UTF-16LE'%1%'))]]], fields=[a, b, c, d])");
13651347

1366-
org.apache.flink.util.CloseableIterator<Row> rowIter =
1367-
tEnv.executeSql(
1368-
"select * from partitioned_table_complex where a = 3\n"
1369-
+ " and (c in ('2026') or d like '%1%') "
1370-
+ " and b like '%v3%'")
1371-
.collect();
1348+
org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
13721349

13731350
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
13741351
}

0 commit comments

Comments
 (0)