Skip to content

Commit 287b8c8

Browse files
authored
[Feature][ClickhouseFile] Support add publicKey to identity (#8351)
1 parent eb0bf89 commit 287b8c8

File tree

10 files changed

+75
-15
lines changed

10 files changed

+75
-15
lines changed

Diff for: docs/en/connector-v2/sink/ClickhouseFile.md

+5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ Write data to Clickhouse can also be done using JDBC
3838
| compatible_mode | boolean | no | false |
3939
| file_fields_delimiter | string | no | "\t" |
4040
| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" |
41+
| key_path | string | no | "/tmp/id_rsa" |
4142
| common-options | | no | - |
4243

4344
### host [string]
@@ -111,6 +112,10 @@ Avoid this with this configuration. Value string has to be an exactly one charac
111112

112113
The directory where ClickhouseFile stores temporary files locally.
113114

115+
### key_path [string]
116+
117+
The path of the private key file used for scp or rsync to connect to the ClickHouse server.
118+
114119
### common options
115120

116121
Sink plugin common parameters, please refer to [Sink Common Options](../sink-common-options.md) for details

Diff for: docs/zh/connector-v2/sink/ClickhouseFile.md

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
## 接收器选项
2020

21-
| 名称 | 类型 | 是否必须 | 默认值 |
21+
| 名称 | 类型 | 是否必须 | 默认值 |
2222
|------------------------|---------|------|----------------------------------------|
2323
| host | string | yes | - |
2424
| database | string | yes | - |
@@ -36,6 +36,7 @@
3636
| compatible_mode | boolean | no | false |
3737
| file_fields_delimiter | string | no | "\t" |
3838
| file_temp_path | string | no | "/tmp/seatunnel/clickhouse-local/file" |
39+
| key_path | string | no | "/tmp/id_rsa" |
3940
| common-options | | no | - |
4041

4142
### host [string]
@@ -102,6 +103,10 @@ ClickHouseFile使用CSV格式来临时保存数据。但如果数据中包含CSV
102103

103104
ClickhouseFile本地存储临时文件的目录。
104105

106+
### key_path [string]
107+
108+
用于scp或rsync传输文件的私钥路径。
109+
105110
### common options
106111

107112
Sink插件常用参数,请参考[Sink常用选项](../sink-common-options.md)获取更多细节信息。

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java

+6
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,12 @@ public class ClickhouseConfig {
167167
.defaultValue(Collections.emptyMap())
168168
.withDescription("Clickhouse custom config");
169169

170+
public static final Option<String> KEY_PATH =
171+
Options.key("key_path")
172+
.stringType()
173+
.noDefaultValue()
174+
.withDescription("The path of rsync/ssh key file");
175+
170176
public static final Option<String> FILE_FIELDS_DELIMITER =
171177
Options.key("file_fields_delimiter")
172178
.stringType()

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/FileReaderOption.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ public class FileReaderOption implements Serializable {
4141
private boolean compatibleMode;
4242
private String fileTempPath;
4343
private String fileFieldsDelimiter;
44+
private String keyPath;
4445

4546
public FileReaderOption(
4647
ShardMetadata shardMetadata,
@@ -53,7 +54,8 @@ public FileReaderOption(
5354
Map<String, String> nodePassword,
5455
boolean compatibleMode,
5556
String fileTempPath,
56-
String fileFieldsDelimiter) {
57+
String fileFieldsDelimiter,
58+
String keyPath) {
5759
this.shardMetadata = shardMetadata;
5860
this.tableSchema = tableSchema;
5961
this.fields = fields;
@@ -65,5 +67,6 @@ public FileReaderOption(
6567
this.compatibleMode = compatibleMode;
6668
this.fileFieldsDelimiter = fileFieldsDelimiter;
6769
this.fileTempPath = fileTempPath;
70+
this.keyPath = keyPath;
6871
}
6972
}

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
6363
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
6464
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
65+
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.KEY_PATH;
6566
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_ADDRESS;
6667
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
6768
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
@@ -108,6 +109,7 @@ public void prepare(Config config) throws PrepareFailException {
108109
.put(COMPATIBLE_MODE.key(), COMPATIBLE_MODE.defaultValue())
109110
.put(FILE_TEMP_PATH.key(), FILE_TEMP_PATH.defaultValue())
110111
.put(FILE_FIELDS_DELIMITER.key(), FILE_FIELDS_DELIMITER.defaultValue())
112+
.put(KEY_PATH.key(), KEY_PATH.defaultValue())
111113
.build();
112114

113115
config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
@@ -184,7 +186,8 @@ public void prepare(Config config) throws PrepareFailException {
184186
nodePassword,
185187
config.getBoolean(COMPATIBLE_MODE.key()),
186188
config.getString(FILE_TEMP_PATH.key()),
187-
config.getString(FILE_FIELDS_DELIMITER.key()));
189+
config.getString(FILE_FIELDS_DELIMITER.key()),
190+
config.getString(KEY_PATH.key()));
188191
}
189192

190193
@Override

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkFactory.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_FIELDS_DELIMITER;
3131
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FILE_TEMP_PATH;
3232
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
33+
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.KEY_PATH;
3334
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_FREE_PASSWORD;
3435
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.NODE_PASS;
3536
import static org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.PASSWORD;
@@ -55,7 +56,8 @@ public OptionRule optionRule() {
5556
NODE_PASS,
5657
COMPATIBLE_MODE,
5758
FILE_FIELDS_DELIMITER,
58-
FILE_TEMP_PATH)
59+
FILE_TEMP_PATH,
60+
KEY_PATH)
5961
.build();
6062
}
6163
}

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -384,9 +384,10 @@ private void moveClickhouseLocalFileToServer(Shard shard, List<String> clickhous
384384
String hostAddress = shard.getNode().getHost();
385385
String user = readerOption.getNodeUser().getOrDefault(hostAddress, "root");
386386
String password = readerOption.getNodePassword().getOrDefault(hostAddress, null);
387+
String keyPath = readerOption.getKeyPath();
387388
FileTransfer fileTransfer =
388389
FileTransferFactory.createFileTransfer(
389-
this.readerOption.getCopyMethod(), hostAddress, user, password);
390+
this.readerOption.getCopyMethod(), hostAddress, user, password, keyPath);
390391
fileTransfer.init();
391392
int randomPath = threadLocalRandom.nextInt(shardLocalDataPaths.get(shard).size());
392393
fileTransfer.transferAndChown(

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,16 @@
2323

2424
public class FileTransferFactory {
2525
public static FileTransfer createFileTransfer(
26-
ClickhouseFileCopyMethod type, String host, String user, String password) {
26+
ClickhouseFileCopyMethod type,
27+
String host,
28+
String user,
29+
String password,
30+
String keyPath) {
2731
switch (type) {
2832
case SCP:
29-
return new ScpFileTransfer(host, user, password);
33+
return new ScpFileTransfer(host, user, password, keyPath);
3034
case RSYNC:
31-
return new RsyncFileTransfer(host, user, password);
35+
return new RsyncFileTransfer(host, user, password, keyPath);
3236
default:
3337
throw new ClickhouseConnectorException(
3438
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java

+22-4
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,18 @@
2525
import org.apache.commons.lang3.StringUtils;
2626
import org.apache.sshd.client.SshClient;
2727
import org.apache.sshd.client.session.ClientSession;
28+
import org.apache.sshd.common.keyprovider.FileKeyPairProvider;
29+
import org.apache.sshd.common.keyprovider.KeyPairProvider;
2830

2931
import lombok.extern.slf4j.Slf4j;
3032

3133
import java.io.BufferedReader;
3234
import java.io.IOException;
3335
import java.io.InputStream;
3436
import java.io.InputStreamReader;
37+
import java.nio.file.Paths;
38+
import java.security.GeneralSecurityException;
39+
import java.security.KeyPair;
3540
import java.util.ArrayList;
3641
import java.util.List;
3742

@@ -43,14 +48,16 @@ public class RsyncFileTransfer implements FileTransfer {
4348
private final String host;
4449
private final String user;
4550
private final String password;
51+
private final String keyPath;
4652

4753
private ClientSession clientSession;
4854
private SshClient sshClient;
4955

50-
public RsyncFileTransfer(String host, String user, String password) {
56+
public RsyncFileTransfer(String host, String user, String password, String keyPath) {
5157
this.host = host;
5258
this.user = user;
5359
this.password = password;
60+
this.keyPath = keyPath;
5461
}
5562

5663
@Override
@@ -62,13 +69,19 @@ public void init() {
6269
if (password != null) {
6370
clientSession.addPasswordIdentity(password);
6471
}
65-
// TODO support add publicKey to identity
72+
if (keyPath != null) {
73+
FileKeyPairProvider fileKeyPairProvider =
74+
new FileKeyPairProvider(Paths.get(keyPath));
75+
KeyPair fileKeyPair =
76+
fileKeyPairProvider.loadKey(clientSession, KeyPairProvider.SSH_RSA);
77+
clientSession.addPublicKeyIdentity(fileKeyPair);
78+
}
6679
if (!clientSession.auth().verify().isSuccess()) {
6780
throw new ClickhouseConnectorException(
6881
ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
6982
"ssh host " + host + "authentication failed");
7083
}
71-
} catch (IOException e) {
84+
} catch (IOException | GeneralSecurityException e) {
7285
throw new ClickhouseConnectorException(
7386
ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
7487
"Failed to connect to host: " + host + " by user: " + user + " on port 22",
@@ -84,7 +97,12 @@ public void transferAndChown(String sourcePath, String targetPath) {
8497
? String.format(
8598
"'sshpass -p %s ssh -o StrictHostKeyChecking=no -p %s'",
8699
password, SSH_PORT)
87-
: String.format("'ssh -o StrictHostKeyChecking=no -p %s'", SSH_PORT);
100+
: keyPath != null
101+
? String.format(
102+
"'ssh -i %s -o StrictHostKeyChecking=no -p %s'",
103+
keyPath, SSH_PORT)
104+
: String.format(
105+
"'ssh -o StrictHostKeyChecking=no -p %s'", SSH_PORT);
88106
List<String> rsyncCommand = new ArrayList<>();
89107
rsyncCommand.add("rsync");
90108
// recursive with -r

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java

+16-3
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,17 @@
2525
import org.apache.commons.lang3.StringUtils;
2626
import org.apache.sshd.client.SshClient;
2727
import org.apache.sshd.client.session.ClientSession;
28+
import org.apache.sshd.common.keyprovider.FileKeyPairProvider;
29+
import org.apache.sshd.common.keyprovider.KeyPairProvider;
2830
import org.apache.sshd.scp.client.ScpClient;
2931
import org.apache.sshd.scp.client.ScpClientCreator;
3032

3133
import lombok.extern.slf4j.Slf4j;
3234

3335
import java.io.IOException;
36+
import java.nio.file.Paths;
37+
import java.security.GeneralSecurityException;
38+
import java.security.KeyPair;
3439
import java.util.ArrayList;
3540
import java.util.List;
3641

@@ -42,15 +47,17 @@ public class ScpFileTransfer implements FileTransfer {
4247
private final String host;
4348
private final String user;
4449
private final String password;
50+
private final String keyPath;
4551

4652
private ScpClient scpClient;
4753
private ClientSession clientSession;
4854
private SshClient sshClient;
4955

50-
public ScpFileTransfer(String host, String user, String password) {
56+
public ScpFileTransfer(String host, String user, String password, String keyPath) {
5157
this.host = host;
5258
this.user = user;
5359
this.password = password;
60+
this.keyPath = keyPath;
5461
}
5562

5663
@Override
@@ -62,14 +69,20 @@ public void init() {
6269
if (password != null) {
6370
clientSession.addPasswordIdentity(password);
6471
}
65-
// TODO support add publicKey to identity
72+
if (keyPath != null) {
73+
FileKeyPairProvider fileKeyPairProvider =
74+
new FileKeyPairProvider(Paths.get(keyPath));
75+
KeyPair fileKeyPair =
76+
fileKeyPairProvider.loadKey(clientSession, KeyPairProvider.SSH_RSA);
77+
clientSession.addPublicKeyIdentity(fileKeyPair);
78+
}
6679
if (!clientSession.auth().verify().isSuccess()) {
6780
throw new ClickhouseConnectorException(
6881
ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
6982
"ssh host " + host + "authentication failed");
7083
}
7184
scpClient = ScpClientCreator.instance().createScpClient(clientSession);
72-
} catch (IOException e) {
85+
} catch (IOException | GeneralSecurityException e) {
7386
throw new ClickhouseConnectorException(
7487
ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
7588
"Failed to connect to host: " + host + " by user: " + user + " on port 22",

0 commit comments

Comments
 (0)