|
32 | 32 | import org.apache.flink.cdc.common.schema.PhysicalColumn; |
33 | 33 | import org.apache.flink.cdc.common.sink.MetadataApplier; |
34 | 34 | import org.apache.flink.cdc.common.types.utils.DataTypeUtils; |
| 35 | +import org.apache.flink.cdc.connectors.iceberg.sink.utils.HadoopConfUtils; |
35 | 36 | import org.apache.flink.cdc.connectors.iceberg.sink.utils.IcebergTypeUtils; |
36 | 37 |
|
37 | 38 | import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; |
@@ -92,29 +93,41 @@ public class IcebergMetadataApplier implements MetadataApplier { |
92 | 93 |
|
93 | 94 | private final Map<TableId, List<String>> partitionMaps; |
94 | 95 |
|
| 96 | + private final Map<String, String> hadoopConfOptions; |
| 97 | + |
95 | 98 | private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes; |
96 | 99 |
|
97 | 100 | public IcebergMetadataApplier(Map<String, String> catalogOptions) { |
98 | | - this(catalogOptions, new HashMap<>(), new HashMap<>()); |
| 101 | + this(catalogOptions, new HashMap<>(), new HashMap<>(), null); |
99 | 102 | } |
100 | 103 |
|
101 | 104 | public IcebergMetadataApplier( |
102 | 105 | Map<String, String> catalogOptions, |
103 | 106 | Map<String, String> tableOptions, |
104 | 107 | Map<TableId, List<String>> partitionMaps) { |
| 108 | + this(catalogOptions, tableOptions, partitionMaps, null); |
| 109 | + } |
| 110 | + |
| 111 | + public IcebergMetadataApplier( |
| 112 | + Map<String, String> catalogOptions, |
| 113 | + Map<String, String> tableOptions, |
| 114 | + Map<TableId, List<String>> partitionMaps, |
| 115 | + Map<String, String> hadoopConfOptions) { |
105 | 116 | this.catalogOptions = catalogOptions; |
106 | 117 | this.tableOptions = tableOptions; |
107 | 118 | this.partitionMaps = partitionMaps; |
| 119 | + this.hadoopConfOptions = hadoopConfOptions; |
108 | 120 | this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes(); |
109 | 121 | } |
110 | 122 |
|
111 | 123 | @Override |
112 | 124 | public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) |
113 | 125 | throws SchemaEvolveException { |
114 | 126 | if (catalog == null) { |
| 127 | + Configuration configuration = HadoopConfUtils.createConfiguration(hadoopConfOptions); |
115 | 128 | catalog = |
116 | 129 | CatalogUtil.buildIcebergCatalog( |
117 | | - this.getClass().getSimpleName(), catalogOptions, new Configuration()); |
| 130 | + this.getClass().getSimpleName(), catalogOptions, configuration); |
118 | 131 | } |
119 | 132 | SchemaChangeEventVisitor.visit( |
120 | 133 | schemaChangeEvent, |
|
0 commit comments