Skip to content

Commit 595cdd1

Browse files
authored
[Improve][Zeta][storage] update hdfs configuration, support more parameters (apache#6547)
1 parent ba1b191 commit 595cdd1

File tree

8 files changed

+107
-16
lines changed

8 files changed

+107
-16
lines changed

Diff for: docs/en/seatunnel-engine/checkpoint-storage.md

+2
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,8 @@ seatunnel:
144144
// if you used kerberos, you can config like this:
145145
kerberosPrincipal: your-kerberos-principal
146146
kerberosKeytabFilePath: your-kerberos-keytab
147+
// if you need hdfs-site config, you can config like this:
148+
hdfs_site_path: /path/to/your/hdfs_site_path
147149
```
148150

149151
If HDFS is in HA mode, you can configure it like this:

Diff for: docs/en/seatunnel-engine/deployment.md

+2
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,8 @@ map:
197197
clusterName: seatunnel-cluster
198198
storage.type: hdfs
199199
fs.defaultFS: hdfs://localhost:9000
200+
// if you need hdfs-site config, you can config like this:
201+
hdfs_site_path: /path/to/your/hdfs_site_path
200202
```
201203

202204
If there is no HDFS and your cluster only has one node, you can configure it to use a local file like this:

Diff for: seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
import org.apache.commons.lang3.StringUtils;
2626
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.Path;
2728
import org.apache.hadoop.security.UserGroupInformation;
2829

2930
import java.io.IOException;
@@ -44,11 +45,13 @@ public class HdfsConfiguration extends AbstractConfiguration {
4445

4546
private static final String KERBEROS_KEY = "kerberos";
4647

47-
/** ********* Hdfs constants ************* */
48+
/** ******** Hdfs constants ************* */
4849
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
4950

5051
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
5152

53+
private static final String HDFS_SITE_PATH = "hdfs_site_path";
54+
5255
private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
5356

5457
@Override
@@ -71,6 +74,9 @@ public Configuration buildConfiguration(Map<String, String> config)
7174
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
7275
}
7376
}
77+
if (config.containsKey(HDFS_SITE_PATH)) {
78+
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
79+
}
7480
// support other hdfs optional config keys
7581
config.entrySet().stream()
7682
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
@@ -80,6 +86,7 @@ public Configuration buildConfiguration(Map<String, String> config)
8086
String value = entry.getValue();
8187
hadoopConf.set(key, value);
8288
});
89+
8390
return hadoopConf;
8491
}
8592

Diff for: seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.Map;
5050
import java.util.Set;
5151
import java.util.concurrent.TimeUnit;
52+
import java.util.stream.Collectors;
5253

5354
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT;
5455
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
@@ -125,8 +126,13 @@ public void initialize(Map<String, Object> configuration) {
125126
this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
126127
// build configuration
127128
AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();
129+
Map<String, String> stringMap =
130+
configuration.entrySet().stream()
131+
.collect(
132+
Collectors.toMap(
133+
Map.Entry::getKey, entry -> entry.getValue().toString()));
128134

129-
Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
135+
Configuration hadoopConf = fileConfiguration.buildConfiguration(stringMap);
130136
this.conf = hadoopConf;
131137
this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
132138
this.businessName = (String) configuration.get(BUSINESS_KEY);

Diff for: seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,15 @@ public void setBlockSize(Long blockSize) {
4646
* @param config configuration
4747
* @param keys keys
4848
*/
49-
void checkConfiguration(Map<String, Object> config, String... keys) {
49+
void checkConfiguration(Map<String, String> config, String... keys) {
5050
for (String key : keys) {
5151
if (!config.containsKey(key) || null == config.get(key)) {
5252
throw new IllegalArgumentException(key + " is required");
5353
}
5454
}
5555
}
5656

57-
public abstract Configuration buildConfiguration(Map<String, Object> config)
57+
public abstract Configuration buildConfiguration(Map<String, String> config)
5858
throws IMapStorageException;
5959

6060
/**
@@ -65,11 +65,11 @@ public abstract Configuration buildConfiguration(Map<String, Object> config)
6565
* @param prefix
6666
*/
6767
void setExtraConfiguration(
68-
Configuration hadoopConf, Map<String, Object> config, String prefix) {
68+
Configuration hadoopConf, Map<String, String> config, String prefix) {
6969
config.forEach(
7070
(k, v) -> {
7171
if (config.containsKey(BLOCK_SIZE)) {
72-
setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString()));
72+
setBlockSize(Long.parseLong(config.get(BLOCK_SIZE)));
7373
}
7474
if (k.startsWith(prefix)) {
7575
hadoopConf.set(k, String.valueOf(v));

Diff for: seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java

+79-5
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,95 @@
2020

2121
package org.apache.seatunnel.engine.imap.storage.file.config;
2222

23+
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
24+
25+
import org.apache.commons.lang3.StringUtils;
2326
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.Path;
28+
import org.apache.hadoop.security.UserGroupInformation;
2429

30+
import java.io.IOException;
2531
import java.util.Map;
2632

27-
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
2833
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
2934

3035
public class HdfsConfiguration extends AbstractConfiguration {
3136

37+
/** hdfs uri is required */
38+
private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
39+
/** hdfs kerberos principal( is optional) */
40+
private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
41+
42+
private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
43+
private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
44+
"hadoop.security.authentication";
45+
46+
private static final String KERBEROS_KEY = "kerberos";
47+
48+
/** ******** Hdfs constants ************* */
49+
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
50+
51+
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
52+
53+
private static final String HDFS_SITE_PATH = "hdfs_site_path";
54+
55+
private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
56+
3257
@Override
33-
public Configuration buildConfiguration(Map<String, Object> config) {
58+
public Configuration buildConfiguration(Map<String, String> config) {
3459
Configuration hadoopConf = new Configuration();
35-
hadoopConf.set(
36-
FS_DEFAULT_NAME_KEY,
37-
String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT)));
60+
if (config.containsKey(HDFS_DEF_FS_NAME)) {
61+
hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
62+
}
63+
hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
64+
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
65+
if (config.containsKey(KERBEROS_PRINCIPAL)
66+
&& config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
67+
String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
68+
String kerberosKeytabFilePath = config.get(KERBEROS_KEYTAB_FILE_PATH);
69+
if (StringUtils.isNotBlank(kerberosPrincipal)
70+
&& StringUtils.isNotBlank(kerberosKeytabFilePath)) {
71+
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
72+
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
73+
}
74+
}
75+
if (config.containsKey(HDFS_SITE_PATH)) {
76+
hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
77+
}
78+
// support other hdfs optional config keys
79+
config.entrySet().stream()
80+
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
81+
.forEach(
82+
entry -> {
83+
String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
84+
String value = entry.getValue();
85+
hadoopConf.set(key, value);
86+
});
87+
3888
return hadoopConf;
3989
}
90+
91+
/**
92+
* Authenticate kerberos
93+
*
94+
* @param kerberosPrincipal kerberos principal
95+
* @param kerberosKeytabFilePath kerberos keytab file path
96+
* @param hdfsConf hdfs configuration
97+
* @throws IMapStorageException authentication exception
98+
*/
99+
private void authenticateKerberos(
100+
String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf)
101+
throws IMapStorageException {
102+
UserGroupInformation.setConfiguration(hdfsConf);
103+
try {
104+
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
105+
} catch (IOException e) {
106+
throw new IMapStorageException(
107+
"Failed to login user from keytab : "
108+
+ kerberosKeytabFilePath
109+
+ " and kerberos principal : "
110+
+ kerberosPrincipal,
111+
e);
112+
}
113+
}
40114
}

Diff for: seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ public class OssConfiguration extends AbstractConfiguration {
3636
private static final String OSS_KEY = "fs.oss.";
3737

3838
@Override
39-
public Configuration buildConfiguration(Map<String, Object> config)
39+
public Configuration buildConfiguration(Map<String, String> config)
4040
throws IMapStorageException {
4141
checkConfiguration(config, OSS_BUCKET_KEY);
4242
Configuration hadoopConf = new Configuration();
43-
hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY)));
43+
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
4444
hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
4545
setExtraConfiguration(hadoopConf, config, OSS_KEY);
4646
return hadoopConf;

Diff for: seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,16 @@ public class S3Configuration extends AbstractConfiguration {
3939
private static final String FS_KEY = "fs.";
4040

4141
@Override
42-
public Configuration buildConfiguration(Map<String, Object> config)
42+
public Configuration buildConfiguration(Map<String, String> config)
4343
throws IMapStorageException {
4444
checkConfiguration(config, S3_BUCKET_KEY);
4545
String protocol = DEFAULT_PROTOCOL;
46-
if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) {
46+
if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
4747
protocol = S3A_PROTOCOL;
4848
}
4949
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
5050
Configuration hadoopConf = new Configuration();
51-
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString());
51+
hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
5252
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
5353
setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
5454
return hadoopConf;

0 commit comments

Comments (0)