Skip to content

Commit 26a7807

Browse files
committed
feat: kafka enhance support kerberos
1 parent a5032e8 commit 26a7807

File tree

6 files changed

+438
-6
lines changed

6 files changed

+438
-6
lines changed

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/KafkaConfig.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.tapdata.kafka.constants.KafkaConcurrentReadMode;
1111
import io.tapdata.kafka.constants.KafkaSchemaMode;
1212
import io.tapdata.kafka.constants.KafkaSerialization;
13+
import io.tapdata.kafka.utils.Krb5Util;
1314
import io.tapdata.pdk.apis.context.TapConnectionContext;
1415
import org.apache.commons.lang3.StringUtils;
1516
import org.apache.kafka.clients.CommonClientConfigs;
@@ -35,8 +36,11 @@ public class KafkaConfig extends BasicConfig implements
3536
IConnectionACL,
3637
ConnectionExtParams {
3738

38-
public KafkaConfig(TapConnectionContext context) {
39+
private final String connectorId;
40+
41+
public KafkaConfig(TapConnectionContext context, String connectorId) {
3942
super(context);
43+
this.connectorId = connectorId;
4044
}
4145

4246
// ---------- 连接配置 ----------
@@ -207,6 +211,10 @@ private Properties buildProperties(String type) {
207211
" username='" + getSaslUsername() +
208212
"' password='" + getSaslPassword() + "';");
209213
}
214+
if (useKerberos()) {
215+
String krb5Path = Krb5Util.saveByCatalog("connections-" + connectorId, getKrb5Keytab(), getKrb5Conf(), true);
216+
Krb5Util.updateKafkaConf(getKrb5ServiceName(), getKrb5Principal(), krb5Path, getKrb5Conf(), props);
217+
}
210218
if (useSsl()) {
211219
// ssl.truststore.location=/path/to/kafka.client.truststore.jks
212220
// ssl.truststore.password=truststore_password
@@ -320,7 +328,7 @@ public Properties buildProducerConfig() {
320328

321329
// ---------- 静态方法 ----------
322330

323-
public static KafkaConfig valueOf(TapConnectionContext context) {
324-
return new KafkaConfig(context);
331+
public static KafkaConfig valueOf(TapConnectionContext context, String connectorId) {
332+
return new KafkaConfig(context, connectorId);
325333
}
326334
}

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/KafkaEnhancedConnector.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import io.tapdata.kit.EmptyKit;
1010
import io.tapdata.pdk.apis.annotations.TapConnectorClass;
1111
import io.tapdata.pdk.apis.context.TapConnectionContext;
12+
import io.tapdata.pdk.apis.context.TapConnectorContext;
1213
import io.tapdata.pdk.apis.entity.ConnectionOptions;
1314
import io.tapdata.pdk.apis.entity.TestItem;
1415
import io.tapdata.pdk.apis.functions.ConnectorFunctions;
@@ -37,15 +38,18 @@ public class KafkaEnhancedConnector extends ConnectorBase {
3738
@Override
3839
public void onStart(TapConnectionContext connectionContext) throws Throwable {
3940
connectionContext.getLog().info("Starting {}", PDK_ID);
40-
kafkaConfig = KafkaConfig.valueOf(connectionContext);
41-
kafkaService = new KafkaService(kafkaConfig, stopping);
4241
isConnectorStarted(connectionContext, connectorContext -> {
4342
String firstConnectorId = (String) connectorContext.getStateMap().get("firstConnectorId");
4443
if (EmptyKit.isNull(firstConnectorId)) {
4544
firstConnectorId = UUID.randomUUID().toString().replace("-", "");
4645
connectorContext.getStateMap().put("firstConnectorId", firstConnectorId);
4746
}
47+
kafkaConfig = KafkaConfig.valueOf(connectionContext, firstConnectorId);
4848
});
49+
if (!(connectionContext instanceof TapConnectorContext)) {
50+
kafkaConfig = KafkaConfig.valueOf(connectionContext, "");
51+
}
52+
kafkaService = new KafkaService(kafkaConfig, stopping);
4953
}
5054

5155
@Override

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/KafkaTester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
*/
1818
public class KafkaTester extends Tester<KafkaConfig> {
1919
public KafkaTester(TapConnectionContext connectionContext, Consumer<TestItem> consumer) {
20-
super(KafkaConfig.valueOf(connectionContext), consumer);
20+
super(KafkaConfig.valueOf(connectionContext, ""), consumer);
2121
}
2222

2323
@Override

connectors/kafka-enhanced-connector/src/main/java/io/tapdata/kafka/config/IConnectionSecurity.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public interface IConnectionSecurity extends IConfigWithContext {
1414
String KEY_SASL_USERNAME = "saslUsername";
1515
String KEY_SASL_PASSWORD = "saslPassword";
1616
String KEY_USE_SSL = "useSsl";
17+
String KEY_USE_KERBEROS = "krb5";
1718

1819
enum Protocol {
1920
PLAINTEXT, // 没有任何 authentication
@@ -70,6 +71,26 @@ default boolean useSasl() {
7071
return connectionConfigGet(KEY_USE_SASL, false);
7172
}
7273

74+
default boolean useKerberos() {
75+
return connectionConfigGet(KEY_USE_KERBEROS, false);
76+
}
77+
78+
default String getKrb5Keytab() {
79+
return connectionConfigGet("krb5Keytab", "");
80+
}
81+
82+
default String getKrb5Conf() {
83+
return connectionConfigGet("krb5Conf", "");
84+
}
85+
86+
default String getKrb5Principal() {
87+
return connectionConfigGet("krb5Principal", "");
88+
}
89+
90+
default String getKrb5ServiceName() {
91+
return connectionConfigGet("krb5ServiceName", "");
92+
}
93+
7394
default String getSaslMechanism() {
7495
return connectionConfigGet(KEY_SASL_MECHANISM, "PLAIN");
7596
}

0 commit comments

Comments
 (0)