Skip to content

Commit 969fc7c

Browse files
committed
[flink]Support partition pushdown(only equals) in Flink connector
1 parent 3c1d440 commit 969fc7c

File tree

2 files changed

+5
-8
lines changed

2 files changed

+5
-8
lines changed

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -938,18 +938,16 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
938938

939939
tEnv.executeSql(
940940
"create table partitioned_table"
941-
+ " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) "
942-
+ "with ('table.auto-partition.enabled' = 'true', 'table.auto-partition.time-unit' = 'year')");
941+
+ " (a int not null, b varchar, c string, primary key (a, c) NOT ENFORCED) partitioned by (c) ");
943942
TablePath tablePath = TablePath.of(DEFAULT_DB, "partitioned_table");
943+
tEnv.executeSql("alter table partitioned_table add partition (c=2025)");
944+
tEnv.executeSql("alter table partitioned_table add partition (c=2026)");
944945

945-
// write data into partitions and wait snapshot is done
946-
Map<Long, String> partitionNameById =
947-
waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
948946
List<String> expectedRowValues =
949-
writeRowsToPartition(tablePath, partitionNameById.values()).stream()
947+
writeRowsToPartition(tablePath, Arrays.asList("2025", "2026")).stream()
950948
.filter(s -> s.contains("2025"))
951949
.collect(Collectors.toList());
952-
waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
950+
waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
953951

954952
org.apache.flink.util.CloseableIterator<Row> rowIter =
955953
tEnv.executeSql("select * from partitioned_table where c ='2025'").collect();

fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/testutils/FlinkTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,6 @@ protected List<String> writeRowsToTwoPartition(
282282
}
283283
}
284284

285-
// 写入记录
286285
writeRows(tablePath, rows, false);
287286

288287
return expectedRowValues;

0 commit comments

Comments
 (0)