 
 package org.apache.fluss.flink.source;
 
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Collections;
-import java.util.List;
-
-import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.collectRowsWithTimeout;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
-
 /** IT case for batch source in Flink 2.2. */
-public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase {
-
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    @Override
-    void testCountPushDown(boolean partitionTable) throws Exception {
-        String tableName = partitionTable ? preparePartitionedLogTable() : prepareLogTable();
-        int expectedRows = partitionTable ? 10 : 5;
-        // normal scan
-        String query = String.format("SELECT COUNT(*) FROM %s", tableName);
-        assertThat(tEnv.explainSql(query))
-                .contains(
-                        String.format(
-                                "TableSourceScan(table=[[testcatalog, defaultdb, %s, "
-                                        + "aggregates=[grouping=[], aggFunctions=[Count1AggFunction()]]]], "
-                                        + "fields=[count1$0])",
-                                tableName));
-        CloseableIterator<Row> iterRows = tEnv.executeSql(query).collect();
-        List<String> collected = collectRowsWithTimeout(iterRows, 1);
-        List<String> expected = Collections.singletonList(String.format("+I[%s]", expectedRows));
-        assertThat(collected).isEqualTo(expected);
-
-        // test not push down grouping count.
-        assertThatThrownBy(
-                        () ->
-                                tEnv.explainSql(
-                                                String.format(
-                                                        "SELECT COUNT(*) FROM %s group by id",
-                                                        tableName))
-                                        .wait())
-                .hasMessageContaining(
-                        "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
-
-        // test not support primary key now
-        String primaryTableName = prepareSourceTable(new String[] {"id"}, null);
-        assertThatThrownBy(
-                        () ->
-                                tEnv.explainSql(
-                                                String.format(
-                                                        "SELECT COUNT(*) FROM %s ",
-                                                        primaryTableName))
-                                        .wait())
-                .hasMessageContaining(
-                        "Currently, Fluss only support queries on table with datalake enabled or point queries on primary key when it's in batch execution mode.");
-    }
-}
+public class Flink22TableSourceBatchITCase extends FlinkTableSourceBatchITCase {}