 
 package com.alibaba.fluss.flink.source;
 
+import com.alibaba.fluss.client.Connection;
+import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.client.table.Table;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
 import com.alibaba.fluss.config.ConfigOptions;
-import com.alibaba.fluss.flink.utils.FlinkTestBase;
+import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.InternalRow;
-import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.utils.clock.ManualClock;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.writeRows;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
 import static com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for using Flink SQL to read a Fluss table. */
-abstract class FlinkTableSourceITCase extends FlinkTestBase {
+abstract class FlinkTableSourceITCase extends AbstractTestBase {
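+    // the cluster is registered with a manual clock so tests can advance time
+    // deterministically instead of sleeping (see the CLOCK.advanceTime(...) calls below)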
+    protected static final ManualClock CLOCK = new ManualClock();
+
+    @RegisterExtension
+    public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+            FlussClusterExtension.builder()
+                    .setClusterConf(
+                            new Configuration()
+                                    // set snapshot interval to 1s for testing purposes
+                                    .set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+                                    // retain all snapshots for test purposes
+                                    .set(
+                                            ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS,
+                                            Integer.MAX_VALUE))
+                    .setNumOfTabletServers(3)
+                    .setClock(CLOCK)
+                    .build();
 
     static final String CATALOG_NAME = "testcatalog";
     static final String DEFAULT_DB = "defaultdb";
     protected StreamExecutionEnvironment execEnv;
     protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+    protected static String bootstrapServers;
+
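+    // create one shared Fluss connection/admin for the whole test class; the tests
+    // pass `conn` into the static FlinkTestBase.writeRows(...) helpers below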
+    @BeforeAll
+    protected static void beforeAll() {
+        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
+        conn = ConnectionFactory.createConnection(clientConf);
+        admin = conn.getAdmin();
+    }
 
     @BeforeEach
     void before() {
@@ -120,7 +158,7 @@ public void testCreateTableLike() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -139,7 +177,7 @@ void testPkTableReadOnlySnapshot() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -164,7 +202,7 @@ void testNonPkTableRead() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
 
         List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
         try (org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -201,7 +239,7 @@ void testAppendTableProjectPushDown(String logFormat) throws Exception {
                         row(8, "v8", 800L, 8000, 800, 8000L),
                         row(9, "v9", 900L, 9000, 900, 9000L),
                         row(10, "v10", 1000L, 10000, 1000, 10000L));
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
 
         // projection + reorder.
         String query = "select b, d, c from " + tableName;
@@ -266,11 +304,11 @@ void testTableProjectPushDown(String mode) throws Exception {
             if (!testPkLog) {
                 // write records and wait snapshot before collect job start,
                 // to make sure reading from kv snapshot
-                writeRows(tablePath, rows, false);
+                writeRows(conn, tablePath, rows, false);
                 waitUtilAllBucketFinishSnapshot(admin, TablePath.of(DEFAULT_DB, tableName));
             }
         } else {
-            writeRows(tablePath, rows, true);
+            writeRows(conn, tablePath, rows, true);
         }
 
         String query = "select b, a, c from " + tableName;
@@ -300,7 +338,7 @@ void testTableProjectPushDown(String mode) throws Exception {
             if (testPkLog) {
                 // delay the write after collect job start,
                 // to make sure reading from log instead of snapshot
-                writeRows(tablePath, rows, false);
+                writeRows(conn, tablePath, rows, false);
             }
             for (int i = 0; i < expectRecords; i++) {
                 Row r = rowIter.next();
@@ -320,7 +358,7 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -339,7 +377,7 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
339377 "+U[2, v2]" ,
340378 "-U[3, v3]" ,
341379 "+U[3, v3]" );
342- writeRows (tablePath , rows , false );
380+ writeRows (conn , tablePath , rows , false );
343381 assertResultsIgnoreOrder (rowIter , expectedRows , true );
344382 }
345383
@@ -384,7 +422,9 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         rowWithPartition(new Object[] {4, "v4", 400L, 4000}, partitionName),
                         rowWithPartition(new Object[] {5, "v5", 500L, 5000}, partitionName));
 
-        writeRows(tablePath, rows1, true);
+        writeRows(conn, tablePath, rows1, true);
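+        // advance the manual clock and capture a timestamp that lies strictly between
+        // the two write batches; the 'timestamp' startup mode below then reads batch 2 only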
+        CLOCK.advanceTime(Duration.ofMillis(100L));
+        long timestamp = CLOCK.milliseconds();
 
         List<InternalRow> rows2 =
                 Arrays.asList(
@@ -394,7 +434,7 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         rowWithPartition(new Object[] {9, "v9", 900L, 9000}, partitionName),
                         rowWithPartition(new Object[] {10, "v10", 1000L, 10000}, partitionName));
         // for second batch, we don't wait snapshot finish.
-        writeRows(tablePath, rows2, true);
+        writeRows(conn, tablePath, rows2, true);
 
         // 1. read log table with scan.startup.mode='full'
         String options = " /*+ OPTIONS('scan.startup.mode' = 'full') */";
@@ -419,10 +459,17 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
         assertQueryResult(query, expected);
 
         // 3. read log table with scan.startup.mode='timestamp'
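+        // starting from the captured timestamp (after batch 1, before batch 2),
+        // only the second batch of rows is expected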
+        expected =
+                Arrays.asList(
+                        "+I[6, v6, 600, 6000]",
+                        "+I[7, v7, 700, 7000]",
+                        "+I[8, v8, 800, 8000]",
+                        "+I[9, v9, 900, 9000]",
+                        "+I[10, v10, 1000, 10000]");
         options =
                 String.format(
                         " /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%d') */",
-                        1000);
+                        timestamp);
         query = "select a, b, c, d from " + tableName + options;
         assertQueryResult(query, expected);
     }
@@ -437,7 +484,7 @@ void testReadKvTableWithScanStartupModeEqualsFull() throws Exception {
                 Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"), row(3, "v33"));
 
         // write records and wait generate snapshot.
-        writeRows(tablePath, rows1, false);
+        writeRows(conn, tablePath, rows1, false);
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
         List<InternalRow> rows2 = Arrays.asList(row(1, "v11"), row(2, "v22"), row(4, "v4"));
@@ -460,7 +507,7 @@ void testReadKvTableWithScanStartupModeEqualsFull() throws Exception {
             List<String> actual = new ArrayList<>(expectRecords);
             // delay to write after collect job start, to make sure reading from log instead of
             // snapshot
-            writeRows(tablePath, rows2, false);
+            writeRows(conn, tablePath, rows2, false);
             for (int i = 0; i < expectRecords; i++) {
                 Row r = rowIter.next();
                 String row = r.toString();
@@ -482,6 +529,7 @@ private static Stream<Arguments> readKvTableScanStartupModeArgs() {
     @MethodSource("readKvTableScanStartupModeArgs")
     void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean isPartitioned)
             throws Exception {
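+        // capture the manual clock before any writes; the 'timestamp' startup mode
+        // below then sees every subsequent change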
+        long timestamp = CLOCK.milliseconds();
         String tableName = mode + "_test_" + (isPartitioned ? "partitioned" : "non_partitioned");
         TablePath tablePath = TablePath.of(DEFAULT_DB, tableName);
         String partitionName = null;
@@ -505,8 +553,6 @@ void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean
             partitionName = partitionNameById.values().iterator().next();
         }
 
-        RowType dataType = conn.getTable(tablePath).getTableInfo().getRowType();
-
         List<InternalRow> rows1 =
                 Arrays.asList(
                         rowWithPartition(new Object[] {1, "v1"}, partitionName),
@@ -515,24 +561,26 @@ void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean
                         rowWithPartition(new Object[] {3, "v33"}, partitionName));
 
         // write records and wait generate snapshot.
-        writeRows(tablePath, rows1, false);
+        writeRows(conn, tablePath, rows1, false);
         if (partitionName == null) {
             waitUtilAllBucketFinishSnapshot(admin, tablePath);
         } else {
             waitUtilAllBucketFinishSnapshot(admin, tablePath, Collections.singleton(partitionName));
         }
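+        // move the manual clock forward so each batch is written at a distinct,
+        // strictly later timestamp than the one captured above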
+        CLOCK.advanceTime(Duration.ofMillis(100));
 
         List<InternalRow> rows2 =
                 Arrays.asList(
                         rowWithPartition(new Object[] {1, "v11"}, partitionName),
                         rowWithPartition(new Object[] {2, "v22"}, partitionName),
                         rowWithPartition(new Object[] {4, "v4"}, partitionName));
-        writeRows(tablePath, rows2, false);
+        writeRows(conn, tablePath, rows2, false);
+        CLOCK.advanceTime(Duration.ofMillis(100));
 
         String options =
                 String.format(
-                        " /*+ OPTIONS('scan.startup.mode' = '%s', 'scan.startup.timestamp' = '1000') */",
-                        mode);
+                        " /*+ OPTIONS('scan.startup.mode' = '%s', 'scan.startup.timestamp' = '%s') */",
+                        mode, timestamp);
         String query = "select a, b from " + tableName + options;
         List<String> expected =
                 Arrays.asList(
@@ -596,7 +644,7 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
         }
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, partitionNameById.values());
+                writeRowsToPartition(conn, tablePath, partitionNameById.values());
         waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -607,7 +655,7 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
         tEnv.executeSql(String.format("alter table %s add partition (c = '2000')", tableName));
         tEnv.executeSql(String.format("alter table %s add partition (c = '2001')", tableName));
         // write data to the new partitions
-        expectedRowValues = writeRowsToPartition(tablePath, Arrays.asList("2000", "2001"));
+        expectedRowValues = writeRowsToPartition(conn, tablePath, Arrays.asList("2000", "2001"));
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
@@ -619,10 +667,10 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
         // write first batch records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
-        writeRows(tablePath, rows, true);
-        Thread.sleep(100);
+        writeRows(conn, tablePath, rows, true);
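+        // advance the manual clock in place of Thread.sleep(100); time only moves
+        // when the test says so, which keeps the assertions deterministic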
+        CLOCK.advanceTime(Duration.ofMillis(100L));
         // startup time between write first and second batch records.
-        long currentTimeMillis = System.currentTimeMillis();
+        long currentTimeMillis = CLOCK.milliseconds();
 
         // startup timestamp is larger than current time.
         assertThatThrownBy(
@@ -644,10 +692,10 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
644692 "select * from timestamp_table /*+ OPTIONS('scan.startup.mode' = 'timestamp', 'scan.startup.timestamp' = '%s') */ " ,
645693 currentTimeMillis ))
646694 .collect ()) {
647- Thread . sleep ( 100 );
695+ CLOCK . advanceTime ( Duration . ofMillis ( 100L ) );
648696 // write second batch record.
649697 rows = Arrays .asList (row (4 , "v4" ), row (5 , "v5" ), row (6 , "v6" ));
650- writeRows (tablePath , rows , true );
698+ writeRows (conn , tablePath , rows , true );
651699 List <String > expected = Arrays .asList ("+I[4, v4]" , "+I[5, v5]" , "+I[6, v6]" );
652700 int expectRecords = expected .size ();
653701 List <String > actual = new ArrayList <>(expectRecords );
@@ -940,7 +988,7 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
         tEnv.executeSql("alter table partitioned_table add partition (c=2026)");
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, Arrays.asList("2025", "2026")).stream()
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026")).stream()
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
@@ -1035,13 +1083,13 @@ void testStreamingReadWithCombinedFilters() throws Exception {
                 expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100));
             }
         }
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         for (int i = 0; i < 10; i++) {
             rows.add(row(i, "v" + i, "2026", i * 100));
         }
 
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
 
         String plan =
@@ -1072,7 +1120,7 @@ void testNonPartitionPushDown() throws Exception {
         tEnv.executeSql("alter table partitioned_table_no_filter add partition (c=2026)");
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, Arrays.asList("2025", "2026"));
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026"));
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -1097,7 +1145,7 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<Str
             }
         }
 
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         return expectedRowValues;
     }