From 12fbeb44fd4731fdd52f4b4f335368c159150412 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Wed, 11 Feb 2026 17:28:05 +0800 Subject: [PATCH 1/2] [minor][hotfix] Update configOption name from `sink.job.id.prefix` to `job.id.prefix`. --- .../cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java | 4 ++-- .../cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java | 4 ++-- .../connectors/iceberg/sink/IcebergDataSinkFactoryTest.java | 2 +- .../src/test/resources/log4j2-test.properties | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java index c0856529472..3bdbc924d90 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java @@ -107,7 +107,7 @@ public DataSink createDataSink(Context context) { } } String jobIdPrefix = - context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX); + context.getFactoryConfiguration().get(IcebergDataSinkOptions.JOB_ID_PREFIX); return new IcebergDataSink( catalogOptions, @@ -147,7 +147,7 @@ public Set> optionalOptions() { options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED); options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL); options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM); - options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX); + options.add(IcebergDataSinkOptions.JOB_ID_PREFIX); return options; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java index f989909fa7d..57bdd887bfe 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java @@ -80,8 +80,8 @@ public class IcebergDataSinkOptions { "The parallelism for file compaction, default value is -1, which means that compaction parallelism is equal to sink writer parallelism."); @Experimental - public static final ConfigOption SINK_JOB_ID_PREFIX = - key("sink.job.id.prefix") + public static final ConfigOption JOB_ID_PREFIX = + key("job.id.prefix") .stringType() .defaultValue("cdc") .withDescription( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java index 848fd258499..10f89e16d08 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java @@ -57,7 +57,7 @@ void testCreateDataSink() { Configuration conf = Configuration.fromMap(ImmutableMap.builder().build()); conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse"); conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4); - conf.set(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX, "FlinkCDC"); + conf.set(IcebergDataSinkOptions.JOB_ID_PREFIX, "FlinkCDC"); DataSink dataSink = sinkFactory.createDataSink( new FactoryHelper.DefaultContext( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties index 5fe3b384b14..92fb1f749f2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties @@ -16,7 +16,7 @@ # Set root logger level to ERROR to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level=ERROR +rootLogger.level=INFO rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger From 3ad8e886db2c7e769b2eba415d12e1f6174e79f0 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Wed, 11 Feb 2026 17:39:15 +0800 Subject: [PATCH 2/2] Address comment. --- .../src/test/resources/log4j2-test.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties index 92fb1f749f2..5fe3b384b14 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/resources/log4j2-test.properties @@ -16,7 +16,7 @@ # Set root logger level to ERROR to not flood build logs # set manually to INFO for debugging purposes -rootLogger.level=INFO +rootLogger.level=ERROR rootLogger.appenderRef.test.ref = TestLogger appender.testlogger.name = TestLogger