From 3e58f001a30f7fdd68542d6603c0e52b00d2d383 Mon Sep 17 00:00:00 2001 From: Pei Yu <125331682@qq.com> Date: Mon, 16 Mar 2026 20:42:28 +0800 Subject: [PATCH] remove compatibility code for older versions of Flink. Signed-off-by: Pei Yu <125331682@qq.com> --- .../flink/translator/DataSinkTranslator.java | 46 +++---------------- 1 file changed, 6 insertions(+), 40 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java index aa530300b9e..117b08aed53 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSinkTranslator.java @@ -36,7 +36,6 @@ import org.apache.flink.cdc.runtime.operators.sink.BatchDataSinkFunctionOperator; import org.apache.flink.cdc.runtime.operators.sink.DataSinkFunctionOperator; import org.apache.flink.cdc.runtime.operators.sink.DataSinkWriterOperatorFactory; -import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; @@ -46,12 +45,10 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.transformations.LegacySinkTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; - -import java.lang.reflect.InvocationTargetException; +import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory; /** Translator used to build {@link DataSink} for given {@link DataStream}. */ @Internal @@ -178,8 +175,10 @@ private void addCommittingTopology( boolean isBatchMode, OperatorID schemaOperatorID, OperatorUidGenerator operatorUidGenerator) { + TwoPhaseCommittingSink committingSink = + (TwoPhaseCommittingSink) sink; TypeInformation> typeInformation = - CommittableMessageTypeInfo.of(() -> getCommittableSerializer(sink)); + CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer); DataStream> written = inputStream .transform( @@ -202,8 +201,8 @@ private void addCommittingTopology( .transform( SINK_COMMITTER_PREFIX + sinkName, typeInformation, - getCommitterOperatorFactory( - sink, isBatchMode, isCheckpointingEnabled)) + new CommitterOperatorFactory<>( + committingSink, isBatchMode, isCheckpointingEnabled)) .uid(operatorUidGenerator.generateUid("sink-committer")); if (sink instanceof WithPostCommitTopology) { @@ -215,37 +214,4 @@ private String generateSinkName(SinkDef sinkDef) { return sinkDef.getName() .orElse(String.format("Flink CDC Event Sink: %s", sinkDef.getType())); } - - private static SimpleVersionedSerializer getCommittableSerializer(Object sink) { - // FIX ME: TwoPhaseCommittingSink has been deprecated, and its signature has changed - // during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. - try { - return (SimpleVersionedSerializer) - sink.getClass().getDeclaredMethod("getCommittableSerializer").invoke(sink); - } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException("Failed to get CommittableSerializer", e); - } - } - - private static - OneInputStreamOperatorFactory, CommittableMessage> - getCommitterOperatorFactory( - Sink sink, boolean isBatchMode, boolean isCheckpointingEnabled) { - // FIX ME: OneInputStreamOperatorFactory is an @Internal class, and its signature has - // changed during Flink 1.18 to 1.19. Remove this when Flink 1.18 is no longer supported. - try { - return (OneInputStreamOperatorFactory< - CommittableMessage, CommittableMessage>) - Class.forName( - "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory") - .getDeclaredConstructors()[0] - .newInstance(sink, isBatchMode, isCheckpointingEnabled); - - } catch (ClassNotFoundException - | InstantiationException - | IllegalAccessException - | InvocationTargetException e) { - throw new RuntimeException("Failed to create CommitterOperatorFactory", e); - } - } }