Skip to content

Commit 20b62f3

Browse files
authored
[Improve][Connector-V2] Support hadoop ha and kerberos for paimon sink (#6585)
1 parent 595cdd1 commit 20b62f3

File tree

21 files changed

+885
-116
lines changed

21 files changed

+885
-116
lines changed

Diff for: docs/en/connector-v2/sink/Paimon.md

+45-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ Sink connector for Apache Paimon. It can support cdc mode 、auto create table.
1717
| warehouse | String | Yes | - | Paimon warehouse path |
1818
| database | String | Yes | - | The database you want to access |
1919
| table | String | Yes | - | The table you want to access |
20-
| hdfs_site_path | String | No | - | |
20+
| hdfs_site_path | String | No | - | The path of hdfs-site.xml |
2121
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode |
2222
| data_save_mode | Enum | No | APPEND_DATA | The data save mode |
2323
| paimon.table.primary-keys | String | No | - | Default comma-separated list of columns (primary key) that identify a row in tables.(Notice: The partition field needs to be included in the primary key fields) |
2424
| paimon.table.partition-keys | String | No | - | Default comma-separated list of partition fields to use when creating tables. |
2525
| paimon.table.write-props | Map | No | - | Properties passed through to paimon table initialization, [reference](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions). |
26+
| paimon.hadoop.conf | Map | No | - | Properties in hadoop conf |
27+
| paimon.hadoop.conf-path | String | No | - | The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files |
2628

2729
## Examples
2830

@@ -57,6 +59,48 @@ sink {
5759
}
5860
```
5961

62+
### Single table(Specify hadoop HA config and kerberos config)
63+
64+
```hocon
65+
env {
66+
parallelism = 1
67+
job.mode = "STREAMING"
68+
checkpoint.interval = 5000
69+
}
70+
71+
source {
72+
Mysql-CDC {
73+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
74+
username = "root"
75+
password = "******"
76+
table-names = ["seatunnel.role"]
77+
}
78+
}
79+
80+
transform {
81+
}
82+
83+
sink {
84+
Paimon {
85+
catalog_name="seatunnel_test"
86+
warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
87+
database="seatunnel"
88+
table="role"
89+
paimon.hadoop.conf = {
90+
fs.defaultFS = "hdfs://nameservice1"
91+
dfs.nameservices = "nameservice1"
92+
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
93+
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
94+
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
95+
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
96+
dfs.client.use.datanode.hostname = "true"
97+
security.kerberos.login.principal = "your-kerberos-principal"
98+
security.kerberos.login.keytab = "your-kerberos-keytab-path"
99+
}
100+
}
101+
}
102+
```
103+
60104
### Single table with write props of paimon
61105

62106
```hocon

Diff for: docs/zh/connector-v2/sink/Paimon.md

+45-1
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,14 @@ Apache Paimon数据连接器。支持cdc写以及自动建表。
1717
| warehouse | 字符串 || - | Paimon warehouse路径 |
1818
| database | 字符串 || - | 数据库名称 |
1919
| table | 字符串 || - | 表名 |
20-
| hdfs_site_path | 字符串 || - | |
20+
| hdfs_site_path | 字符串 || - | hdfs-site.xml文件路径 |
2121
| schema_save_mode | 枚举 || CREATE_SCHEMA_WHEN_NOT_EXIST | Schema保存模式 |
2222
| data_save_mode | 枚举 || APPEND_DATA | 数据保存模式 |
2323
| paimon.table.primary-keys | 字符串 || - | 主键字段列表,联合主键使用逗号分隔(注意:分区字段需要包含在主键字段中) |
2424
| paimon.table.partition-keys | 字符串 || - | 分区字段列表,多字段使用逗号分隔 |
2525
| paimon.table.write-props | Map || - | Paimon表初始化指定的属性, [参考](https://paimon.apache.org/docs/0.6/maintenance/configurations/#coreoptions) |
26+
| paimon.hadoop.conf | Map || - | Hadoop配置文件属性信息 |
27+
| paimon.hadoop.conf-path | 字符串 || - | Hadoop配置文件目录,用于加载'core-site.xml', 'hdfs-site.xml', 'hive-site.xml'文件配置 |
2628

2729
## 示例
2830

@@ -57,6 +59,48 @@ sink {
5759
}
5860
```
5961

62+
### 单表(指定hadoop HA配置和kerberos配置)
63+
64+
```hocon
65+
env {
66+
parallelism = 1
67+
job.mode = "STREAMING"
68+
checkpoint.interval = 5000
69+
}
70+
71+
source {
72+
Mysql-CDC {
73+
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
74+
username = "root"
75+
password = "******"
76+
table-names = ["seatunnel.role"]
77+
}
78+
}
79+
80+
transform {
81+
}
82+
83+
sink {
84+
Paimon {
85+
catalog_name="seatunnel_test"
86+
warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
87+
database="seatunnel"
88+
table="role"
89+
paimon.hadoop.conf = {
90+
fs.defaultFS = "hdfs://nameservice1"
91+
dfs.nameservices = "nameservice1"
92+
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
93+
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
94+
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
95+
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
96+
dfs.client.use.datanode.hostname = "true"
97+
security.kerberos.login.principal = "your-kerberos-principal"
98+
security.kerberos.login.keytab = "your-kerberos-keytab-path"
99+
}
100+
}
101+
}
102+
```
103+
60104
### 指定paimon的写属性的单表
61105

62106
```hocon

Diff for: seatunnel-connectors-v2/connector-paimon/pom.xml

+8-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
<name>SeaTunnel : Connectors V2 : Paimon</name>
3131

3232
<properties>
33-
<paimon.version>0.6.0-incubating</paimon.version>
33+
<paimon.version>0.7.0-incubating</paimon.version>
3434
</properties>
3535

3636
<dependencies>
@@ -46,6 +46,13 @@
4646
<version>${paimon.version}</version>
4747
</dependency>
4848

49+
<dependency>
50+
<groupId>org.apache.seatunnel</groupId>
51+
<artifactId>seatunnel-guava</artifactId>
52+
<version>${project.version}</version>
53+
<classifier>optional</classifier>
54+
</dependency>
55+
4956
<dependency>
5057
<groupId>org.apache.seatunnel</groupId>
5158
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,9 @@ public OptionRule optionRule() {
5151
PaimonSinkConfig.DATA_SAVE_MODE,
5252
PaimonSinkConfig.PRIMARY_KEYS,
5353
PaimonSinkConfig.PARTITION_KEYS,
54-
PaimonSinkConfig.WRITE_PROPS)
54+
PaimonSinkConfig.WRITE_PROPS,
55+
PaimonSinkConfig.HADOOP_CONF,
56+
PaimonSinkConfig.HADOOP_CONF_PATH)
5557
.build();
5658
}
5759
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java

+50-12
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
2021
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
22+
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
23+
import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
24+
import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
2125

2226
import org.apache.commons.lang3.StringUtils;
2327
import org.apache.hadoop.conf.Configuration;
24-
import org.apache.hadoop.fs.Path;
2528
import org.apache.paimon.catalog.Catalog;
2629
import org.apache.paimon.catalog.CatalogContext;
2730
import org.apache.paimon.catalog.CatalogFactory;
@@ -31,31 +34,66 @@
3134

3235
import java.io.Serializable;
3336
import java.util.HashMap;
37+
import java.util.Iterator;
3438
import java.util.Map;
3539

3640
import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
3741

3842
@Slf4j
3943
public class PaimonCatalogLoader implements Serializable {
40-
private PaimonSinkConfig config;
44+
/** The HDFS URI ({@code fs.defaultFS}) is required. */
45+
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
4146

42-
public PaimonCatalogLoader(PaimonSinkConfig config) {
43-
this.config = config;
47+
private static final String HDFS_PREFIX = "hdfs://";
48+
/** ********* Hdfs constants ************* */
49+
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
50+
51+
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
52+
53+
private String warehouse;
54+
55+
private PaimonHadoopConfiguration paimonHadoopConfiguration;
56+
57+
public PaimonCatalogLoader(PaimonSinkConfig paimonSinkConfig) {
58+
this.warehouse = paimonSinkConfig.getWarehouse();
59+
this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
4460
}
4561

4662
public Catalog loadCatalog() {
4763
// When using the seatunel engine, set the current class loader to prevent loading failures
4864
Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
49-
final String warehouse = config.getWarehouse();
50-
final Map<String, String> optionsMap = new HashMap<>();
65+
final Map<String, String> optionsMap = new HashMap<>(1);
5166
optionsMap.put(WAREHOUSE.key(), warehouse);
5267
final Options options = Options.fromMap(optionsMap);
53-
final Configuration hadoopConf = new Configuration();
54-
String hdfsSitePathOptional = config.getHdfsSitePath();
55-
if (StringUtils.isNotBlank(hdfsSitePathOptional)) {
56-
hadoopConf.addResource(new Path(hdfsSitePathOptional));
68+
if (warehouse.startsWith(HDFS_PREFIX)) {
69+
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
70+
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
71+
}
72+
PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
73+
final CatalogContext catalogContext =
74+
CatalogContext.create(options, paimonHadoopConfiguration);
75+
try {
76+
return PaimonSecurityContext.runSecured(
77+
() -> CatalogFactory.createCatalog(catalogContext));
78+
} catch (Exception e) {
79+
throw new PaimonConnectorException(
80+
PaimonConnectorErrorCode.LOAD_CATALOG,
81+
"Failed to perform SecurityContext.runSecured",
82+
e);
83+
}
84+
}
85+
86+
void checkConfiguration(Configuration configuration, String key) {
87+
Iterator<Map.Entry<String, String>> entryIterator = configuration.iterator();
88+
while (entryIterator.hasNext()) {
89+
Map.Entry<String, String> entry = entryIterator.next();
90+
if (entry.getKey().equals(key)) {
91+
if (StringUtils.isBlank(entry.getValue())) {
92+
throw new IllegalArgumentException("The value of " + key + " is required");
93+
}
94+
return;
95+
}
5796
}
58-
final CatalogContext catalogContext = CatalogContext.create(options, hadoopConf);
59-
return CatalogFactory.createCatalog(catalogContext);
97+
throw new IllegalArgumentException(key + " is required");
6098
}
6199
}

Diff for: seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java

+58
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,31 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
22+
2023
import org.apache.seatunnel.api.configuration.Option;
2124
import org.apache.seatunnel.api.configuration.Options;
25+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2226
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2327
import org.apache.seatunnel.api.source.SeaTunnelSource;
2428

29+
import lombok.Getter;
30+
2531
import java.io.Serializable;
32+
import java.util.Arrays;
33+
import java.util.HashMap;
2634
import java.util.List;
35+
import java.util.Map;
36+
37+
import static java.util.stream.Collectors.toList;
38+
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
2739

2840
/**
2941
* Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
3042
* SeaTunnelSink}.
3143
*/
44+
@Getter
3245
public class PaimonConfig implements Serializable {
3346

3447
public static final Option<String> WAREHOUSE =
@@ -61,9 +74,54 @@ public class PaimonConfig implements Serializable {
6174
.noDefaultValue()
6275
.withDescription("The read columns of the flink table store");
6376

77+
@Deprecated
6478
public static final Option<String> HDFS_SITE_PATH =
6579
Options.key("hdfs_site_path")
6680
.stringType()
6781
.noDefaultValue()
6882
.withDescription("The file path of hdfs-site.xml");
83+
84+
public static final Option<Map<String, String>> HADOOP_CONF =
85+
Options.key("paimon.hadoop.conf")
86+
.mapType()
87+
.defaultValue(new HashMap<>())
88+
.withDescription("Properties in hadoop conf");
89+
90+
public static final Option<String> HADOOP_CONF_PATH =
91+
Options.key("paimon.hadoop.conf-path")
92+
.stringType()
93+
.noDefaultValue()
94+
.withDescription(
95+
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");
96+
97+
protected String catalogName;
98+
protected String warehouse;
99+
protected String namespace;
100+
protected String table;
101+
protected String hdfsSitePath;
102+
protected Map<String, String> hadoopConfProps;
103+
protected String hadoopConfPath;
104+
105+
public PaimonConfig(ReadonlyConfig readonlyConfig) {
106+
this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
107+
this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
108+
this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
109+
this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
110+
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
111+
this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
112+
this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
113+
}
114+
115+
protected <T> T checkArgumentNotNull(T argument) {
116+
checkNotNull(argument);
117+
return argument;
118+
}
119+
120+
@VisibleForTesting
121+
public static List<String> stringToList(String value, String regex) {
122+
if (value == null || value.isEmpty()) {
123+
return ImmutableList.of();
124+
}
125+
return Arrays.stream(value.split(regex)).map(String::trim).collect(toList());
126+
}
69127
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
19+
20+
import org.apache.hadoop.conf.Configuration;
21+
22+
import java.io.Serializable;
23+
24+
/** A serializable wrapper around Hadoop's {@link Configuration}. */
25+
public class PaimonHadoopConfiguration extends Configuration implements Serializable {}

0 commit comments

Comments
 (0)