[FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass Hadoop configuration properties (#4351)
Conversation
Pull request overview
Adds a new hadoop.conf.* option prefix for the Iceberg pipeline sink connector so users can pass arbitrary Hadoop Configuration properties through the pipeline job config, and wires those options into Iceberg catalog/table operations.
Changes:
- Parse and allow `hadoop.conf.*` options in `IcebergDataSinkFactory` (stripping the prefix) and propagate them through the sink components.
- Apply the propagated options when creating the Hadoop `Configuration` used to build Iceberg catalogs (writer/committer/metadata applier/compaction).
- Update tests and docs to reflect the new configuration capability.
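The prefix-stripping step described above can be sketched roughly as follows. The `PREFIX_HADOOP_CONF` constant mirrors the one introduced in `IcebergDataSinkOptions`; the helper method and class here are a hypothetical illustration, not the PR's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class HadoopConfPrefixDemo {
    // Mirrors the PREFIX_HADOOP_CONF constant introduced in IcebergDataSinkOptions.
    static final String PREFIX_HADOOP_CONF = "hadoop.conf.";

    // Hypothetical sketch: collect all "hadoop.conf.*" options and strip the
    // prefix so the remainder can be applied to a Hadoop Configuration as-is.
    static Map<String, String> extractHadoopConfOptions(Map<String, String> allOptions) {
        Map<String, String> hadoopConfOptions = new HashMap<>();
        for (Map.Entry<String, String> entry : allOptions.entrySet()) {
            if (entry.getKey().startsWith(PREFIX_HADOOP_CONF)) {
                hadoopConfOptions.put(
                        entry.getKey().substring(PREFIX_HADOOP_CONF.length()),
                        entry.getValue());
            }
        }
        return hadoopConfOptions;
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("catalog.properties.warehouse", "/tmp/warehouse");
        options.put("hadoop.conf.fs.defaultFS", "hdfs://namenode:8020");

        Map<String, String> stripped = extractHadoopConfOptions(options);
        // Only the prefixed key survives, with the prefix removed.
        System.out.println(stripped); // prints {fs.defaultFS=hdfs://namenode:8020}
    }
}
```

Any key that survives the filter can then be handed to `org.apache.hadoop.conf.Configuration#set` unchanged.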
Reviewed changes
Copilot reviewed 15 out of 15 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java | Introduces PREFIX_HADOOP_CONF constant for new option namespace. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java | Allows hadoop.conf.* during validation and extracts stripped Hadoop conf options. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java | Stores Hadoop conf options and passes them to sink + metadata applier. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java | Builds Iceberg catalog with Hadoop Configuration created from passed Hadoop conf options. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/HadoopConfUtils.java | New helper to build/apply Hadoop Configuration from option maps. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java | Propagates Hadoop conf options into writer/committer/compaction operator construction. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java | Uses Hadoop conf options when building the Iceberg catalog. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java | Uses Hadoop conf options when building the Iceberg catalog. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/compaction/CompactionOperator.java | Uses Hadoop conf options when lazily building the Iceberg catalog for compaction. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java | Adds a test that hadoop.conf.* options are accepted by factory validation. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java | Updates writer/committer construction to pass Hadoop conf options. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSinkITCase.java | Updates sink construction to pass Hadoop conf options. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java | Updates writer/committer/operator construction to pass Hadoop conf options. |
| docs/content/docs/connectors/pipeline-connectors/iceberg.md | Documents the new hadoop.conf.* option prefix. |
| docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md | Chinese documentation for the new hadoop.conf.* option prefix. |
```java
sinkFactory.createDataSink(
        new FactoryHelper.DefaultContext(
                conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
}
```
This test only asserts that the factory accepts hadoop.conf.* options, but it doesn't verify that the prefix is stripped and the resulting Hadoop configuration options are actually carried into the created sink (and later used when building the Iceberg catalog). Consider asserting the produced hadoopConfOptions contents (e.g., via reflection or a package-private accessor) to ensure the new feature works end-to-end.
I agree with this comment.
Here you've only proven that the `DataSink` can be created; you haven't demonstrated that the Hadoop conf actually takes effect. Please verify in some way that the Hadoop conf is applied as intended.
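One way to strengthen the test, as suggested above, is to read the sink's internal options via reflection. The sketch below uses a stand-in class because the real `IcebergDataSink` field layout is not shown in this conversation; the field name `hadoopConfOptions` is taken from the review comment and should be treated as an assumption:

```java
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Map;

public class ReflectionAssertDemo {
    // Stand-in for IcebergDataSink: holds the stripped Hadoop conf options privately.
    static class FakeDataSink {
        private final Map<String, String> hadoopConfOptions;

        FakeDataSink(Map<String, String> hadoopConfOptions) {
            this.hadoopConfOptions = hadoopConfOptions;
        }
    }

    // Hypothetical test helper: read the private field via reflection so a
    // factory test can assert the prefix-stripped options reached the sink.
    @SuppressWarnings("unchecked")
    static Map<String, String> readHadoopConfOptions(Object sink) throws Exception {
        Field field = sink.getClass().getDeclaredField("hadoopConfOptions");
        field.setAccessible(true);
        return (Map<String, String>) field.get(sink);
    }

    public static void main(String[] args) throws Exception {
        FakeDataSink sink =
                new FakeDataSink(Collections.singletonMap("fs.defaultFS", "hdfs://nn:8020"));
        Map<String, String> carried = readHadoopConfOptions(sink);
        System.out.println(carried.get("fs.defaultFS")); // prints hdfs://nn:8020
    }
}
```

A package-private accessor on the real sink would achieve the same end without reflection and is usually the cleaner choice.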
The test failure in PaimonSinkITCase.testDuplicateCommitAfterRestore is a pre-existing flaky test unrelated to this PR. It fails because the COMPACT snapshot is generated asynchronously (since waitCompaction=false for …
This PR adds support for a hadoop.conf.* configuration prefix in the Iceberg pipeline sink connector, allowing users to pass arbitrary Hadoop configuration properties directly through the pipeline job definition.
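For illustration, a pipeline job definition using the new prefix might look like the following sketch. The overall `sink:` block shape follows the Flink CDC pipeline YAML convention, but the specific Hadoop keys and values here are hypothetical examples, not taken from the PR:

```yaml
sink:
  type: iceberg
  catalog.properties.type: hadoop
  catalog.properties.warehouse: hdfs://namenode:8020/warehouse
  # Keys under hadoop.conf.* are stripped of the prefix and applied to the
  # Hadoop Configuration used when building the Iceberg catalog, e.g. these
  # become fs.defaultFS and dfs.client.use.datanode.hostname.
  hadoop.conf.fs.defaultFS: hdfs://namenode:8020
  hadoop.conf.dfs.client.use.datanode.hostname: "true"
```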