Skip to content

Commit 1a1e81c

Browse files
authored
[FLINK-39342][Iceberg] Support hadoop.conf.* prefix to pass Hadoop configuration properties (apache#4351)
1 parent 23e23d6 commit 1a1e81c

15 files changed

Lines changed: 275 additions & 62 deletions

File tree

docs/content.zh/docs/connectors/pipeline-connectors/iceberg.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ Pipeline 连接器选项
241241
<td>String</td>
242242
<td>透传 Iceberg 表选项到管道,详见 <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg 表配置</a>。</td>
243243
</tr>
244+
<tr>
245+
<td>hadoop.conf.*</td>
246+
<td>optional</td>
247+
<td style="word-wrap: break-word;">(none)</td>
248+
<td>String</td>
249+
<td>传递 Hadoop <code>Configuration</code> 参数(用于 Iceberg 的 catalog/table 相关操作)。前缀 <code>hadoop.conf.</code> 会被剥离,例如 <code>hadoop.conf.fs.s3a.endpoint</code> 将作为 <code>fs.s3a.endpoint</code> 传入 Hadoop 配置。</td>
250+
</tr>
244251
</tbody>
245252
</table>
246253
</div>

docs/content/docs/connectors/pipeline-connectors/iceberg.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,13 @@ Pipeline Connector Options
241241
<td>String</td>
242242
<td>Pass Iceberg table options to the pipeline. See <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg table options</a>.</td>
243243
</tr>
244+
<tr>
245+
<td>hadoop.conf.*</td>
246+
<td>optional</td>
247+
<td style="word-wrap: break-word;">(none)</td>
248+
<td>String</td>
249+
<td>Pass Hadoop <code>Configuration</code> options used by Iceberg catalog/table operations. The prefix <code>hadoop.conf.</code> is stripped before the option is applied. For example, <code>hadoop.conf.fs.s3a.endpoint</code> is set on the Hadoop configuration as <code>fs.s3a.endpoint</code>.</td>
250+
</tr>
244251
</tbody>
245252
</table>
246253
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public class IcebergDataSink implements DataSink, Serializable {
3939
// options for creating Iceberg table.
4040
private final Map<String, String> tableOptions;
4141

42+
// options for Hadoop configuration.
43+
private final Map<String, String> hadoopConfOptions;
44+
4245
private final Map<TableId, List<String>> partitionMaps;
4346

4447
private final ZoneId zoneId;
@@ -56,26 +59,38 @@ public IcebergDataSink(
5659
ZoneId zoneId,
5760
String schemaOperatorUid,
5861
CompactionOptions compactionOptions,
59-
String jobIdPrefix) {
62+
String jobIdPrefix,
63+
Map<String, String> hadoopConfOptions) {
6064
this.catalogOptions = catalogOptions;
6165
this.tableOptions = tableOptions;
6266
this.partitionMaps = partitionMaps;
6367
this.zoneId = zoneId;
6468
this.schemaOperatorUid = schemaOperatorUid;
6569
this.compactionOptions = compactionOptions;
6670
this.jobIdPrefix = jobIdPrefix;
71+
this.hadoopConfOptions = hadoopConfOptions;
6772
}
6873

6974
@Override
7075
public EventSinkProvider getEventSinkProvider() {
7176
IcebergSink icebergEventSink =
7277
new IcebergSink(
73-
catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
78+
catalogOptions,
79+
tableOptions,
80+
zoneId,
81+
compactionOptions,
82+
jobIdPrefix,
83+
hadoopConfOptions);
7484
return FlinkSinkProvider.of(icebergEventSink);
7585
}
7686

7787
@Override
7888
public MetadataApplier getMetadataApplier() {
79-
return new IcebergMetadataApplier(catalogOptions, tableOptions, partitionMaps);
89+
return new IcebergMetadataApplier(
90+
catalogOptions, tableOptions, partitionMaps, hadoopConfOptions);
91+
}
92+
93+
public Map<String, String> getHadoopConfOptions() {
94+
return hadoopConfOptions;
8095
}
8196
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Set;
4242

4343
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
44+
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_HADOOP_CONF;
4445
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;
4546

4647
/** A {@link DataSinkFactory} for Apache Iceberg. */
@@ -55,13 +56,15 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
5556
@Override
5657
public DataSink createDataSink(Context context) {
5758
FactoryHelper.createFactoryHelper(this, context)
58-
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
59+
.validateExcept(
60+
PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES, PREFIX_HADOOP_CONF);
5961

6062
Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
6163
OptionUtils.printOptions(IDENTIFIER, allOptions);
6264

6365
Map<String, String> catalogOptions = new HashMap<>();
6466
Map<String, String> tableOptions = new HashMap<>();
67+
Map<String, String> hadoopConfOptions = extractHadoopConfOptions(allOptions);
6568
allOptions.forEach(
6669
(key, value) -> {
6770
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
@@ -116,7 +119,19 @@ public DataSink createDataSink(Context context) {
116119
zoneId,
117120
schemaOperatorUid,
118121
compactionOptions,
119-
jobIdPrefix);
122+
jobIdPrefix,
123+
hadoopConfOptions);
124+
}
125+
126+
static Map<String, String> extractHadoopConfOptions(Map<String, String> allOptions) {
127+
Map<String, String> hadoopConfOptions = new HashMap<>();
128+
allOptions.forEach(
129+
(key, value) -> {
130+
if (key.startsWith(PREFIX_HADOOP_CONF)) {
131+
hadoopConfOptions.put(key.substring(PREFIX_HADOOP_CONF.length()), value);
132+
}
133+
});
134+
return hadoopConfOptions;
120135
}
121136

122137
private CompactionOptions getCompactionStrategy(Configuration configuration) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public class IcebergDataSinkOptions {
3131
// prefix for passing properties for catalog creation.
3232
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";
3333

34+
// prefix for passing properties for Hadoop configuration.
35+
public static final String PREFIX_HADOOP_CONF = "hadoop.conf.";
36+
3437
public static final ConfigOption<String> TYPE =
3538
key("catalog.properties.type")
3639
.stringType()

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.flink.cdc.common.schema.PhysicalColumn;
3333
import org.apache.flink.cdc.common.sink.MetadataApplier;
3434
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
35+
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
3536
import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;
3637

3738
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -92,29 +93,41 @@ public class IcebergMetadataApplier implements MetadataApplier {
9293

9394
private final Map<TableId, List<String>> partitionMaps;
9495

96+
private final Map<String, String> hadoopConfOptions;
97+
9598
private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
9699

97100
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
98-
this(catalogOptions, new HashMap<>(), new HashMap<>());
101+
this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
99102
}
100103

101104
public IcebergMetadataApplier(
102105
Map<String, String> catalogOptions,
103106
Map<String, String> tableOptions,
104107
Map<TableId, List<String>> partitionMaps) {
108+
this(catalogOptions, tableOptions, partitionMaps, null);
109+
}
110+
111+
public IcebergMetadataApplier(
112+
Map<String, String> catalogOptions,
113+
Map<String, String> tableOptions,
114+
Map<TableId, List<String>> partitionMaps,
115+
Map<String, String> hadoopConfOptions) {
105116
this.catalogOptions = catalogOptions;
106117
this.tableOptions = tableOptions;
107118
this.partitionMaps = partitionMaps;
119+
this.hadoopConfOptions = hadoopConfOptions;
108120
this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
109121
}
110122

111123
@Override
112124
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
113125
throws SchemaEvolveException {
114126
if (catalog == null) {
127+
Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
115128
catalog =
116129
CatalogUtil.buildIcebergCatalog(
117-
this.getClass().getSimpleName(), catalogOptions, new Configuration());
130+
this.getClass().getSimpleName(), catalogOptions, configuration);
118131
}
119132
SchemaChangeEventVisitor.visit(
120133
schemaChangeEvent,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.iceberg.sink.utils;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.util.Map;
23+
24+
/** Utilities for building Hadoop {@link Configuration} from connector options. */
25+
public final class HadoopConfUtils {
26+
27+
private HadoopConfUtils() {}
28+
29+
public static Configuration createConfiguration(Map<String, String> hadoopConfOptions) {
30+
Configuration configuration = new Configuration();
31+
applyTo(configuration, hadoopConfOptions);
32+
return configuration;
33+
}
34+
35+
public static void applyTo(Configuration configuration, Map<String, String> hadoopConfOptions) {
36+
if (configuration == null || hadoopConfOptions == null || hadoopConfOptions.isEmpty()) {
37+
return;
38+
}
39+
hadoopConfOptions.forEach(
40+
(k, v) -> {
41+
if (k != null && v != null) {
42+
configuration.set(k, v);
43+
}
44+
});
45+
}
46+
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.api.connector.sink2.Committer;
2121
import org.apache.flink.cdc.common.event.TableId;
22+
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
2223
import org.apache.flink.metrics.MetricGroup;
2324
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
2425

@@ -67,15 +68,19 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
6768

6869
private final Map<TableId, TableMetric> tableIdMetricMap;
6970

70-
public IcebergCommitter(Map<String, String> catalogOptions) {
71-
this(catalogOptions, null);
71+
public IcebergCommitter(
72+
Map<String, String> catalogOptions, Map<String, String> hadoopConfOptions) {
73+
this(catalogOptions, null, hadoopConfOptions);
7274
}
7375

7476
public IcebergCommitter(
75-
Map<String, String> catalogOptions, SinkCommitterMetricGroup metricGroup) {
77+
Map<String, String> catalogOptions,
78+
SinkCommitterMetricGroup metricGroup,
79+
Map<String, String> hadoopConfOptions) {
80+
Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
7681
this.catalog =
7782
CatalogUtil.buildIcebergCatalog(
78-
this.getClass().getSimpleName(), catalogOptions, new Configuration());
83+
this.getClass().getSimpleName(), catalogOptions, configuration);
7984
this.metricGroup = metricGroup;
8085
this.tableIdMetricMap = new HashMap<>();
8186
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ public class IcebergSink
5858

5959
protected final Map<String, String> catalogOptions;
6060
protected final Map<String, String> tableOptions;
61+
protected final Map<String, String> hadoopConfOptions;
6162

6263
private final ZoneId zoneId;
6364

@@ -72,13 +73,15 @@ public IcebergSink(
7273
Map<String, String> tableOptions,
7374
ZoneId zoneId,
7475
CompactionOptions compactionOptions,
75-
String jobIdPrefix) {
76+
String jobIdPrefix,
77+
Map<String, String> hadoopConfOptions) {
7678
this.catalogOptions = catalogOptions;
7779
this.tableOptions = tableOptions;
7880
this.zoneId = zoneId;
7981
this.compactionOptions = compactionOptions;
8082
this.jobId = jobIdPrefix + UUID.randomUUID();
8183
this.operatorId = UUID.randomUUID().toString();
84+
this.hadoopConfOptions = hadoopConfOptions;
8285
}
8386

8487
@Override
@@ -87,14 +90,14 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
8790
}
8891

8992
public Committer<WriteResultWrapper> createCommitter() {
90-
return new IcebergCommitter(catalogOptions);
93+
return new IcebergCommitter(catalogOptions, hadoopConfOptions);
9194
}
9295

9396
@Override
9497
public Committer<WriteResultWrapper> createCommitter(
9598
CommitterInitContext committerInitContext) {
9699
SinkCommitterMetricGroup metricGroup = committerInitContext.metricGroup();
97-
return new IcebergCommitter(catalogOptions, metricGroup);
100+
return new IcebergCommitter(catalogOptions, metricGroup, hadoopConfOptions);
98101
}
99102

100103
@Override
@@ -115,7 +118,8 @@ public SinkWriter<Event> createWriter(InitContext context) {
115118
zoneId,
116119
lastCheckpointId,
117120
jobId,
118-
operatorId);
121+
operatorId,
122+
hadoopConfOptions);
119123
}
120124

121125
@Override
@@ -130,7 +134,8 @@ public SinkWriter<Event> createWriter(WriterInitContext context) {
130134
zoneId,
131135
lastCheckpointId,
132136
jobId,
133-
operatorId);
137+
operatorId,
138+
hadoopConfOptions);
134139
}
135140

136141
@Override
@@ -153,7 +158,8 @@ public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
153158
zoneId,
154159
lastCheckpointId,
155160
jobId,
156-
operatorId);
161+
operatorId,
162+
hadoopConfOptions);
157163
}
158164

159165
@Override
@@ -208,7 +214,8 @@ public void addPostCommitTopology(
208214
.transform(
209215
"Compaction",
210216
typeInformation,
211-
new CompactionOperator(catalogOptions, compactionOptions))
217+
new CompactionOperator(
218+
catalogOptions, compactionOptions, hadoopConfOptions))
212219
.setParallelism(parallelism);
213220
}
214221
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.cdc.common.event.TableId;
2828
import org.apache.flink.cdc.common.schema.Schema;
2929
import org.apache.flink.cdc.common.utils.SchemaUtils;
30+
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
3031
import org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils;
3132
import org.apache.flink.table.data.RowData;
3233
import org.apache.flink.table.types.logical.RowType;
@@ -92,10 +93,12 @@ public IcebergWriter(
9293
ZoneId zoneId,
9394
long lastCheckpointId,
9495
String jobId,
95-
String operatorId) {
96+
String operatorId,
97+
Map<String, String> hadoopConfOptions) {
98+
Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
9699
catalog =
97100
CatalogUtil.buildIcebergCatalog(
98-
this.getClass().getSimpleName(), catalogOptions, new Configuration());
101+
this.getClass().getSimpleName(), catalogOptions, configuration);
99102
writerFactoryMap = new HashMap<>();
100103
writerMap = new HashMap<>();
101104
schemaMap = new HashMap<>();

0 commit comments

Comments
 (0)