Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ Pipeline 连接器选项
<td>String</td>
<td>透传 Iceberg 表选项到管道,详见 <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg 表配置</a>。</td>
</tr>
<tr>
<td>hadoop.conf.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>传递 Hadoop <code>Configuration</code> 参数(用于 Iceberg 的 catalog/table 相关操作)。前缀 <code>hadoop.conf.</code> 会被剥离。例如 <code>hadoop.conf.fs.s3a.endpoint</code>。</td>
</tr>
</tbody>
</table>
</div>
Expand Down
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,13 @@ Pipeline Connector Options
<td>String</td>
<td>Pass Iceberg table options to the pipeline. See <a href="https://iceberg.apache.org/docs/nightly/configuration/#write-properties">Iceberg table options</a>.</td>
</tr>
<tr>
<td>hadoop.conf.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Pass Hadoop <code>Configuration</code> options used by Iceberg catalog/table operations. The prefix <code>hadoop.conf.</code> will be stripped. For example, <code>hadoop.conf.fs.s3a.endpoint</code>.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class IcebergDataSink implements DataSink, Serializable {
// options for creating Iceberg table.
private final Map<String, String> tableOptions;

// options for Hadoop configuration.
private final Map<String, String> hadoopConfOptions;

private final Map<TableId, List<String>> partitionMaps;

private final ZoneId zoneId;
Expand All @@ -56,26 +59,38 @@ public IcebergDataSink(
ZoneId zoneId,
String schemaOperatorUid,
CompactionOptions compactionOptions,
String jobIdPrefix) {
String jobIdPrefix,
Map<String, String> hadoopConfOptions) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.partitionMaps = partitionMaps;
this.zoneId = zoneId;
this.schemaOperatorUid = schemaOperatorUid;
this.compactionOptions = compactionOptions;
this.jobIdPrefix = jobIdPrefix;
this.hadoopConfOptions = hadoopConfOptions;
}

@Override
public EventSinkProvider getEventSinkProvider() {
IcebergSink icebergEventSink =
new IcebergSink(
catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
catalogOptions,
tableOptions,
zoneId,
compactionOptions,
jobIdPrefix,
hadoopConfOptions);
return FlinkSinkProvider.of(icebergEventSink);
}

/**
 * Creates the metadata applier that performs schema changes on Iceberg tables, passing along
 * the same catalog, table, partition, and Hadoop configuration options used by the sink.
 */
@Override
public MetadataApplier getMetadataApplier() {
    return new IcebergMetadataApplier(
            catalogOptions, tableOptions, partitionMaps, hadoopConfOptions);
}

/** Returns the Hadoop configuration options forwarded to the sink and the metadata applier. */
public Map<String, String> getHadoopConfOptions() {
return hadoopConfOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Set;

import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_HADOOP_CONF;
import static org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSinkOptions.PREFIX_TABLE_PROPERTIES;

/** A {@link DataSinkFactory} for Apache Iceberg. */
Expand All @@ -55,13 +56,15 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
@Override
public DataSink createDataSink(Context context) {
FactoryHelper.createFactoryHelper(this, context)
.validateExcept(PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES);
.validateExcept(
PREFIX_TABLE_PROPERTIES, PREFIX_CATALOG_PROPERTIES, PREFIX_HADOOP_CONF);

Map<String, String> allOptions = context.getFactoryConfiguration().toMap();
OptionUtils.printOptions(IDENTIFIER, allOptions);

Map<String, String> catalogOptions = new HashMap<>();
Map<String, String> tableOptions = new HashMap<>();
Map<String, String> hadoopConfOptions = extractHadoopConfOptions(allOptions);
allOptions.forEach(
(key, value) -> {
if (key.startsWith(PREFIX_TABLE_PROPERTIES)) {
Expand Down Expand Up @@ -116,7 +119,19 @@ public DataSink createDataSink(Context context) {
zoneId,
schemaOperatorUid,
compactionOptions,
jobIdPrefix);
jobIdPrefix,
hadoopConfOptions);
}

/**
 * Collects every option whose key starts with {@code hadoop.conf.} and returns it with that
 * prefix stripped from the key. Options without the prefix are ignored.
 */
static Map<String, String> extractHadoopConfOptions(Map<String, String> allOptions) {
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : allOptions.entrySet()) {
        String key = entry.getKey();
        if (key.startsWith(PREFIX_HADOOP_CONF)) {
            result.put(key.substring(PREFIX_HADOOP_CONF.length()), entry.getValue());
        }
    }
    return result;
}

private CompactionOptions getCompactionStrategy(Configuration configuration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public class IcebergDataSinkOptions {
// prefix for passing properties for catalog creation.
public static final String PREFIX_CATALOG_PROPERTIES = "catalog.properties.";

// prefix for passing properties for Hadoop configuration.
public static final String PREFIX_HADOOP_CONF = "hadoop.conf.";

public static final ConfigOption<String> TYPE =
key("catalog.properties.type")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.cdc.common.schema.PhysicalColumn;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils;

import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
Expand Down Expand Up @@ -92,29 +93,41 @@ public class IcebergMetadataApplier implements MetadataApplier {

private final Map<TableId, List<String>> partitionMaps;

private final Map<String, String> hadoopConfOptions;

private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;

/** Creates an applier with empty table options, no partition mapping, and no extra Hadoop configuration. */
public IcebergMetadataApplier(Map<String, String> catalogOptions) {
    this(catalogOptions, new HashMap<>(), new HashMap<>(), null);
}

/** Creates an applier with no extra Hadoop configuration. */
public IcebergMetadataApplier(
        Map<String, String> catalogOptions,
        Map<String, String> tableOptions,
        Map<TableId, List<String>> partitionMaps) {
    this(catalogOptions, tableOptions, partitionMaps, null);
}

/**
 * Creates an applier.
 *
 * @param catalogOptions options used to build the Iceberg catalog
 * @param tableOptions options applied when creating Iceberg tables
 * @param partitionMaps per-table partition definitions (presumably partition field names —
 *     confirm against the sink options)
 * @param hadoopConfOptions extra Hadoop {@code Configuration} entries; may be {@code null}
 */
public IcebergMetadataApplier(
        Map<String, String> catalogOptions,
        Map<String, String> tableOptions,
        Map<TableId, List<String>> partitionMaps,
        Map<String, String> hadoopConfOptions) {
    this.catalogOptions = catalogOptions;
    this.tableOptions = tableOptions;
    this.partitionMaps = partitionMaps;
    this.hadoopConfOptions = hadoopConfOptions;
    this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
}

@Override
public void applySchemaChange(SchemaChangeEvent schemaChangeEvent)
throws SchemaEvolveException {
if (catalog == null) {
Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
this.getClass().getSimpleName(), catalogOptions, new Configuration());
this.getClass().getSimpleName(), catalogOptions, configuration);
Comment thread
lvyanquan marked this conversation as resolved.
}
SchemaChangeEventVisitor.visit(
schemaChangeEvent,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.iceberg.sink.utils;

import org.apache.hadoop.conf.Configuration;

import java.util.Map;

/** Utilities for building Hadoop {@link Configuration} from connector options. */
public final class HadoopConfUtils {

    private HadoopConfUtils() {
        // static utility class, not instantiable
    }

    /**
     * Builds a fresh {@link Configuration} and copies the given options into it.
     *
     * @param hadoopConfOptions key/value pairs to apply; may be {@code null} or empty
     * @return a new configuration containing the supplied options
     */
    public static Configuration createConfiguration(Map<String, String> hadoopConfOptions) {
        Configuration conf = new Configuration();
        applyTo(conf, hadoopConfOptions);
        return conf;
    }

    /**
     * Copies the given options into an existing {@link Configuration}. Entries with a
     * {@code null} key or value are skipped; a {@code null} configuration or a {@code null}
     * or empty option map is a no-op.
     */
    public static void applyTo(Configuration configuration, Map<String, String> hadoopConfOptions) {
        if (configuration == null || hadoopConfOptions == null || hadoopConfOptions.isEmpty()) {
            return;
        }
        for (Map.Entry<String, String> entry : hadoopConfOptions.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key != null && value != null) {
                configuration.set(key, value);
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;

Expand Down Expand Up @@ -67,15 +68,19 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {

private final Map<TableId, TableMetric> tableIdMetricMap;

/** Creates a committer without metrics reporting. */
public IcebergCommitter(
        Map<String, String> catalogOptions, Map<String, String> hadoopConfOptions) {
    this(catalogOptions, null, hadoopConfOptions);
}

/**
 * Creates a committer.
 *
 * @param catalogOptions options used to build the Iceberg catalog
 * @param metricGroup committer metric group; may be {@code null}
 * @param hadoopConfOptions extra Hadoop {@code Configuration} entries; may be {@code null}
 */
public IcebergCommitter(
        Map<String, String> catalogOptions,
        SinkCommitterMetricGroup metricGroup,
        Map<String, String> hadoopConfOptions) {
    // Hadoop configuration must be built from the pipeline options so that settings such as
    // hadoop.conf.fs.s3a.endpoint reach the catalog's file IO.
    Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
    this.catalog =
            CatalogUtil.buildIcebergCatalog(
                    this.getClass().getSimpleName(), catalogOptions, configuration);
    this.metricGroup = metricGroup;
    this.tableIdMetricMap = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class IcebergSink

protected final Map<String, String> catalogOptions;
protected final Map<String, String> tableOptions;
protected final Map<String, String> hadoopConfOptions;

private final ZoneId zoneId;

Expand All @@ -72,13 +73,15 @@ public IcebergSink(
Map<String, String> tableOptions,
ZoneId zoneId,
CompactionOptions compactionOptions,
String jobIdPrefix) {
String jobIdPrefix,
Map<String, String> hadoopConfOptions) {
Comment thread
lvyanquan marked this conversation as resolved.
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.zoneId = zoneId;
this.compactionOptions = compactionOptions;
this.jobId = jobIdPrefix + UUID.randomUUID();
this.operatorId = UUID.randomUUID().toString();
this.hadoopConfOptions = hadoopConfOptions;
}

@Override
Expand All @@ -87,14 +90,14 @@ public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
}

/** Creates a committer without a metric group, forwarding the Hadoop configuration options. */
public Committer<WriteResultWrapper> createCommitter() {
    return new IcebergCommitter(catalogOptions, hadoopConfOptions);
}

/** Creates a committer wired to the runtime's committer metric group. */
@Override
public Committer<WriteResultWrapper> createCommitter(
        CommitterInitContext committerInitContext) {
    SinkCommitterMetricGroup metricGroup = committerInitContext.metricGroup();
    return new IcebergCommitter(catalogOptions, metricGroup, hadoopConfOptions);
}

@Override
Expand All @@ -115,7 +118,8 @@ public SinkWriter<Event> createWriter(InitContext context) {
zoneId,
lastCheckpointId,
jobId,
operatorId);
operatorId,
hadoopConfOptions);
}

@Override
Expand All @@ -130,7 +134,8 @@ public SinkWriter<Event> createWriter(WriterInitContext context) {
zoneId,
lastCheckpointId,
jobId,
operatorId);
operatorId,
hadoopConfOptions);
}

@Override
Expand All @@ -153,7 +158,8 @@ public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
zoneId,
lastCheckpointId,
jobId,
operatorId);
operatorId,
hadoopConfOptions);
}

@Override
Expand Down Expand Up @@ -208,7 +214,8 @@ public void addPostCommitTopology(
.transform(
"Compaction",
typeInformation,
new CompactionOperator(catalogOptions, compactionOptions))
new CompactionOperator(
catalogOptions, compactionOptions, hadoopConfOptions))
.setParallelism(parallelism);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils;
import org.apache.flink.cdc.connectors.iceberg.sink.utils.RowDataUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
Expand Down Expand Up @@ -92,10 +93,12 @@ public IcebergWriter(
ZoneId zoneId,
long lastCheckpointId,
String jobId,
String operatorId) {
String operatorId,
Map<String, String> hadoopConfOptions) {
Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions);
catalog =
CatalogUtil.buildIcebergCatalog(
this.getClass().getSimpleName(), catalogOptions, new Configuration());
this.getClass().getSimpleName(), catalogOptions, configuration);
Comment thread
lvyanquan marked this conversation as resolved.
writerFactoryMap = new HashMap<>();
writerMap = new HashMap<>();
schemaMap = new HashMap<>();
Expand Down
Loading
Loading