2222import org .apache .flink .cdc .connectors .utils .ExternalResourceProxy ;
2323import org .apache .flink .runtime .minicluster .RpcServiceSharing ;
2424import org .apache .flink .runtime .testutils .MiniClusterResourceConfiguration ;
25- import org .apache .flink .streaming .api .CheckpointingMode ;
2625import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
2726import org .apache .flink .table .api .TableResult ;
2827import org .apache .flink .table .api .bridge .java .StreamTableEnvironment ;
4544import java .sql .SQLException ;
4645import java .util .ArrayList ;
4746import java .util .Arrays ;
47+ import java .util .Collections ;
4848import java .util .List ;
49+ import java .util .Map ;
4950import java .util .concurrent .ExecutionException ;
5051import java .util .concurrent .ExecutorService ;
5152import java .util .concurrent .Executors ;
5253import java .util .concurrent .FutureTask ;
5354import java .util .concurrent .TimeUnit ;
5455import java .util .concurrent .TimeoutException ;
56+ import java .util .stream .Collectors ;
5557import java .util .stream .Stream ;
5658
5759import static java .lang .String .format ;
@@ -87,9 +89,9 @@ public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
8789
8890 public static Stream <Arguments > parameters () {
8991 return Stream .of (
90- Arguments .of ("customers" , null ),
91- Arguments .of ("customers" , "id" ),
92- Arguments .of ("customers_no_pk" , "id" ));
92+ Arguments .of ("customers" , null , "false" ),
93+ Arguments .of ("customers" , "id" , "true" ),
94+ Arguments .of ("customers_no_pk" , "id" , "true" ));
9395 }
9496
9597 @ RegisterExtension
@@ -120,33 +122,38 @@ public void clean() {
120122 // Failover tests
121123 @ ParameterizedTest
122124 @ MethodSource ("parameters" )
123- @ Timeout (value = 120 , unit = TimeUnit .SECONDS )
124- public void testTaskManagerFailoverInSnapshotPhase (String tableName , String chunkColumnName )
125- throws Exception {
125+ public void testTaskManagerFailoverInSnapshotPhase (
126+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
126127 testMySqlParallelSource (
127128 FailoverType .TM ,
128129 FailoverPhase .SNAPSHOT ,
129130 new String [] {tableName , "customers_1" },
130131 tableName ,
131- chunkColumnName );
132+ chunkColumnName ,
133+ Collections .singletonMap (
134+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
135+ assignEndingFirst ));
132136 }
133137
134138 @ ParameterizedTest
135139 @ MethodSource ("parameters" )
136- public void testTaskManagerFailoverInBinlogPhase (String tableName , String chunkColumnName )
137- throws Exception {
140+ public void testTaskManagerFailoverInBinlogPhase (
141+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
138142 testMySqlParallelSource (
139143 FailoverType .TM ,
140144 FailoverPhase .BINLOG ,
141145 new String [] {tableName , "customers_1" },
142146 tableName ,
143- chunkColumnName );
147+ chunkColumnName ,
148+ Collections .singletonMap (
149+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
150+ assignEndingFirst ));
144151 }
145152
146153 @ ParameterizedTest
147154 @ MethodSource ("parameters" )
148- public void testTaskManagerFailoverFromLatestOffset (String tableName , String chunkColumnName )
149- throws Exception {
155+ public void testTaskManagerFailoverFromLatestOffset (
156+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
150157 testMySqlParallelSource (
151158 DEFAULT_PARALLELISM ,
152159 "latest-offset" ,
@@ -155,37 +162,46 @@ public void testTaskManagerFailoverFromLatestOffset(String tableName, String chu
155162 new String [] {tableName , "customers_1" },
156163 RestartStrategies .fixedDelayRestart (1 , 0 ),
157164 tableName ,
158- chunkColumnName );
165+ chunkColumnName ,
166+ Collections .singletonMap (
167+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
168+ assignEndingFirst ));
159169 }
160170
161171 @ ParameterizedTest
162172 @ MethodSource ("parameters" )
163- public void testJobManagerFailoverInSnapshotPhase (String tableName , String chunkColumnName )
164- throws Exception {
173+ public void testJobManagerFailoverInSnapshotPhase (
174+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
165175 testMySqlParallelSource (
166176 FailoverType .JM ,
167177 FailoverPhase .SNAPSHOT ,
168178 new String [] {tableName , "customers_1" },
169179 tableName ,
170- chunkColumnName );
180+ chunkColumnName ,
181+ Collections .singletonMap (
182+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
183+ assignEndingFirst ));
171184 }
172185
173186 @ ParameterizedTest
174187 @ MethodSource ("parameters" )
175- public void testJobManagerFailoverInBinlogPhase (String tableName , String chunkColumnName )
176- throws Exception {
188+ public void testJobManagerFailoverInBinlogPhase (
189+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
177190 testMySqlParallelSource (
178191 FailoverType .JM ,
179192 FailoverPhase .BINLOG ,
180193 new String [] {tableName , "customers_1" },
181194 tableName ,
182- chunkColumnName );
195+ chunkColumnName ,
196+ Collections .singletonMap (
197+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
198+ assignEndingFirst ));
183199 }
184200
185201 @ ParameterizedTest
186202 @ MethodSource ("parameters" )
187- public void testJobManagerFailoverFromLatestOffset (String tableName , String chunkColumnName )
188- throws Exception {
203+ public void testJobManagerFailoverFromLatestOffset (
204+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
189205 testMySqlParallelSource (
190206 DEFAULT_PARALLELISM ,
191207 "latest-offset" ,
@@ -194,49 +210,60 @@ public void testJobManagerFailoverFromLatestOffset(String tableName, String chun
194210 new String [] {tableName , "customers_1" },
195211 RestartStrategies .fixedDelayRestart (1 , 0 ),
196212 tableName ,
197- chunkColumnName );
213+ chunkColumnName ,
214+ Collections .singletonMap (
215+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
216+ assignEndingFirst ));
198217 }
199218
200219 @ ParameterizedTest
201220 @ MethodSource ("parameters" )
202- public void testTaskManagerFailoverSingleParallelism (String tableName , String chunkColumnName )
203- throws Exception {
221+ public void testTaskManagerFailoverSingleParallelism (
222+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
204223 testMySqlParallelSource (
205224 1 ,
206225 FailoverType .TM ,
207226 FailoverPhase .SNAPSHOT ,
208227 new String [] {tableName },
209228 tableName ,
210- chunkColumnName );
229+ chunkColumnName ,
230+ Collections .singletonMap (
231+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
232+ assignEndingFirst ));
211233 }
212234
213235 @ ParameterizedTest
214236 @ MethodSource ("parameters" )
215- public void testJobManagerFailoverSingleParallelism (String tableName , String chunkColumnName )
216- throws Exception {
237+ public void testJobManagerFailoverSingleParallelism (
238+ String tableName , String chunkColumnName , String assignEndingFirst ) throws Exception {
217239 testMySqlParallelSource (
218240 1 ,
219241 FailoverType .JM ,
220242 FailoverPhase .SNAPSHOT ,
221243 new String [] {tableName },
222244 tableName ,
223- chunkColumnName );
245+ chunkColumnName ,
246+ Collections .singletonMap (
247+ "scan.incremental.snapshot.unbounded-chunk-first.enabled" ,
248+ assignEndingFirst ));
224249 }
225250
226251 private void testMySqlParallelSource (
227252 FailoverType failoverType ,
228253 FailoverPhase failoverPhase ,
229254 String [] captureCustomerTables ,
230255 String tableName ,
231- String chunkColumnName )
256+ String chunkColumnName ,
257+ Map <String , String > otherOptions )
232258 throws Exception {
233259 testMySqlParallelSource (
234260 DEFAULT_PARALLELISM ,
235261 failoverType ,
236262 failoverPhase ,
237263 captureCustomerTables ,
238264 tableName ,
239- chunkColumnName );
265+ chunkColumnName ,
266+ otherOptions );
240267 }
241268
242269 private void testMySqlParallelSource (
@@ -245,7 +272,8 @@ private void testMySqlParallelSource(
245272 FailoverPhase failoverPhase ,
246273 String [] captureCustomerTables ,
247274 String tableName ,
248- String chunkColumnName )
275+ String chunkColumnName ,
276+ Map <String , String > otherOptions )
249277 throws Exception {
250278 testMySqlParallelSource (
251279 parallelism ,
@@ -255,7 +283,8 @@ private void testMySqlParallelSource(
255283 captureCustomerTables ,
256284 RestartStrategies .fixedDelayRestart (1 , 0 ),
257285 tableName ,
258- chunkColumnName );
286+ chunkColumnName ,
287+ otherOptions );
259288 }
260289
261290 private void testMySqlParallelSource (
@@ -266,7 +295,8 @@ private void testMySqlParallelSource(
266295 String [] captureCustomerTables ,
267296 RestartStrategies .RestartStrategyConfiguration restartStrategyConfiguration ,
268297 String tableName ,
269- String chunkColumnName )
298+ String chunkColumnName ,
299+ Map <String , String > otherOptions )
270300 throws Exception {
271301 testMySqlParallelSource (
272302 parallelism ,
@@ -277,7 +307,8 @@ private void testMySqlParallelSource(
277307 restartStrategyConfiguration ,
278308 false ,
279309 tableName ,
280- chunkColumnName );
310+ chunkColumnName ,
311+ otherOptions );
281312 }
282313
283314 private void testMySqlParallelSource (
@@ -289,11 +320,10 @@ private void testMySqlParallelSource(
289320 RestartStrategies .RestartStrategyConfiguration restartStrategyConfiguration ,
290321 boolean skipSnapshotBackfill ,
291322 String tableName ,
292- String chunkColumnName )
323+ String chunkColumnName ,
324+ Map <String , String > otherOptions )
293325 throws Exception {
294- captureCustomerTables = new String [] {tableName };
295326 StreamExecutionEnvironment env = StreamExecutionEnvironment .getExecutionEnvironment ();
296- env .enableCheckpointing (3000L , CheckpointingMode .EXACTLY_ONCE );
297327 StreamTableEnvironment tEnv = StreamTableEnvironment .create (env );
298328
299329 env .setParallelism (parallelism );
@@ -324,6 +354,7 @@ private void testMySqlParallelSource(
324354 + " 'server-time-zone' = 'Asia/Shanghai',"
325355 + " 'server-id' = '%s'"
326356 + " %s"
357+ + " %s"
327358 + ")" ,
328359 getHost (),
329360 getPort (),
@@ -338,7 +369,17 @@ private void testMySqlParallelSource(
338369 ? ""
339370 : String .format (
340371 ", 'scan.incremental.snapshot.chunk.key-column' = '%s'" ,
341- chunkColumnName ));
372+ chunkColumnName ),
373+ otherOptions .isEmpty ()
374+ ? ""
375+ : ","
376+ + otherOptions .entrySet ().stream ()
377+ .map (
378+ e ->
379+ String .format (
380+ "'%s'='%s'" ,
381+ e .getKey (), e .getValue ()))
382+ .collect (Collectors .joining ("," )));
342383 tEnv .executeSql (sourceDDL );
343384 TableResult tableResult = tEnv .executeSql ("select * from customers" );
344385
@@ -350,7 +391,7 @@ private void testMySqlParallelSource(
350391 // second step: check the binlog data
351392 checkBinlogData (tableResult , failoverType , failoverPhase , captureCustomerTables );
352393
353- // sleepMs(3000);
394+ sleepMs (3000 );
354395 tableResult .getJobClient ().get ().cancel ().get ();
355396 }
356397
0 commit comments