 
 package com.alibaba.fluss.flink.source;
 
+import com.alibaba.fluss.client.Connection;
 import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.client.admin.Admin;
 import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.client.table.Table;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.flink.utils.FlinkTestBase;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.row.GenericRow;
 import com.alibaba.fluss.row.InternalRow;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.jupiter.api.AfterEach;
 
 import static com.alibaba.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
 import static com.alibaba.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.waitUntilPartitions;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.writeRows;
+import static com.alibaba.fluss.flink.utils.FlinkTestBase.writeRowsToPartition;
 import static com.alibaba.fluss.server.testutils.FlussClusterExtension.BUILTIN_DATABASE;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** IT case for using flink sql to read fluss table. */
-abstract class FlinkTableSourceITCase extends FlinkTestBase {
+abstract class FlinkTableSourceITCase extends AbstractTestBase {
     protected static final ManualClock CLOCK = new ManualClock();
 
     @RegisterExtension
@@ -98,6 +102,11 @@ abstract class FlinkTableSourceITCase extends FlinkTestBase {
     static final String DEFAULT_DB = "defaultdb";
     protected StreamExecutionEnvironment execEnv;
     protected StreamTableEnvironment tEnv;
+    protected static Connection conn;
+    protected static Admin admin;
+
+    protected static Configuration clientConf;
+    protected static String bootstrapServers;
 
     @BeforeAll
     protected static void beforeAll() {
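The new static fields are presumably wired up inside beforeAll() from the registered cluster extension. A minimal sketch, assuming the extension field is named FLUSS_CLUSTER_EXTENSION and exposes getClientConfig() and getBootstrapServers() (both accessor names are assumptions not shown in this diff; ConnectionFactory.createConnection and Connection.getAdmin are the standard Fluss client entry points):

        // Hypothetical wiring; the extension accessors are assumed, not part of this diff.
        clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
        bootstrapServers = FLUSS_CLUSTER_EXTENSION.getBootstrapServers();
        conn = ConnectionFactory.createConnection(clientConf);
        admin = conn.getAdmin();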
@@ -148,7 +157,7 @@ public void testCreateTableLike() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -167,7 +176,7 @@ void testPkTableReadOnlySnapshot() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -192,7 +201,7 @@ void testNonPkTableRead() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
 
         List<String> expected = Arrays.asList("+I[1, v1]", "+I[2, v2]", "+I[3, v3]");
         try (org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -229,7 +238,7 @@ void testAppendTableProjectPushDown(String logFormat) throws Exception {
                         row(8, "v8", 800L, 8000, 800, 8000L),
                         row(9, "v9", 900L, 9000, 900, 9000L),
                         row(10, "v10", 1000L, 10000, 1000, 10000L));
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
 
         // projection + reorder.
         String query = "select b, d, c from " + tableName;
@@ -294,11 +303,11 @@ void testTableProjectPushDown(String mode) throws Exception {
             if (!testPkLog) {
                 // write records and wait snapshot before collect job start,
                 // to make sure reading from kv snapshot
-                writeRows(tablePath, rows, false);
+                writeRows(conn, tablePath, rows, false);
                 waitUtilAllBucketFinishSnapshot(admin, TablePath.of(DEFAULT_DB, tableName));
             }
         } else {
-            writeRows(tablePath, rows, true);
+            writeRows(conn, tablePath, rows, true);
         }
 
         String query = "select b, a, c from " + tableName;
@@ -328,7 +337,7 @@ void testTableProjectPushDown(String mode) throws Exception {
         if (testPkLog) {
             // delay the write after collect job start,
             // to make sure reading from log instead of snapshot
-            writeRows(tablePath, rows, false);
+            writeRows(conn, tablePath, rows, false);
         }
         for (int i = 0; i < expectRecords; i++) {
             Row r = rowIter.next();
@@ -348,7 +357,7 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
         // write records
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
@@ -367,7 +376,7 @@ void testPkTableReadMixSnapshotAndLog() throws Exception {
                         "+U[2, v2]",
                         "-U[3, v3]",
                         "+U[3, v3]");
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
         assertResultsIgnoreOrder(rowIter, expectedRows, true);
     }
 
@@ -412,7 +421,7 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         rowWithPartition(new Object[] {4, "v4", 400L, 4000}, partitionName),
                         rowWithPartition(new Object[] {5, "v5", 500L, 5000}, partitionName));
 
-        writeRows(tablePath, rows1, true);
+        writeRows(conn, tablePath, rows1, true);
         CLOCK.advanceTime(Duration.ofMillis(100L));
         long timestamp = CLOCK.milliseconds();
 
@@ -424,7 +433,7 @@ void testReadLogTableWithDifferentScanStartupMode(boolean isPartitioned) throws
                         rowWithPartition(new Object[] {9, "v9", 900L, 9000}, partitionName),
                         rowWithPartition(new Object[] {10, "v10", 1000L, 10000}, partitionName));
         // for second batch, we don't wait snapshot finish.
-        writeRows(tablePath, rows2, true);
+        writeRows(conn, tablePath, rows2, true);
 
         // 1. read log table with scan.startup.mode='full'
         String options = " /*+ OPTIONS('scan.startup.mode' = 'full') */";
@@ -474,7 +483,7 @@ void testReadKvTableWithScanStartupModeEqualsFull() throws Exception {
                 Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"), row(3, "v33"));
 
         // write records and wait generate snapshot.
-        writeRows(tablePath, rows1, false);
+        writeRows(conn, tablePath, rows1, false);
         waitUtilAllBucketFinishSnapshot(admin, tablePath);
 
         List<InternalRow> rows2 = Arrays.asList(row(1, "v11"), row(2, "v22"), row(4, "v4"));
@@ -497,7 +506,7 @@ void testReadKvTableWithScanStartupModeEqualsFull() throws Exception {
         List<String> actual = new ArrayList<>(expectRecords);
         // delay to write after collect job start, to make sure reading from log instead of
         // snapshot
-        writeRows(tablePath, rows2, false);
+        writeRows(conn, tablePath, rows2, false);
         for (int i = 0; i < expectRecords; i++) {
             Row r = rowIter.next();
             String row = r.toString();
@@ -551,7 +560,7 @@ void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean
                         rowWithPartition(new Object[] {3, "v33"}, partitionName));
 
         // write records and wait generate snapshot.
-        writeRows(tablePath, rows1, false);
+        writeRows(conn, tablePath, rows1, false);
         if (partitionName == null) {
             waitUtilAllBucketFinishSnapshot(admin, tablePath);
         } else {
@@ -564,7 +573,7 @@ void testReadKvTableWithEarliestAndTimestampScanStartupMode(String mode, boolean
                         rowWithPartition(new Object[] {1, "v11"}, partitionName),
                         rowWithPartition(new Object[] {2, "v22"}, partitionName),
                         rowWithPartition(new Object[] {4, "v4"}, partitionName));
-        writeRows(tablePath, rows2, false);
+        writeRows(conn, tablePath, rows2, false);
         CLOCK.advanceTime(Duration.ofMillis(100));
 
         String options =
@@ -634,7 +643,7 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
         }
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, partitionNameById.values());
+                writeRowsToPartition(conn, tablePath, partitionNameById.values());
         waitUtilAllBucketFinishSnapshot(admin, tablePath, partitionNameById.values());
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -645,7 +654,7 @@ void testReadPrimaryKeyPartitionedTable(boolean isAutoPartition) throws Exceptio
         tEnv.executeSql(String.format("alter table %s add partition (c = '2000')", tableName));
         tEnv.executeSql(String.format("alter table %s add partition (c = '2001')", tableName));
         // write data to the new partitions
-        expectedRowValues = writeRowsToPartition(tablePath, Arrays.asList("2000", "2001"));
+        expectedRowValues = writeRowsToPartition(conn, tablePath, Arrays.asList("2000", "2001"));
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
@@ -657,7 +666,7 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
         // write first batch records
         List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
 
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
         CLOCK.advanceTime(Duration.ofMillis(100L));
         // startup time between write first and second batch records.
         long currentTimeMillis = CLOCK.milliseconds();
@@ -685,7 +694,7 @@ void testReadTimestampGreaterThanMaxTimestamp() throws Exception {
         CLOCK.advanceTime(Duration.ofMillis(100L));
         // write second batch record.
         rows = Arrays.asList(row(4, "v4"), row(5, "v5"), row(6, "v6"));
-        writeRows(tablePath, rows, true);
+        writeRows(conn, tablePath, rows, true);
         List<String> expected = Arrays.asList("+I[4, v4]", "+I[5, v5]", "+I[6, v6]");
         int expectRecords = expected.size();
         List<String> actual = new ArrayList<>(expectRecords);
@@ -978,7 +987,7 @@ void testStreamingReadSinglePartitionPushDown() throws Exception {
         tEnv.executeSql("alter table partitioned_table add partition (c=2026)");
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, Arrays.asList("2025", "2026")).stream()
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026")).stream()
                         .filter(s -> s.contains("2025"))
                         .collect(Collectors.toList());
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
@@ -1073,13 +1082,13 @@ void testStreamingReadWithCombinedFilters() throws Exception {
                 expectedRowValues.add(String.format("+I[%d, 2025, %d]", i, i * 100));
             }
         }
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         for (int i = 0; i < 10; i++) {
             rows.add(row(i, "v" + i, "2026", i * 100));
         }
 
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
 
         String plan =
@@ -1110,7 +1119,7 @@ void testNonPartitionPushDown() throws Exception {
         tEnv.executeSql("alter table partitioned_table_no_filter add partition (c=2026)");
 
         List<String> expectedRowValues =
-                writeRowsToPartition(tablePath, Arrays.asList("2025", "2026"));
+                writeRowsToPartition(conn, tablePath, Arrays.asList("2025", "2026"));
         waitUtilAllBucketFinishSnapshot(admin, tablePath, Arrays.asList("2025", "2026"));
 
         org.apache.flink.util.CloseableIterator<Row> rowIter =
@@ -1135,7 +1144,7 @@ private List<String> writeRowsToTwoPartition(TablePath tablePath, Collection<Str
             }
         }
 
-        writeRows(tablePath, rows, false);
+        writeRows(conn, tablePath, rows, false);
 
         return expectedRowValues;
     }
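Taken together, the call-site changes imply that the statically imported helpers in FlinkTestBase now take the Connection explicitly instead of relying on inherited test state. An inferred sketch of their shapes, based only on the call sites in this diff (parameter names, the meaning of the boolean flag, and the bodies are assumptions; the real declarations live in FlinkTestBase):

    // Inferred signatures; bodies are placeholders, not the actual implementation.
    public static void writeRows(
            Connection conn, TablePath tablePath, List<InternalRow> rows, boolean append)
            throws Exception {
        // obtain an append or upsert writer for conn.getTable(tablePath), write rows, flush
    }

    public static List<String> writeRowsToPartition(
            Connection conn, TablePath tablePath, Collection<String> partitionNames)
            throws Exception {
        // write one batch per named partition and return the expected "+I[...]" change-log strings
        return new ArrayList<>();
    }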