3636import org .apache .flink .cdc .runtime .operators .sink .BatchDataSinkFunctionOperator ;
3737import org .apache .flink .cdc .runtime .operators .sink .DataSinkFunctionOperator ;
3838import org .apache .flink .cdc .runtime .operators .sink .DataSinkWriterOperatorFactory ;
39- import org .apache .flink .core .io .SimpleVersionedSerializer ;
4039import org .apache .flink .runtime .jobgraph .OperatorID ;
4140import org .apache .flink .streaming .api .connector .sink2 .CommittableMessage ;
4241import org .apache .flink .streaming .api .connector .sink2 .CommittableMessageTypeInfo ;
4645import org .apache .flink .streaming .api .datastream .DataStream ;
4746import org .apache .flink .streaming .api .environment .StreamExecutionEnvironment ;
4847import org .apache .flink .streaming .api .functions .sink .SinkFunction ;
49- import org .apache .flink .streaming .api .operators .OneInputStreamOperatorFactory ;
5048import org .apache .flink .streaming .api .operators .StreamSink ;
5149import org .apache .flink .streaming .api .transformations .LegacySinkTransformation ;
5250import org .apache .flink .streaming .api .transformations .PhysicalTransformation ;
53-
54- import java .lang .reflect .InvocationTargetException ;
51+ import org .apache .flink .streaming .runtime .operators .sink .CommitterOperatorFactory ;
5552
5653/** Translator used to build {@link DataSink} for given {@link DataStream}. */
5754@ Internal
@@ -178,8 +175,10 @@ private <CommT> void addCommittingTopology(
178175 boolean isBatchMode ,
179176 OperatorID schemaOperatorID ,
180177 OperatorUidGenerator operatorUidGenerator ) {
178+ TwoPhaseCommittingSink <Event , CommT > committingSink =
179+ (TwoPhaseCommittingSink <Event , CommT >) sink ;
181180 TypeInformation <CommittableMessage <CommT >> typeInformation =
182- CommittableMessageTypeInfo .of (() -> getCommittableSerializer ( sink ) );
181+ CommittableMessageTypeInfo .of (committingSink :: getCommittableSerializer );
183182 DataStream <CommittableMessage <CommT >> written =
184183 inputStream
185184 .transform (
@@ -202,8 +201,8 @@ private <CommT> void addCommittingTopology(
202201 .transform (
203202 SINK_COMMITTER_PREFIX + sinkName ,
204203 typeInformation ,
205- getCommitterOperatorFactory (
206- sink , isBatchMode , isCheckpointingEnabled ))
204+ new CommitterOperatorFactory <> (
205+ committingSink , isBatchMode , isCheckpointingEnabled ))
207206 .uid (operatorUidGenerator .generateUid ("sink-committer" ));
208207
209208 if (sink instanceof WithPostCommitTopology ) {
@@ -215,37 +214,4 @@ private String generateSinkName(SinkDef sinkDef) {
215214 return sinkDef .getName ()
216215 .orElse (String .format ("Flink CDC Event Sink: %s" , sinkDef .getType ()));
217216 }
218-
219- private static <CommT > SimpleVersionedSerializer <CommT > getCommittableSerializer (Object sink ) {
220- // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature has changed
221- // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
222- try {
223- return (SimpleVersionedSerializer <CommT >)
224- sink .getClass ().getDeclaredMethod ("getCommittableSerializer" ).invoke (sink );
225- } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e ) {
226- throw new RuntimeException ("Failed to get CommittableSerializer" , e );
227- }
228- }
229-
230- private static <CommT >
231- OneInputStreamOperatorFactory <CommittableMessage <CommT >, CommittableMessage <CommT >>
232- getCommitterOperatorFactory (
233- Sink <Event > sink , boolean isBatchMode , boolean isCheckpointingEnabled ) {
234- // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and its signature has
235- // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported.
236- try {
237- return (OneInputStreamOperatorFactory <
238- CommittableMessage <CommT >, CommittableMessage <CommT >>)
239- Class .forName (
240- "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory" )
241- .getDeclaredConstructors ()[0 ]
242- .newInstance (sink , isBatchMode , isCheckpointingEnabled );
243-
244- } catch (ClassNotFoundException
245- | InstantiationException
246- | IllegalAccessException
247- | InvocationTargetException e ) {
248- throw new RuntimeException ("Failed to create CommitterOperatorFactory" , e );
249- }
250- }
251217}
0 commit comments