File tree 7 files changed +97
-0
lines changed
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources
7 files changed +97
-0
lines changed Original file line number Diff line number Diff line change @@ -246,6 +246,49 @@ sink {
246
246
}
247
247
```
248
248
249
+ ### Single table (Specify Hadoop HA config with hadoop_user_name)
250
+
251
+ ``` hocon
252
+ env {
253
+ parallelism = 1
254
+ job.mode = "STREAMING"
255
+ checkpoint.interval = 5000
256
+ }
257
+
258
+ source {
259
+ Mysql-CDC {
260
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
261
+ username = "root"
262
+ password = "******"
263
+ table-names = ["seatunnel.role"]
264
+ }
265
+ }
266
+
267
+ transform {
268
+ }
269
+
270
+ sink {
271
+ Paimon {
272
+ catalog_name="seatunnel_test"
273
+ warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
274
+ database="seatunnel"
275
+ table="role"
276
+ paimon.hadoop.conf = {
277
+ hadoop_user_name = "hdfs"
278
+ fs.defaultFS = "hdfs://nameservice1"
279
+ dfs.nameservices = "nameservice1"
280
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
281
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
282
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
283
+ dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
284
+ dfs.client.use.datanode.hostname = "true"
285
+ security.kerberos.login.principal = "your-kerberos-principal"
286
+ security.kerberos.login.keytab = "your-kerberos-keytab-path"
287
+ }
288
+ }
289
+ }
290
+ ```
291
+
249
292
### Single table(Hive catalog)
250
293
251
294
``` hocon
Original file line number Diff line number Diff line change @@ -152,6 +152,7 @@ source {
152
152
table="st_test"
153
153
query = "select * from st_test where pk_id is not null and pk_id < 3"
154
154
paimon.hadoop.conf = {
155
+ hadoop_user_name = "hdfs"
155
156
fs.defaultFS = "hdfs://nameservice1"
156
157
dfs.nameservices = "nameservice1"
157
158
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
Original file line number Diff line number Diff line change @@ -244,6 +244,49 @@ sink {
244
244
}
245
245
```
246
246
247
+ ### 单表(指定Hadoop HA配置和Hadoop用户名)
248
+
249
+ ``` hocon
250
+ env {
251
+ parallelism = 1
252
+ job.mode = "STREAMING"
253
+ checkpoint.interval = 5000
254
+ }
255
+
256
+ source {
257
+ Mysql-CDC {
258
+ base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
259
+ username = "root"
260
+ password = "******"
261
+ table-names = ["seatunnel.role"]
262
+ }
263
+ }
264
+
265
+ transform {
266
+ }
267
+
268
+ sink {
269
+ Paimon {
270
+ catalog_name="seatunnel_test"
271
+ warehouse="hdfs:///tmp/seatunnel/paimon/hadoop-sink/"
272
+ database="seatunnel"
273
+ table="role"
274
+ paimon.hadoop.conf = {
275
+ hadoop_user_name = "hdfs"
276
+ fs.defaultFS = "hdfs://nameservice1"
277
+ dfs.nameservices = "nameservice1"
278
+ dfs.ha.namenodes.nameservice1 = "nn1,nn2"
279
+ dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
280
+ dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
281
+ dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
282
+ dfs.client.use.datanode.hostname = "true"
283
+ security.kerberos.login.principal = "your-kerberos-principal"
284
+ security.kerberos.login.keytab = "your-kerberos-keytab-path"
285
+ }
286
+ }
287
+ }
288
+ ```
289
+
247
290
### 单表(使用Hive catalog)
248
291
249
292
``` hocon
Original file line number Diff line number Diff line change @@ -154,6 +154,7 @@ source {
154
154
table="st_test"
155
155
query = "select * from st_test where pk_id is not null and pk_id < 3"
156
156
paimon.hadoop.conf = {
157
+ hadoop_user_name = "hdfs"
157
158
fs.defaultFS = "hdfs://nameservice1"
158
159
dfs.nameservices = "nameservice1"
159
160
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
Original file line number Diff line number Diff line change 25
25
26
26
import org.apache.commons.lang3.StringUtils;
27
27
import org.apache.hadoop.conf.Configuration;
28
+ import org.apache.hadoop.security.UserGroupInformation;
28
29
import org.apache.paimon.catalog.Catalog;
29
30
import org.apache.paimon.catalog.CatalogContext;
30
31
import org.apache.paimon.catalog.CatalogFactory;
@@ -50,6 +51,8 @@ public class PaimonCatalogLoader implements Serializable {
50
51
51
52
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
52
53
54
+ private static final String HADOOP_USER_NAME = "hadoop_user_name";
55
+
53
56
private String warehouse;
54
57
private PaimonCatalogEnum catalogType;
55
58
private String catalogUri;
@@ -72,6 +75,10 @@ public Catalog loadCatalog() {
72
75
if (warehouse.startsWith(HDFS_PREFIX)) {
73
76
checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
74
77
paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
78
+ String username = paimonHadoopConfiguration.get(HADOOP_USER_NAME);
79
+ if (StringUtils.isNotBlank(username)) {
80
+ UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser(username));
81
+ }
75
82
} else if (warehouse.startsWith(S3A_PREFIX)) {
76
83
optionsMap.putAll(paimonHadoopConfiguration.getPropsWithPrefix(StringUtils.EMPTY));
77
84
}
Original file line number Diff line number Diff line change 68
68
database = "seatunnel_namespace11"
69
69
table = "st_test"
70
70
paimon.hadoop.conf = {
71
+ hadoop_user_name = "hdfs"
71
72
fs.defaultFS = "hdfs://nameservice1"
72
73
dfs.nameservices = "nameservice1"
73
74
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
Original file line number Diff line number Diff line change 53
53
table = "st_test"
54
54
data_save_mode=DROP_DATA
55
55
paimon.hadoop.conf = {
56
+ hadoop_user_name = "hdfs"
56
57
fs.defaultFS = "hdfs://nameservice1"
57
58
dfs.nameservices = "nameservice1"
58
59
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
You can’t perform that action at this time.
0 commit comments