Skip to content

Commit 3749136

Browse files
committed
feat(schema-connect): add SSL support for KafkaConnect
1 parent 1857335 commit 3749136

File tree

7 files changed

+101
-37
lines changed

7 files changed

+101
-37
lines changed

extension-rest-client/src/main/java/io/streamthoughts/jikkou/http/client/SslConfigSupport.java

+24-8
Original file line numberDiff line numberDiff line change
@@ -9,56 +9,72 @@
99
import io.streamthoughts.jikkou.core.config.ConfigProperty;
1010
import io.streamthoughts.jikkou.core.config.Configuration;
1111
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;
12+
import java.util.Optional;
13+
import org.jetbrains.annotations.NotNull;
1214

1315
public interface SslConfigSupport {
1416

17+
String SSL_KEY_STORE_LOCATION = "sslKeyStoreLocation";
18+
String SSL_KEY_STORE_TYPE = "sslKeyStoreType";
19+
String SSL_KEY_STORE_PASSWORD = "sslKeyStorePassword";
20+
String SSL_KEY_PASSWORD = "sslKeyPassword";
21+
String SSL_TRUST_STORE_LOCATION = "sslTrustStoreLocation";
22+
String SSL_TRUST_STORE_PASSWORD = "sslTrustStorePassword";
23+
String SSL_TRUST_STORE_TYPE = "sslTrustStoreType";
24+
String SSL_IGNORE_HOSTNAME_VERIFICATION = "sslIgnoreHostnameVerification";
25+
1526
static ConfigProperty<String> sslKeyStoreLocation(final String configNamespace) {
1627
return ConfigProperty
17-
.ofString(configNamespace + ".sslKeyStoreLocation")
28+
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_LOCATION))
1829
.description("The location of the key store file.");
1930
}
2031

32+
private static @NotNull String prefixWithNamespace(final String configNamespace,
33+
final String sslKeyStoreLocation) {
34+
return Optional.ofNullable(configNamespace).map(s -> s + ".").orElse("") + sslKeyStoreLocation;
35+
}
36+
2137
static ConfigProperty<String> sslKeyStoreType(final String configNamespace) {
2238
return ConfigProperty
23-
.ofString(configNamespace + ".sslKeyStoreType")
39+
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_TYPE))
2440
.description("The file format of the key store file.")
2541
.orElse("PKCS12");
2642
}
2743

2844
static ConfigProperty<String> sslKeyStorePassword(final String configNamespace) {
2945
return ConfigProperty
30-
.ofString(configNamespace + ".sslKeyStorePassword")
46+
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_STORE_PASSWORD))
3147
.description("The password for the key store file.");
3248
}
3349

3450
static ConfigProperty<String> sslKeyPassword(final String configNamespace) {
3551
return ConfigProperty
36-
.ofString(configNamespace + ".sslKeyPassword")
52+
.ofString(prefixWithNamespace(configNamespace, SSL_KEY_PASSWORD))
3753
.description("The password of the private key in the key store file.");
3854
}
3955

4056
static ConfigProperty<String> sslTrustStoreLocation(final String configNamespace) {
4157
return ConfigProperty
42-
.ofString(configNamespace + ".sslTrustStoreLocation")
58+
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_LOCATION))
4359
.description("The location of the trust store file.");
4460
}
4561

4662
static ConfigProperty<String> sslTrustStoreType(final String configNamespace) {
4763
return ConfigProperty
48-
.ofString(configNamespace + ".sslTrustStoreType")
64+
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_TYPE))
4965
.description("The file format of the trust store file.")
5066
.orElse("PKCS12");
5167
}
5268

5369
static ConfigProperty<String> sslTrustStorePassword(final String configNamespace) {
5470
return ConfigProperty
55-
.ofString(configNamespace + ".sslTrustStorePassword")
71+
.ofString(prefixWithNamespace(configNamespace, SSL_TRUST_STORE_PASSWORD))
5672
.description("The password for the trust store file.");
5773
}
5874

5975
static ConfigProperty<Boolean> sslIgnoreHostnameVerification(final String configNamespace) {
6076
return ConfigProperty
61-
.ofBoolean(configNamespace + ".sslIgnoreHostnameVerification")
77+
.ofBoolean(prefixWithNamespace(configNamespace, SSL_IGNORE_HOSTNAME_VERIFICATION))
6278
.description("Specifies whether to ignore the hostname verification.")
6379
.orElse(false);
6480
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
* Copyright (c) The original authors
4+
*
5+
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
package io.streamthoughts.jikkou.http.client;
8+
9+
import io.streamthoughts.jikkou.core.config.Configuration;
10+
import org.junit.jupiter.api.Test;
11+
12+
class SslConfigSupportTest {
13+
14+
@Test
15+
void shouldCreateSslConfigWithoutNamespace() {
16+
SslConfigSupport.getSslConfig(null, Configuration
17+
.builder()
18+
.with(SslConfigSupport.SSL_KEY_STORE_PASSWORD, "password")
19+
.with(SslConfigSupport.SSL_KEY_STORE_LOCATION, "/tmp/keystore.jks")
20+
.with(SslConfigSupport.SSL_KEY_STORE_TYPE, "jks")
21+
.with(SslConfigSupport.SSL_TRUST_STORE_LOCATION, "/tmp/truststore.jks")
22+
.with(SslConfigSupport.SSL_TRUST_STORE_PASSWORD, "password")
23+
.with(SslConfigSupport.SSL_TRUST_STORE_TYPE, "jks")
24+
.build()
25+
);
26+
}
27+
}

providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/AuthMethod.java

+2-12
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,9 @@
66
*/
77
package io.streamthoughts.jikkou.kafka.connect.api;
88

9-
import java.util.Arrays;
10-
import java.util.Locale;
11-
import org.jetbrains.annotations.NotNull;
12-
139
public enum AuthMethod {
1410
INVALID,
1511
BASICAUTH,
16-
NONE;
17-
18-
public static AuthMethod getForNameIgnoreCase(final @NotNull String str) {
19-
return Arrays.stream(AuthMethod.values())
20-
.filter(e -> e.name().equals(str.toUpperCase(Locale.ROOT)))
21-
.findFirst()
22-
.orElse(AuthMethod.INVALID);
23-
}
12+
NONE,
13+
SSL
2414
}

providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectApiFactory.java

+4
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public static KafkaConnectApi create(@NotNull KafkaConnectClientConfig config,
6060
builder.header("Authorization", buildAuthorizationHeader);
6161
yield builder;
6262
}
63+
case SSL -> {
64+
builder.sslConfig(config.getSslConfig());
65+
yield builder;
66+
}
6367
case NONE -> builder;
6468

6569
case INVALID -> throw new IllegalStateException("Unexpected value: " + config.getAuthMethod());

providers/jikkou-provider-kafka-connect/src/main/java/io/streamthoughts/jikkou/kafka/connect/api/KafkaConnectClientConfig.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -6,37 +6,40 @@
66
*/
77
package io.streamthoughts.jikkou.kafka.connect.api;
88

9+
import io.streamthoughts.jikkou.common.utils.Enums;
910
import io.streamthoughts.jikkou.core.config.ConfigProperty;
1011
import io.streamthoughts.jikkou.core.config.Configuration;
12+
import io.streamthoughts.jikkou.http.client.SslConfigSupport;
13+
import io.streamthoughts.jikkou.http.client.ssl.SSLConfig;
1114
import java.util.Objects;
1215

1316
public class KafkaConnectClientConfig {
1417

1518
public static final ConfigProperty<String> KAFKA_CONNECT_NAME = ConfigProperty
16-
.ofString("name")
17-
.description("Name of the kafka connect cluster.");
19+
.ofString("name")
20+
.description("Name of the kafka connect cluster.");
1821

1922
public static final ConfigProperty<String> KAFKA_CONNECT_URL = ConfigProperty
20-
.ofString("url")
21-
.description("URL to establish connection to kafka connect cluster.");
23+
.ofString("url")
24+
.description("URL to establish connection to kafka connect cluster.");
2225

2326
public static final ConfigProperty<String> KAFKA_CONNECT_AUTH_METHOD = ConfigProperty
24-
.ofString("authMethod")
25-
.orElse(AuthMethod.NONE.name())
26-
.description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth]");
27+
.ofString("authMethod")
28+
.orElse(AuthMethod.NONE.name())
29+
.description("Method to use for authenticating on Kafka Connect cluster. Available values are: [none, basicauth, ssl]");
2730

2831
public static final ConfigProperty<String> KAFKA_CONNECT_BASIC_AUTH_USERNAME = ConfigProperty
29-
.ofString("basicAuthUser")
30-
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header");
32+
.ofString("basicAuthUser")
33+
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the username for Authorization Basic header");
3134

3235
public static final ConfigProperty<String> KAFKA_CONNECT_BASIC_AUTH_PASSWORD = ConfigProperty
33-
.ofString("basicAuthPassword")
34-
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header");
36+
.ofString("basicAuthPassword")
37+
.description("Use when 'kafkaConnect.authMethod' is 'basicauth' to specify the password for Authorization Basic header");
3538

3639
public static final ConfigProperty<Boolean> KAFKA_CONNECT_DEBUG_LOGGING_ENABLED = ConfigProperty
37-
.ofBoolean("debugLoggingEnabled")
38-
.description("Enable debug logging.")
39-
.orElse(false);
40+
.ofBoolean("debugLoggingEnabled")
41+
.description("Enable debug logging.")
42+
.orElse(false);
4043

4144
private final Configuration configuration;
4245

@@ -58,7 +61,7 @@ public String getConnectUrl() {
5861
}
5962

6063
public AuthMethod getAuthMethod() {
61-
return AuthMethod.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration));
64+
return Enums.getForNameIgnoreCase(KAFKA_CONNECT_AUTH_METHOD.get(configuration), AuthMethod.class, AuthMethod.INVALID);
6265
}
6366

6467
public String getBasicAuthUsername() {
@@ -77,7 +80,13 @@ public boolean getDebugLoggingEnabled() {
7780
return KAFKA_CONNECT_DEBUG_LOGGING_ENABLED.get(configuration);
7881
}
7982

80-
/** {@inheritDoc} **/
83+
public SSLConfig getSslConfig() {
84+
return SslConfigSupport.getSslConfig(null, configuration);
85+
}
86+
87+
/**
88+
* {@inheritDoc}
89+
**/
8190
@Override
8291
public boolean equals(Object o) {
8392
if (this == o) return true;
@@ -86,7 +95,9 @@ public boolean equals(Object o) {
8695
return Objects.equals(configuration, that.configuration);
8796
}
8897

89-
/** {@inheritDoc} **/
98+
/**
99+
* {@inheritDoc}
100+
**/
90101
@Override
91102
public int hashCode() {
92103
return Objects.hash(configuration);

server/jikkou-api-server/src/main/resources/application.yaml

+14
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ jikkou:
6565
#<provider_name>.enabled: <boolean>
6666
core.enabled: true
6767
kafka.enabled: true
68+
aiven.enabled: true
6869

6970
kafka:
7071
client:
@@ -76,6 +77,19 @@ jikkou:
7677
# The default custom transformations to apply on any resources.
7778
transformations: [ ]
7879

80+
aiven:
81+
# Aiven project name
82+
project: "fhussonnois-9f81"
83+
# Aiven service name
84+
service: kafka-demo
85+
# URL to the Aiven REST API.
86+
apiUrl: "https://api.aiven.io/v1/"
87+
# Aiven Bearer Token. Tokens can be obtained from your Aiven profile page
88+
tokenAuth: "HP8Y7IBZGs3/03rK/LsLaORnXugDULl9vlQHk8p3mMFwtsZgELDX8ejx9zu18hn4LSFm4UTuIbYHWZ1EeO20bcwG0u6NrwP9xytB0kE4LLYzLe9B9NiJkvDEb+ZRiT4vnP52TZUA3QlRDOOOE7LmfI9/VDAUHj91DHzy/dr2DagtHl09jWtzmpOpKh34up7EDg6vFag+k6bntcmaLzTHgMK5kCz373dsl/8+0L3CTdRsVapTkIUeEiL8aCowxTzwUYIGCg6aOZW98oIn8HfF+j1W+qwlL4VlMbHDEUDX6+9yhfj6lb7BR9A82hWnoy+IsiNKVCV3oupTvtk16fFvWEx91LoZ3WElUY/bDgWdQRfyV/xZ"
89+
# Enable debug logging
90+
debugLoggingEnabled: false
91+
92+
7993
# The default custom validations to apply on any resources.
8094
validations:
8195
- name: "topicMustHaveValidName"

up

+2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ set -e
55
BASEDIR=$(dirname "$(readlink -f $0)")
66
DOCKERDIR="$BASEDIR"
77

8+
if [ -z "$DOCKER_STACK" ]; then
89
DOCKER_STACK="$DOCKERDIR/docker-compose.yml"
10+
fi
911

1012
line() {
1113
echo -e "\n----------------------------------------------------------------------------------------------------------------------------------------\n"

0 commit comments

Comments
 (0)