Skip to content

Commit a2f7ed1

Browse files
committed
[flink]some fix
1 parent 32decb5 commit a2f7ed1

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,7 @@ public DynamicTableSource copy() {
369369
source.projectedFields = projectedFields;
370370
source.singleRowFilter = singleRowFilter;
371371
source.modificationScanType = modificationScanType;
372+
source.predicate = predicate;
372373
return source;
373374
}
374375

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1179,6 +1179,76 @@ void testStreamingReadPartitionPushDownWithInExpr() throws Exception {
11791179
.collect();
11801180

11811181
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1182+
1183+
plan = tEnv.explainSql("select * from partitioned_table_in where c ='2025' or c ='2026'");
1184+
assertThat(plan)
1185+
.contains(
1186+
"TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_in, filter=[OR(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), =(c, _UTF-16LE'2026':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], fields=[a, b, c])");
1187+
1188+
rowIter =
1189+
tEnv.executeSql("select * from partitioned_table_in where c ='2025' or c ='2026'")
1190+
.collect();
1191+
1192+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1193+
}
1194+
1195+
@Test
1196+
void testStreamingReadWithCombinedFiltersAndInExpr() throws Exception {
1197+
tEnv.executeSql(
1198+
"create table combined_filters_table_in"
1199+
+ " (a int not null, b varchar, c string, d int, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
1200+
TablePath tablePath = TablePath.of(DEFAULT_DB, "combined_filters_table_in");
1201+
tEnv.executeSql("alter table combined_filters_table_in add partition (c=2025)");
1202+
tEnv.executeSql("alter table combined_filters_table_in add partition (c=2026)");
1203+
tEnv.executeSql("alter table combined_filters_table_in add partition (c=2027)");
1204+
1205+
List<InternalRow> rows = new ArrayList<>();
1206+
List<String> expectedRowValues = new ArrayList<>();
1207+
1208+
for (int i = 0; i < 10; i++) {
1209+
rows.add(row(i, "v" + i, "2025", i * 100));
1210+
if (i % 2 == 0) {
1211+
expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100));
1212+
}
1213+
}
1214+
for (int i = 0; i < 10; i++) {
1215+
rows.add(row(i, "v" + i, "2026", i * 100));
1216+
if (i % 2 == 0) {
1217+
expectedRowValues.add(String.format("+I[%d, 2026, %d]", i, i * 100));
1218+
}
1219+
}
1220+
writeRows(conn, tablePath, rows, false);
1221+
1222+
for (int i = 0; i < 10; i++) {
1223+
rows.add(row(i, "v" + i, "2027", i * 100));
1224+
}
1225+
1226+
writeRows(conn, tablePath, rows, false);
1227+
waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026", "2027"));
1228+
1229+
String plan =
1230+
tEnv.explainSql(
1231+
"select a,c,d from combined_filters_table_in where c in ('2025','2026') and d % 200 = 0");
1232+
assertThat(plan)
1233+
.contains(
1234+
"TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table_in, filter=[OR(=(c, _UTF-16LE'2025'), =(c, _UTF-16LE'2026'))], project=[a, c, d]]], fields=[a, c, d])");
1235+
1236+
// test column filter、partition filter and flink runtime filter
1237+
org.apache.flink.util.CloseableIterator<Row> rowIter =
1238+
tEnv.executeSql(
1239+
"select a,c,d from combined_filters_table_in where c in ('2025','2026') "
1240+
+ "and d % 200 = 0")
1241+
.collect();
1242+
1243+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
1244+
1245+
rowIter =
1246+
tEnv.executeSql(
1247+
"select a,c,d from combined_filters_table_in where (c ='2025' or c ='2026') "
1248+
+ "and d % 200 = 0")
1249+
.collect();
1250+
1251+
assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
11821252
}
11831253

11841254
private enum Caching {

0 commit comments

Comments
 (0)