1818package org .apache .flink .cdc .connectors .sqlserver .source ;
1919
2020import org .apache .flink .api .common .eventtime .WatermarkStrategy ;
21- import org .apache .flink .api .common .restartstrategy .RestartStrategies ;
2221import org .apache .flink .api .common .typeutils .TypeSerializer ;
2322import org .apache .flink .cdc .common .data .binary .BinaryStringData ;
2423import org .apache .flink .cdc .common .event .CreateTableEvent ;
3938import org .apache .flink .cdc .runtime .typeutils .BinaryRecordDataGenerator ;
4039import org .apache .flink .cdc .runtime .typeutils .EventTypeInfo ;
4140import org .apache .flink .configuration .Configuration ;
41+ import org .apache .flink .configuration .StateRecoveryOptions ;
4242import org .apache .flink .core .execution .JobClient ;
43+ import org .apache .flink .core .execution .SavepointFormatType ;
4344import org .apache .flink .runtime .checkpoint .CheckpointException ;
44- import org .apache .flink .runtime .jobgraph .SavepointConfigOptions ;
4545import org .apache .flink .streaming .api .datastream .DataStreamSource ;
4646import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
4747import org .apache .flink .streaming .api .operators .collect .AbstractCollectResultBuffer ;
4848import org .apache .flink .streaming .api .operators .collect .CheckpointedCollectResultBuffer ;
4949import org .apache .flink .streaming .api .operators .collect .CollectResultIterator ;
50+ import org .apache .flink .streaming .api .operators .collect .CollectResultIteratorAdapter ;
5051import org .apache .flink .streaming .api .operators .collect .CollectSinkOperator ;
5152import org .apache .flink .streaming .api .operators .collect .CollectSinkOperatorFactory ;
5253import org .apache .flink .streaming .api .operators .collect .CollectStreamSink ;
54+ import org .apache .flink .streaming .util .RestartStrategyUtils ;
5355import org .apache .flink .table .planner .factories .TestValuesTableFactory ;
5456import org .apache .flink .util .ExceptionUtils ;
5557
@@ -245,7 +247,9 @@ void testAddNewTableWithExclude() throws Exception {
245247 new EventTypeInfo ());
246248
247249 TypeSerializer <Event > serializer =
248- source .getTransformation ().getOutputType ().createSerializer (env .getConfig ());
250+ source .getTransformation ()
251+ .getOutputType ()
252+ .createSerializer (env .getConfig ().getSerializerConfig ());
249253 CheckpointedCollectResultBuffer <Event > resultBuffer =
250254 new CheckpointedCollectResultBuffer <>(serializer );
251255 String accumulatorName = "dataStreamCollect_" + UUID .randomUUID ();
@@ -296,7 +300,9 @@ private void testAddNewTable(TestParam testParam, int parallelism) throws Except
296300 new EventTypeInfo ());
297301
298302 TypeSerializer <Event > serializer =
299- source .getTransformation ().getOutputType ().createSerializer (env .getConfig ());
303+ source .getTransformation ()
304+ .getOutputType ()
305+ .createSerializer (env .getConfig ().getSerializerConfig ());
300306 CheckpointedCollectResultBuffer <Event > resultBuffer =
301307 new CheckpointedCollectResultBuffer <>(serializer );
302308 String accumulatorName = "dataStreamCollect_" + UUID .randomUUID ();
@@ -445,7 +451,9 @@ private String triggerSavepointWithRetry(JobClient jobClient, String savepointDi
445451 // retry 600 times, it takes 100 milliseconds per time, at most retry 1 minute
446452 while (retryTimes < 600 ) {
447453 try {
448- return jobClient .triggerSavepoint (savepointDirectory ).get ();
454+ return jobClient
455+ .triggerSavepoint (savepointDirectory , SavepointFormatType .DEFAULT )
456+ .get ();
449457 } catch (Exception e ) {
450458 Optional <CheckpointException > exception =
451459 ExceptionUtils .findThrowable (e , CheckpointException .class );
@@ -550,13 +558,17 @@ private <T> CollectResultIterator<T> addCollector(
550558 String accumulatorName ) {
551559 CollectSinkOperatorFactory <T > sinkFactory =
552560 new CollectSinkOperatorFactory <>(serializer , accumulatorName );
553- CollectSinkOperator <T > operator = (CollectSinkOperator <T >) sinkFactory .getOperator ();
554- CollectResultIterator <T > iterator =
555- new CollectResultIterator <>(
556- buffer , operator .getOperatorIdFuture (), accumulatorName , 0 );
557561 CollectStreamSink <T > sink = new CollectStreamSink <>(source , sinkFactory );
558- sink .name ("Data stream collect sink" );
562+ // Set both name and uid to the same value. The uid is used by Flink to generate
563+ // OperatorID via StreamGraphHasherV2.generateUserSpecifiedHash(uid), and the same
564+ // uid string must be passed to CollectResultIteratorAdapter for coordinator lookup.
565+ String operatorUid = "Data stream collect sink" ;
566+ sink .name (operatorUid ).uid (operatorUid );
567+ CollectSinkOperator <T > operator = (CollectSinkOperator <T >) sinkFactory .getOperator ();
559568 env .addOperator (sink .getTransformation ());
569+ CollectResultIterator <T > iterator =
570+ new CollectResultIteratorAdapter <>(
571+ buffer , operatorUid , operator , accumulatorName , 0 );
560572 env .registerCollectIterator (iterator );
561573 return iterator ;
562574 }
@@ -565,13 +577,13 @@ private StreamExecutionEnvironment getStreamExecutionEnvironment(
565577 String finishedSavePointPath , int parallelism ) {
566578 Configuration configuration = new Configuration ();
567579 if (finishedSavePointPath != null ) {
568- configuration .setString ( SavepointConfigOptions .SAVEPOINT_PATH , finishedSavePointPath );
580+ configuration .set ( StateRecoveryOptions .SAVEPOINT_PATH , finishedSavePointPath );
569581 }
570582 StreamExecutionEnvironment env =
571583 StreamExecutionEnvironment .getExecutionEnvironment (configuration );
572584 env .setParallelism (parallelism );
573585 env .enableCheckpointing (500L );
574- env . setRestartStrategy ( RestartStrategies . fixedDelayRestart ( 3 , 1000L ) );
586+ RestartStrategyUtils . configureFixedDelayRestartStrategy ( env , 3 , 1000L );
575587 return env ;
576588 }
577589
0 commit comments