diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java index c0856529472..3bdbc924d90 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java @@ -107,7 +107,7 @@ public DataSink createDataSink(Context context) { } } String jobIdPrefix = - context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX); + context.getFactoryConfiguration().get(IcebergDataSinkOptions.JOB_ID_PREFIX); return new IcebergDataSink( catalogOptions, @@ -147,7 +147,7 @@ public Set> optionalOptions() { options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED); options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL); options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM); - options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX); + options.add(IcebergDataSinkOptions.JOB_ID_PREFIX); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java index f989909fa7d..57bdd887bfe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java @@ -80,8 +80,8 @@ public class IcebergDataSinkOptions { "The parallelism for file compaction, default value is -1, which means that compaction parallelism is equal to sink writer parallelism."); @Experimental - public static final ConfigOption SINK_JOB_ID_PREFIX = - key("sink.job.id.prefix") + public static final ConfigOption JOB_ID_PREFIX = + key("job.id.prefix") .stringType() .defaultValue("cdc") .withDescription( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java index 848fd258499..10f89e16d08 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java @@ -57,7 +57,7 @@ void testCreateDataSink() { Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse"); conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4); - conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "FlinkCDC"); + conf.set(IcebergDataSinkOptions.JOB_ID_PREFIX, "FlinkCDC"); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext(