
Commit 50cd9ff

[flink] Always return all filters to flink when trying to apply filters in FlinkTableSource (#1934)
1 parent d1ae5b5 commit 50cd9ff

File tree

3 files changed: +146 −24 lines changed


fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/FlinkTableSource.java

Lines changed: 10 additions & 2 deletions
@@ -536,7 +536,11 @@ && hasPrimaryKey()
                 return Result.of(Collections.emptyList(), filters);
             }
             singleRowFilter = lookupRow;
-            return Result.of(acceptedFilters, remainingFilters);
+
+            // FLINK-38635 We cannot determine whether this source will ultimately be used as a scan
+            // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to
+            // be safe, we return all filters to the Flink planner.
+            return Result.of(acceptedFilters, filters);
         } else if (isPartitioned()) {
             // apply partition filter pushdown
             List<Predicate> converted = new ArrayList<>();
@@ -588,7 +592,11 @@ && hasPrimaryKey()
                    }
                }
            }
-            return Result.of(acceptedFilters, remainingFilters);
+
+            // FLINK-38635 We cannot determine whether this source will ultimately be used as a scan
+            // source or a lookup source. Since fluss lookup sources cannot accept filters yet, to
+            // be safe, we return all filters to the Flink planner.
+            return Result.of(acceptedFilters, filters);
        }

        return Result.of(Collections.emptyList(), filters);
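Note on the contract behind this change: in Flink's SupportsFilterPushDown ability, whatever applyFilters returns as the remaining filters is re-evaluated by the planner in a Calc node above the scan, while accepted filters are only an optimization hint to the source. Below is a minimal sketch of a source honoring that contract the same way as the change above; the class name and comments are illustrative, not the actual FlinkTableSource code, and it assumes flink-table-common on the classpath.

import java.util.List;

import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.expressions.ResolvedExpression;

// Sketch only: consume filters opportunistically, but never promise
// the planner that they have been fully applied.
class SafeFilterPushDownSketch implements SupportsFilterPushDown {

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        // A real source would translate some of these into partition pruning
        // or a primary-key point lookup; accepting them only optimizes the
        // scan path.
        List<ResolvedExpression> accepted = filters;

        // Returning the complete original list as "remaining" makes the
        // planner keep a Calc(where=...) above the source. That is redundant
        // for a scan source that honors the pushed filters, but it keeps
        // results correct when the same instance is planned as a lookup
        // source, which cannot apply filters yet.
        return Result.of(accepted, filters);
    }
}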

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceBatchITCase.java

Lines changed: 87 additions & 9 deletions
@@ -91,8 +91,8 @@ void testScanSingleRowFilter() throws Exception {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(name, _UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address]]], fields=[address])",
+                                        + "filter=[and(=(id, 1), =(name, _UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, address1, name1]");
@@ -108,8 +108,8 @@ void testScanSingleRowFilter2() throws Exception {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(name, _UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address]]], fields=[address])",
+                                        + "filter=[and(=(id, 1), =(name, _UTF-16LE'name1':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, address1, name1]");
@@ -126,7 +126,7 @@ void testScanSingleRowFilter3() throws Exception {
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, defaultdb, %s, "
                                         + "filter=[=(id, 1)], "
-                                        + "project=[name]]], fields=[name])",
+                                        + "project=[id, name]]], fields=[id, name])",
                                 tableName));
         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
         List<String> expected = Collections.singletonList("+I[1, name1]");
@@ -149,8 +149,8 @@ void testScanSingleRowFilterOnPartitionedTable() throws Exception {
                 .contains(
                         String.format(
                                 "TableSourceScan(table=[[testcatalog, defaultdb, %s, "
-                                        + "filter=[and(=(id, 1), =(dt, _UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                        + "project=[address, name]]], fields=[address, name])\n",
+                                        + "filter=[and(=(id, 1), =(dt, _UTF-16LE'%s':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                        + "fields=[id, address, name, dt])\n",
                                 tableName, partition1));

         CloseableIterator<Row> collected = tEnv.executeSql(query).collect();
@@ -159,6 +159,84 @@
         assertResultsIgnoreOrder(collected, expected, true);
     }

+    @Test
+    void testFilterOnLookupSource() throws Exception {
+        String srcTableName = String.format("test_src_table_%s", RandomUtils.nextInt());
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " name varchar,"
+                                + " dt varchar,"
+                                + " dim_dt varchar,"
+                                + " primary key (id, dt) NOT ENFORCED) partitioned by (dt)"
+                                + " with ("
+                                + " 'bucket.num' = '4', "
+                                + " 'table.auto-partition.enabled' = 'true',"
+                                + " 'table.auto-partition.time-unit' = 'year')",
+                        srcTableName));
+
+        String dimTableName = String.format("test_dim_table_%s", RandomUtils.nextInt());
+        tEnv.executeSql(
+                String.format(
+                        "create table %s ("
+                                + " id int not null,"
+                                + " address varchar,"
+                                + " dt varchar,"
+                                + " primary key (id, dt) NOT ENFORCED) partitioned by (dt)"
+                                + " with ("
+                                + " 'bucket.num' = '4', "
+                                + " 'table.auto-partition.enabled' = 'true',"
+                                + " 'table.auto-partition.time-unit' = 'year')",
+                        dimTableName));
+
+        TablePath srcTablePath = TablePath.of(DEFAULT_DB, srcTableName);
+        Map<Long, String> partitionNameById =
+                waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), srcTablePath);
+        // just pick first partition to insert data
+        Iterator<String> partitionIterator =
+                partitionNameById.values().stream().sorted().iterator();
+        String partition1 = partitionIterator.next();
+
+        // prepare src table data
+        try (Table srcTable = conn.getTable(srcTablePath)) {
+            UpsertWriter upsertWriter = srcTable.newUpsert().createWriter();
+            for (int i = 1; i <= 2; i++) {
+                Object[] values = new Object[] {i, "name" + i, partition1, partition1};
+                upsertWriter.upsert(row(values));
+            }
+            upsertWriter.flush();
+        }
+
+        TablePath dimTablePath = TablePath.of(DEFAULT_DB, dimTableName);
+        // prepare dim table data
+        try (Table dimTable = conn.getTable(dimTablePath)) {
+            UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+            for (int i = 1; i <= 2; i++) {
+                Object[] values = new Object[] {i, "address" + i, partition1};
+                upsertWriter.upsert(row(values));
+            }
+            upsertWriter.flush();
+        }
+
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TEMPORARY VIEW my_view AS "
+                                + "SELECT *, proctime() as proc from %s WHERE id = 1 AND dt = '%s'",
+                        srcTableName, partition1));
+
+        CloseableIterator<Row> collected =
+                tEnv.executeSql(
+                                String.format(
+                                        "SELECT src.id, src.name, h.id, h.address FROM my_view src "
+                                                + " LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc as h "
+                                                + " ON src.id = h.id and src.dim_dt = h.dt and h.dt <> '%s'",
+                                        dimTableName, partition1))
+                        .collect();
+        List<String> expected = Collections.singletonList("+I[1, name1, null, null]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
     @Test
     void testScanSingleRowFilterException() throws Exception {
         String tableName = prepareSourceTable(new String[] {"id", "name"}, null);
@@ -356,8 +434,8 @@ private String prepareSourceTable(String[] keys, String partitionedKey) throws E
         }

         // prepare table data
-        try (Table dimTable = conn.getTable(tablePath)) {
-            UpsertWriter upsertWriter = dimTable.newUpsert().createWriter();
+        try (Table table = conn.getTable(tablePath)) {
+            UpsertWriter upsertWriter = table.newUpsert().createWriter();
             for (int i = 1; i <= 5; i++) {
                 Object[] values =
                         partition1 == null

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 49 additions & 13 deletions
@@ -744,6 +744,41 @@ void testLookup1PkTableWith2Conditions(Caching caching, boolean async) throws Ex
         assertResultsIgnoreOrder(collected2, expected2, true);
     }

+    /**
+     * lookup table with one pk, two join condition and one of the join condition is constant value.
+     */
+    @Test
+    void testLookupWithFilterPushDown() throws Exception {
+        String dim =
+                prepareDimTableAndSourceTable(
+                        Caching.DISABLE_CACHE, false, new String[] {"id"}, null, "p_date");
+
+        Map<Long, String> partitionNameById =
+                waitUntilPartitions(
+                        FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+                        TablePath.of(DEFAULT_DB, dim));
+
+        // pick the first partition to do filter
+        String filteredPartition = partitionNameById.values().stream().sorted().iterator().next();
+
+        String dimJoinQuery =
+                String.format(
+                        "SELECT a, h.id, h.name, h.address FROM src "
+                                + " LEFT JOIN %s FOR SYSTEM_TIME AS OF src.proc as h "
+                                + " ON src.a = h.id and src.p_date = h.p_date and h.p_date <> '%s'",
+                        dim, filteredPartition);
+
+        CloseableIterator<Row> collected = tEnv.executeSql(dimJoinQuery).collect();
+        List<String> expected =
+                Arrays.asList(
+                        "+I[1, null, null, null]",
+                        "+I[2, null, null, null]",
+                        "+I[3, null, null, null]",
+                        "+I[10, null, null, null]",
+                        "+I[1, null, null, null]");
+        assertResultsIgnoreOrder(collected, expected, true);
+    }
+
     /**
      * lookup table with one pk, 3 join condition on dim fields, 1st for variable non-pk, 2nd for
      * pk, 3rd for constant value.
@@ -959,8 +994,8 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
         assertThat(plan)
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table, "
-                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, b]]], fields=[a, b])");
+                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+                                + "fields=[a, b, c])");

         org.apache.flink.util.CloseableIterator<Row> rowIter =
                 tEnv.executeSql("select * from partitioned_table where c ='2025'").collect();
@@ -1013,10 +1048,11 @@ void testStreamingReadAllPartitionTypePushDown() throws Exception {
                                 + "=(p_string, _UTF-16LE'hello':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")), "
                                 + "=(p_float, 1.25E1:FLOAT)), =(p_double, 7.88E0:DOUBLE)), =(p_date, 2025-10-12)), "
                                 + "=(p_time, 12:55:00)), =(p_ts_ntz, 2025-10-12 12:55:00.001:TIMESTAMP(6))), "
-                                + "=(p_ts_ltz, 1970-01-01 00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))], "
-                                + "project=[id, p_bool, p_int]]], fields=[id, p_bool, p_int])")
-                // all filter conditions should be pushed down
-                .doesNotContain("where=");
+                                + "=(p_ts_ltz, 1970-01-01 00:00:04.001:TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))), NOT(p_bool))]]], "
+                                + "fields=[id, p_bool, p_int, p_bigint, p_bytes, p_string, p_float, p_double, p_date, p_time, p_ts_ntz, p_ts_ltz])")
+                // although all filter conditions are pushed down into source, they are still
+                // retained in the plan
+                .contains("where=");

         List<String> expectedRowValues =
                 Collections.singletonList(
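The flipped assertion is a direct consequence of the FlinkTableSource change: since every filter is now returned as remaining, the planner retains them in a Calc node even when the source also receives them, so an optimized plan takes roughly this shape (illustrative, echoing the plan strings asserted elsewhere in this file):

Calc(select=[...], where=[<all predicates>])
+- TableSourceScan(table=[[testcatalog, defaultdb, ..., filter=[<pushed predicates>]]], fields=[...])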
@@ -1050,8 +1086,8 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
         assertThat(plan)
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, multi_partitioned_table, "
-                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, b, d]]], fields=[a, b, d])");
+                                + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")]]], "
+                                + "fields=[a, b, c, d])");

         // test partition key prefix match
         // This test requires dynamically discovering newly created partitions, so
@@ -1080,8 +1116,8 @@
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, multi_partitioned_table, "
                                 + "filter=[and(=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"), "
-                                + "=(d, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))], "
-                                + "project=[a, b]]], fields=[a, b])");
+                                + "=(d, _UTF-16LE'3':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\"))]]], "
+                                + "fields=[a, b, c, d])");

         // test all partition key match
         rowIter =
@@ -1128,7 +1164,7 @@ void testStreamingReadWithCombinedFilters1() throws Exception {
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table, "
                                 + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, d]]], fields=[a, d])");
+                                + "project=[a, c, d]]], fields=[a, c, d])");

         // test column filter、partition filter and flink runtime filter
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -1145,7 +1181,7 @@
                 .contains(
                         "TableSourceScan(table=[[testcatalog, defaultdb, combined_filters_table, "
                                 + "filter=[=(c, _UTF-16LE'2025':VARCHAR(2147483647) CHARACTER SET \"UTF-16LE\")], "
-                                + "project=[a, d]]], fields=[a, d])");
+                                + "project=[a, c, d]]], fields=[a, c, d])");

         // test column filter、partition filter and flink runtime filter
         rowIter =
@@ -1366,7 +1402,7 @@ void testStreamingReadPartitionComplexPushDown() throws Exception {
         String plan = tEnv.explainSql(query);
         assertThat(plan)
                 .contains(
-                        "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND LIKE(b, '%v3%'))])\n"
+                        "Calc(select=[3 AS a, b, c, d], where=[((a = 3) AND ((c = '2026') OR LIKE(d, '%1%')) AND LIKE(b, '%v3%'))])\n"
                                 + "+- TableSourceScan(table=[[testcatalog, defaultdb, partitioned_table_complex, filter=[OR(=(c, _UTF-16LE'2026'), LIKE(d, _UTF-16LE'%1%'))]]], fields=[a, b, c, d])");

         org.apache.flink.util.CloseableIterator<Row> rowIter = tEnv.executeSql(query).collect();
