diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
index 8cada084e2b..74b8ec1c724 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline 连接器选项
String | 透传 Iceberg 表选项到管道,详见 Iceberg 表配置。 |
+
+ | hadoop.conf.* | optional | (none) | String | 传递 Hadoop Configuration 参数(用于 Iceberg 的 catalog/table 相关操作)。前缀 hadoop.conf. 会被剥离。例如 hadoop.conf.fs.s3a.endpoint。 |
+
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 2063e572312..cc7dded49f6 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -241,6 +241,13 @@ Pipeline Connector Options
String | Pass Iceberg table options to the pipeline. See Iceberg table options. |
+
+ | hadoop.conf.* | optional | (none) | String | Pass Hadoop Configuration options used by Iceberg catalog/table operations. The prefix hadoop.conf. will be stripped. For example, hadoop.conf.fs.s3a.endpoint. |
+
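For context on the documented mapping, here is a small standalone sketch (not part of the patch) of what the new option does: sink options carrying the hadoop.conf. prefix end up as Hadoop Configuration entries with the prefix removed, while other sink options are untouched. The fs.s3a.* keys and values below are made-up examples.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: mirrors the documented prefix-stripping behavior of hadoop.conf.* options.
public class HadoopConfOptionExample {
    private static final String PREFIX = "hadoop.conf.";

    public static void main(String[] args) {
        Map<String, String> sinkOptions = new HashMap<>();
        sinkOptions.put("hadoop.conf.fs.s3a.endpoint", "http://localhost:9000");
        sinkOptions.put("hadoop.conf.fs.s3a.path.style.access", "true");
        sinkOptions.put("catalog.properties.type", "hadoop"); // no hadoop.conf. prefix, ignored here

        Map<String, String> hadoopConf = new HashMap<>();
        sinkOptions.forEach(
                (key, value) -> {
                    if (key.startsWith(PREFIX)) {
                        hadoopConf.put(key.substring(PREFIX.length()), value);
                    }
                });

        // Prints fs.s3a.endpoint=http://localhost:9000 and fs.s3a.path.style.access=true
        hadoopConf.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```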
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
index 96c2b5f7665..ee6c9bd0515 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java
@@ -39,6 +39,9 @@ public class IcebergDataSink implements DataSink, Serializable {
// options for creating Iceberg table.
private final Map<String, String> tableOptions;
+ // options for Hadoop configuration.
+ private final Map<String, String> hadoopConfOptions;
+
private final Map<TableId, List<String>> partitionMaps;
private final ZoneId zoneId;
@@ -56,7 +59,8 @@ public IcebergDataSink(
ZoneId zoneId,
String schemaOperatorUid,
CompactionOptions compactionOptions,
- String jobIdPrefix) {
+ String jobIdPrefix,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
@@ -64,18 +68,29 @@ public IcebergDataSink(
this.schemaOperatorUid = schemaOperatorUid;
this.compactionOptions = compactionOptions;
this.jobIdPrefix = jobIdPrefix;
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
public EventSinkProvider getEventSinkProvider() {
IcebergSink icebergEventSink =
new IcebergSink(
- catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
+ catalogOptions,
+ tableOptions,
+ zoneId,
+ compactionOptions,
+ jobIdPrefix,
+ hadoopConfOptions);
return FlinkSinkProvider.of(icebergEventSink);
}
@Override
public MetadataApplier getMetadataApplier() {
- return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
+ return new IcebergMetadataApplier(
+ catalogOptions, tableOptions, partitionMaps, hadoopConfOptions);
+ }
+
+ public Map<String, String> getHadoopConfOptions() {
+ return hadoopConfOptions;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 13336ff21f7..9c902552398 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -41,6 +41,7 @@
import java.util.Set;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
+import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_HADOOP_CONF;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
/** A {@link DataSinkFactory} for Apache Iceberg. */
@@ -55,13 +56,15 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
- .validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
+ .validateExcept(
+ PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES, PREFIX_HADOOP_CONF);
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
OptionUtils.printOptions(IDENTIFIER, allOptions);
Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
+ Map<String, String> hadoopConfOptions = extractHadoopConfOptions(allOptions);
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
@@ -116,7 +119,19 @@ public DataSink createDataSink(Context context) {
zoneId,
schemaOperatorUid,
compactionOptions,
- jobIdPrefix);
+ jobIdPrefix,
+ hadoopConfOptions);
+ }
+
+ static Map<String, String> extractHadoopConfOptions(Map<String, String> allOptions) {
+ Map<String, String> hadoopConfOptions = new HashMap<>();
+ allOptions.forEach(
+ (key, value) -> {
+ if (key.startsWith(PREFIX_HADOOP_CONF)) {
+ hadoopConfOptions.put(key.substring(PREFIX_HADOOP_CONF.length()), value);
+ }
+ });
+ return hadoopConfOptions;
}
private CompactionOptions getCompactionStrategy(Configuration configuration) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index 0aa59677c71..5fcf3f23b76 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -31,6 +31,9 @@ public class IcebergDataSinkOptions {
// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
+ // prefix for passing properties for Hadoop configuration.
+ public static final String PREFIX_HADOOP_CONF = "hadoop.conf.";
+
public static final ConfigOption<String> TYPE =
key("catalog.properties.type")
.stringType()
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index c71358104c9..01cb4b49050 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -32,6 +32,7 @@
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -92,19 +93,30 @@ public class IcebergMetadataApplier implements MetadataApplier {
private final Map<TableId, List<String>> partitionMaps;
+ private final Map<String, String> hadoopConfOptions;
+
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
- this(catalogOptions, new HashMap<>(), new HashMap<>());
+ this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
}
public IcebergMetadataApplier(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
Map<TableId, List<String>> partitionMaps) {
+ this(catalogOptions, tableOptions, partitionMaps, null);
+ }
+
+ public IcebergMetadataApplier(
+ Map<String, String> catalogOptions,
+ Map<String, String> tableOptions,
+ Map<TableId, List<String>> partitionMaps,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
+ this.hadoopConfOptions = hadoopConfOptions;
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}
@@ -112,9 +124,10 @@ public IcebergMetadataApplier(
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
throws SchemaEvolveException {
if (catalog == null) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
new file mode 100644
index 00000000000..39eb50b8338
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.iceberg.sink.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Map;
+
+/** Utilities for building Hadoop {@link Configuration} from connector options. */
+public final class HadoopConfUtils {
+
+ private HadoopConfUtils() {}
+
+ public static Configuration createConfiguration(Map<String, String> hadoopConfOptions) {
+ Configuration configuration = new Configuration();
+ applyTo(configuration, hadoopConfOptions);
+ return configuration;
+ }
+
+ public static void applyTo(Configuration configuration, Map<String, String> hadoopConfOptions) {
+ if (configuration == null || hadoopConfOptions == null || hadoopConfOptions.isEmpty()) {
+ return;
+ }
+ hadoopConfOptions.forEach(
+ (k, v) -> {
+ if (k != null && v != null) {
+ configuration.set(k, v);
+ }
+ });
+ }
+}
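A brief usage sketch of the new utility follows (illustration only, not part of the patch). It assumes hadoop-common and this connector module are on the classpath; the fs.s3a.* entries are placeholder values.

```java
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;

import org.apache.hadoop.conf.Configuration;

import java.util.HashMap;
import java.util.Map;

// Illustration of HadoopConfUtils: build a Hadoop Configuration from already-stripped keys,
// then layer the same options onto an existing Configuration via applyTo.
public class HadoopConfUtilsExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("fs.s3a.endpoint", "http://localhost:9000");
        options.put("fs.s3a.path.style.access", "true");

        // createConfiguration starts from defaults and applies every non-null entry.
        Configuration conf = HadoopConfUtils.createConfiguration(options);
        System.out.println(conf.get("fs.s3a.endpoint")); // http://localhost:9000

        // applyTo is a no-op for null/empty maps and otherwise mutates the given Configuration.
        Configuration existing = new Configuration(false);
        HadoopConfUtils.applyTo(existing, options);
        System.out.println(existing.get("fs.s3a.path.style.access")); // true
    }
}
```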
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
index 6cadc3d9171..75977744065 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java
@@ -19,6 +19,7 @@
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
@@ -67,15 +68,19 @@ public class IcebergCommitter implements Committer {
private final Map tableIdMetricMap;
- public IcebergCommitter(Map<String, String> catalogOptions) {
- this(catalogOptions, null);
+ public IcebergCommitter(
+ Map<String, String> catalogOptions, Map<String, String> hadoopConfOptions) {
+ this(catalogOptions, null, hadoopConfOptions);
}
public IcebergCommitter(
- Map<String, String> catalogOptions, SinkCommitterMetricGroup metricGroup) {
+ Map<String, String> catalogOptions,
+ SinkCommitterMetricGroup metricGroup,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
this.catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
this.metricGroup = metricGroup;
this.tableIdMetricMap = new HashMap<>();
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 23bb4bf873f..b392bd9ad16 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -58,6 +58,7 @@ public class IcebergSink
protected final Map<String, String> catalogOptions;
protected final Map<String, String> tableOptions;
+ protected final Map<String, String> hadoopConfOptions;
private final ZoneId zoneId;
@@ -72,13 +73,15 @@ public IcebergSink(
Map<String, String> tableOptions,
ZoneId zoneId,
CompactionOptions compactionOptions,
- String jobIdPrefix) {
+ String jobIdPrefix,
+ Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.zoneId = zoneId;
this.compactionOptions = compactionOptions;
this.jobId = jobIdPrefix + UUID.randomUUID();
this.operatorId = UUID.randomUUID().toString();
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
@@ -87,14 +90,14 @@ public DataStream addPreWriteTopology(DataStream dataStream) {
}
public Committer<WriteResultWrapper> createCommitter() {
- return new IcebergCommitter(catalogOptions);
+ return new IcebergCommitter(catalogOptions, hadoopConfOptions);
}
@Override
public Committer<WriteResultWrapper> createCommitter(
CommitterInitContext committerInitContext) {
SinkCommitterMetricGroup metricGroup = committerInitContext.metricGroup();
- return new IcebergCommitter(catalogOptions, metricGroup);
+ return new IcebergCommitter(catalogOptions, metricGroup, hadoopConfOptions);
}
@Override
@@ -115,7 +118,8 @@ public SinkWriter createWriter(InitContext context) {
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -130,7 +134,8 @@ public SinkWriter createWriter(WriterInitContext context) {
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -153,7 +158,8 @@ public StatefulSinkWriter restoreWriter(
zoneId,
lastCheckpointId,
jobId,
- operatorId);
+ operatorId,
+ hadoopConfOptions);
}
@Override
@@ -208,7 +214,8 @@ public void addPostCommitTopology(
.transform(
"Compaction",
typeInformation,
- new CompactionOperator(catalogOptions, compactionOptions))
+ new CompactionOperator(
+ catalogOptions, compactionOptions, hadoopConfOptions))
.setParallelism(parallelism);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
index 62e47d897d9..cbcd3b98eaa 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java
@@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
@@ -92,10 +93,12 @@ public IcebergWriter(
ZoneId zoneId,
long lastCheckpointId,
String jobId,
- String operatorId) {
+ String operatorId,
+ Map<String, String> hadoopConfOptions) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
writerFactoryMap = new HashMap<>();
writerMap = new HashMap<>();
schemaMap = new HashMap<>();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
index adfdba629f6..6ee9c08568f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction;
import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.v2.WriteResultWrapper;
import org.apache.flink.cdc.runtime.operators.AbstractStreamOperatorAdapter;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -62,16 +63,21 @@ public class CompactionOperator
private final CompactionOptions compactionOptions;
+ private final Map<String, String> hadoopConfOptions;
+
private volatile Throwable throwable;
private ExecutorService compactExecutor;
public CompactionOperator(
- Map<String, String> catalogOptions, CompactionOptions compactionOptions) {
+ Map<String, String> catalogOptions,
+ CompactionOptions compactionOptions,
+ Map<String, String> hadoopConfOptions) {
this.tableCommitTimes = new HashMap<>();
this.compactedTables = new HashSet<>();
this.catalogOptions = catalogOptions;
this.compactionOptions = compactionOptions;
+ this.hadoopConfOptions = hadoopConfOptions;
}
@Override
@@ -111,9 +117,10 @@ public void processElement(StreamRecord>
private void compact(TableId tableId) {
if (catalog == null) {
+ Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
- this.getClass().getSimpleName(), catalogOptions, new Configuration());
+ this.getClass().getSimpleName(), catalogOptions, configuration);
}
try {
RewriteDataFilesActionResult rewriteResult =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 0ada4ffa00c..0e3fc00ed6e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -93,6 +93,42 @@ void testUnsupportedOption() {
+ "unsupported_key");
}
+ @Test
+ void testHadoopConfOptionsAreAllowed() {
+ IcebergDataSinkFactory sinkFactory =
+ (IcebergDataSinkFactory)
+ FactoryDiscoveryUtils.getFactoryByIdentifier(
+ "iceberg", DataSinkFactory.class);
+ Assertions.assertThat(sinkFactory).isInstanceOf(IcebergDataSinkFactory.class);
+
+ Configuration conf =
+ Configuration.fromMap(
+ ImmutableMap.<String, String>builder()
+ .put(
+ "hadoop.conf.hive.metastore.kerberos.keytab.file",
+ "/etc/security/keytabs/hive.service.keytab")
+ .put(
+ "hadoop.conf.hive.metastore.kerberos.principal",
+ "hive/_HOST@EXAMPLE.COM")
+ .put("hadoop.conf.hive.metastore.sasl.enabled", "true")
+ .build());
+
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+ new FactoryHelper.DefaultContext(
+ conf, conf, Thread.currentThread().getContextClassLoader()));
+
+ Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
+ Map<String, String> hadoopConfOptions = ((IcebergDataSink) dataSink).getHadoopConfOptions();
+ Assertions.assertThat(hadoopConfOptions)
+ .containsEntry(
+ "hive.metastore.kerberos.keytab.file",
+ "/etc/security/keytabs/hive.service.keytab")
+ .containsEntry("hive.metastore.kerberos.principal", "hive/_HOST@EXAMPLE.COM")
+ .containsEntry("hive.metastore.sasl.enabled", "true")
+ .doesNotContainKey("hadoop.conf.hive.metastore.kerberos.keytab.file");
+ }
+
@Test
void testPrefixRequireOption() {
DataSinkFactory sinkFactory =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index 6d0fce3ebba..2a2a84b3ce6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -80,7 +80,8 @@ public void testCompationOperator() throws IOException, InterruptedException {
ZoneId.systemDefault(),
checkpointId,
jobId,
- operatorId);
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -134,34 +135,37 @@ public void testCompationOperator() throws IOException, InterruptedException {
BinaryRecordDataGenerator binaryRecordDataGenerator =
new BinaryRecordDataGenerator(
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
- IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
- // Commit many times.
int smallFileCount = 100;
- for (long i = 0; i < smallFileCount; i++) {
- RecordData recordData =
- binaryRecordDataGenerator.generate(
- new Object[] {
- i,
- BinaryStringData.fromString("Mark"),
- 10,
- BinaryStringData.fromString("test"),
- true,
- 1.0f,
- 1.0d,
- DecimalData.fromBigDecimal(new BigDecimal(1.0), 10, 2),
- DateData.fromEpochDay(9)
- });
- icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordData), null);
- Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
- icebergWriter.prepareCommit().stream()
- .map(IcebergWriterTest.MockCommitRequestImpl::new)
- .collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ // Commit many times.
+ for (long i = 0; i < smallFileCount; i++) {
+ RecordData recordData =
+ binaryRecordDataGenerator.generate(
+ new Object[] {
+ i,
+ BinaryStringData.fromString("Mark"),
+ 10,
+ BinaryStringData.fromString("test"),
+ true,
+ 1.0f,
+ 1.0d,
+ DecimalData.fromBigDecimal(new BigDecimal(1.0), 10, 2),
+ DateData.fromEpochDay(9)
+ });
+ icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordData), null);
+ Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+ icebergWriter.prepareCommit().stream()
+ .map(IcebergWriterTest.MockCommitRequestImpl::new)
+ .collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+ }
}
CompactionOperator compactionOperator =
new CompactionOperator(
catalogOptions,
- CompactionOptions.builder().commitInterval(1).parallelism(4).build());
+ CompactionOptions.builder().commitInterval(1).parallelism(4).build(),
+ new HashMap<>());
compactionOperator.processElement(
new StreamRecord<>(
new CommittableWithLineage<>(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
index 2bd3b228a2a..c9bf112d28d 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java
@@ -105,7 +105,8 @@ public void testIcebergSink() throws Exception {
null,
null,
CompactionOptions.builder().build(),
- "FlinkCDC");
+ "FlinkCDC",
+ new HashMap<>());
String[] expected = new String[] {"21, 1.732, Disenchanted", "17, 6.28, Doris Day"};
stream.sinkTo(icebergSink);
env.execute("Values to Iceberg Sink");
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index b16b889319a..745aea81f96 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -93,7 +93,14 @@ public void testWriteWithSchemaEvolution() throws Exception {
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
new IcebergWriter(
- catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -179,10 +186,12 @@ public void testWriteWithSchemaEvolution() throws Exception {
DataChangeEvent dataChangeEvent2 = DataChangeEvent.insertEvent(tableId, recordData2);
icebergWriter.write(dataChangeEvent2, null);
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -258,7 +267,10 @@ public void testWriteWithSchemaEvolution() throws Exception {
writeResults = icebergWriter.prepareCommit();
collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -284,7 +296,15 @@ public void testWriteWithAllSupportedTypes() throws Exception {
String jobId = UUID.randomUUID().toString();
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ pipelineZoneId,
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
@@ -349,10 +369,12 @@ public void testWriteWithAllSupportedTypes() throws Exception {
DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1);
icebergWriter.write(dataChangeEvent, null);
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
@@ -393,7 +415,14 @@ private void runTestPartitionWrite(String partitionKey, Expression expression)
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
new IcebergWriter(
- catalogOptions, 1, 1, ZoneId.systemDefault(), 0, jobId, operatorId);
+ catalogOptions,
+ 1,
+ 1,
+ ZoneId.systemDefault(),
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
TableId tableId = TableId.parse("test.iceberg_table");
Map<TableId, List<String>> partitionMaps = new HashMap<>();
@@ -440,10 +469,12 @@ private void runTestPartitionWrite(String partitionKey, Expression expression)
DataChangeEvent dataChangeEvent2 = DataChangeEvent.insertEvent(tableId, recordData2);
icebergWriter.write(dataChangeEvent2, null);
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
Table table =
catalog.loadTable(
@@ -482,7 +513,15 @@ public void testWithRepeatCommit() throws Exception {
String jobId = UUID.randomUUID().toString();
String operatorId = UUID.randomUUID().toString();
IcebergWriter icebergWriter =
- new IcebergWriter(catalogOptions, 1, 1, pipelineZoneId, 0, jobId, operatorId);
+ new IcebergWriter(
+ catalogOptions,
+ 1,
+ 1,
+ pipelineZoneId,
+ 0,
+ jobId,
+ operatorId,
+ new HashMap<>());
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
TableId tableId = TableId.parse("test.iceberg_table");
TableIdentifier tableIdentifier =
@@ -514,10 +553,12 @@ public void testWithRepeatCommit() throws Exception {
DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, record1);
icebergWriter.write(dataChangeEvent, null);
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
- IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ }
List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result.size()).isEqualTo(1);
Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1");
@@ -538,8 +579,11 @@ public void testWithRepeatCommit() throws Exception {
writeResults = icebergWriter.prepareCommit();
collection =
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
- icebergCommitter.commit(collection);
- icebergCommitter.commit(collection);
+ try (IcebergCommitter icebergCommitter =
+ new IcebergCommitter(catalogOptions, new HashMap<>())) {
+ icebergCommitter.commit(collection);
+ icebergCommitter.commit(collection);
+ }
summary = catalog.loadTable(tableIdentifier).currentSnapshot().summary();
Assertions.assertThat(summary.get("total-data-files")).isEqualTo("2");
Assertions.assertThat(summary.get("added-records")).isEqualTo("1");