Skip to content

Commit a06c913

Browse files
authored
Add support for type: custom client authentication (#11760)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent bfe975f commit a06c913

20 files changed

Lines changed: 561 additions & 35 deletions

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* Make properties `broker.session.timeout.ms`, `broker.heartbeat.interval.ms` and `controller.socket.timeout.ms` configurable
88
* Add monitoring of custom resources using [kube-state-metrics (KSM)](https://github.com/kubernetes/kube-state-metrics) (see [Strimzi proposal 087](https://github.com/strimzi/proposals/blob/main/087-monitoring-of-custom-resources.md))
99
* Ignore users (their ACLs, Quotas and SCRAM-SHA-512 credentials) managed by some other tools based on a configurable pattern in User Operator
10+
* Support for `type: custom` client authentication (to make it easier to use custom authentication mechanisms such as AWS IAM)
1011
* Added support for Strimzi Metrics Reporter to Kafka Connect, Mirror Maker 2 and Kafka Bridge.
1112
* Add new feature gate `ServerSideApplyPhase1` (disabled by default) that adds support for Server Side Apply for `ConfigMap`, `Ingress`, `PVC`, `Service`, and `ServiceAccount` according to [Strimzi Proposal #105](https://github.com/strimzi/proposals/blob/main/105-server-side-apply-implementation-fg-timelines.md).
1213
* Added distinction between changes of "cluster-wide" broker properties applied dynamically at cluster level, and "per-broker" broker properties applied dynamically at broker level.

api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthentication.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,21 @@
2727
@JsonSubTypes.Type(name = KafkaClientAuthenticationScramSha512.TYPE_SCRAM_SHA_512, value = KafkaClientAuthenticationScramSha512.class),
2828
@JsonSubTypes.Type(name = KafkaClientAuthenticationPlain.TYPE_PLAIN, value = KafkaClientAuthenticationPlain.class),
2929
@JsonSubTypes.Type(name = KafkaClientAuthenticationOAuth.TYPE_OAUTH, value = KafkaClientAuthenticationOAuth.class),
30+
@JsonSubTypes.Type(name = KafkaClientAuthenticationCustom.TYPE_CUSTOM, value = KafkaClientAuthenticationCustom.class),
3031
})
3132
@JsonInclude(JsonInclude.Include.NON_NULL)
3233
@EqualsAndHashCode
3334
@ToString
3435
public abstract class KafkaClientAuthentication implements UnknownPropertyPreserving {
3536
private Map<String, Object> additionalProperties;
3637

37-
@Description("Authentication type. " +
38-
"Currently the supported types are `tls`, `scram-sha-256`, `scram-sha-512`, `plain`, and 'oauth'. " +
39-
"`scram-sha-256` and `scram-sha-512` types use SASL SCRAM-SHA-256 and SASL SCRAM-SHA-512 Authentication, respectively. " +
40-
"`plain` type uses SASL PLAIN Authentication. " +
41-
"`oauth` type uses SASL OAUTHBEARER Authentication. " +
42-
"The `tls` type uses TLS Client Authentication. " +
43-
"The `tls` type is supported only over TLS connections.")
38+
@Description("Specifies the authentication type. " +
39+
"Supported types are `tls`, `scram-sha-256`, `scram-sha-512`, `plain`, 'oauth', and `custom`. " +
40+
"`tls` uses TLS client authentication and is supported only over TLS connections. " +
41+
"`scram-sha-256` and `scram-sha-512` use SASL SCRAM-SHA-256 and SASL SCRAM-SHA-512 authentication, respectively. " +
42+
"`plain` uses SASL PLAIN authentication. " +
43+
"`oauth` uses SASL OAUTHBEARER authentication. " +
44+
"`custom` allows you to configure a custom authentication mechanism.")
4445
public abstract String getType();
4546

4647
@Override
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.api.kafka.model.common.authentication;
6+
7+
import com.fasterxml.jackson.annotation.JsonInclude;
8+
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
9+
import io.strimzi.api.kafka.model.common.Constants;
10+
import io.strimzi.crdgenerator.annotations.Description;
11+
import io.strimzi.crdgenerator.annotations.DescriptionFile;
12+
import io.sundr.builder.annotations.Buildable;
13+
import lombok.EqualsAndHashCode;
14+
import lombok.ToString;
15+
16+
import java.util.Map;
17+
18+
/**
19+
* Configures the Kafka client authentication using a custom mechanism in client based components
20+
*/
21+
@DescriptionFile
22+
@Buildable(
23+
editableEnabled = false,
24+
builderPackage = Constants.FABRIC8_KUBERNETES_API
25+
)
26+
@JsonInclude(JsonInclude.Include.NON_NULL)
27+
@JsonPropertyOrder({"type", "sasl", "config"})
28+
@EqualsAndHashCode(callSuper = true)
29+
@ToString(callSuper = true)
30+
public class KafkaClientAuthenticationCustom extends KafkaClientAuthentication {
31+
public static final String ALLOWED_PREFIXES = "ssl.keystore., sasl.";
32+
public static final String TYPE_CUSTOM = "custom";
33+
34+
private Map<String, Object> config;
35+
private boolean sasl;
36+
37+
@Description("Must be `" + TYPE_CUSTOM + "`")
38+
@JsonInclude(JsonInclude.Include.NON_NULL)
39+
@Override
40+
public String getType() {
41+
return TYPE_CUSTOM;
42+
}
43+
44+
@Description("Enable or disable SASL on this authentication mechanism.")
45+
@JsonInclude(JsonInclude.Include.NON_NULL)
46+
public boolean isSasl() {
47+
return sasl;
48+
}
49+
50+
public void setSasl(boolean enabled) {
51+
this.sasl = enabled;
52+
}
53+
54+
@Description("Configuration for the custom authentication mechanism. " +
55+
"Only properties with the `sasl.` and `ssl.keystore.` prefixes are allowed. " +
56+
"Specify other options in the regular configuration section of the custom resource.")
57+
@JsonInclude(JsonInclude.Include.NON_NULL)
58+
public Map<String, Object> getConfig() {
59+
return config;
60+
}
61+
62+
public void setConfig(Map<String, Object> config) {
63+
this.config = config;
64+
}
65+
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AbstractConfiguration.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,19 @@ public AbstractConfiguration(AbstractConfiguration configuration) {
3333
options.addMapPairs(configuration.asOrderedProperties().asMap());
3434
}
3535

36+
/**
37+
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
38+
* ConfigMap / CRD.
39+
*
40+
* @param reconciliation The reconciliation
41+
* @param jsonOptions Json object with configuration options as key ad value pairs.
42+
* @param allowedPrefixes List with configuration key prefixes which are allowed. All other keys will be ignored.
43+
*/
44+
public AbstractConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<String> allowedPrefixes) {
45+
options.addIterablePairs(jsonOptions);
46+
filterAllowed(reconciliation, allowedPrefixes);
47+
}
48+
3649
/**
3750
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
3851
* ConfigMap / CRD.
@@ -71,7 +84,7 @@ public AbstractConfiguration(Reconciliation reconciliation, String configuration
7184
}
7285

7386
/**
74-
* Filters forbidden values from the configuration.
87+
* Filters forbidden values from the configuration based on a list of forbidden prefixes.
7588
*
7689
* @param reconciliation The reconciliation
7790
* @param forbiddenPrefixes List with configuration key prefixes which are not allowed. All keys which start
@@ -114,6 +127,27 @@ private void filterForbidden(Reconciliation reconciliation, List<String> forbidd
114127
}
115128
}
116129

130+
/**
131+
* Filters forbidden values from the configuration based on a list of allowed prefixes.
132+
*
133+
* @param reconciliation The reconciliation
134+
* @param allowedPrefixes List with configuration key prefixes which are allowed. All other keys will be ignored.
135+
*/
136+
private void filterAllowed(Reconciliation reconciliation, List<String> allowedPrefixes) {
137+
options.filter(k -> {
138+
boolean allowed = allowedPrefixes.stream().anyMatch(p -> k.toLowerCase(Locale.ENGLISH).startsWith(p));
139+
140+
if (allowed) {
141+
LOGGER.traceCr(reconciliation, "Configuration option \"{}\" is allowed and will be passed to the assembly", k);
142+
} else {
143+
LOGGER.warnCr(reconciliation, "Configuration option \"{}\" is not allowed and will be ignored", k);
144+
}
145+
146+
// We have to return the reverted value to filter out the not allowed options
147+
return !allowed;
148+
});
149+
}
150+
117151
/**
118152
* Returns a value for a specific config option
119153
*

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.strimzi.api.kafka.model.common.GenericSecretSource;
1313
import io.strimzi.api.kafka.model.common.PasswordSecretSource;
1414
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
15+
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationCustom;
1516
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
1617
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain;
1718
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram;
@@ -27,6 +28,7 @@
2728

2829
import java.io.PrintWriter;
2930
import java.io.StringWriter;
31+
import java.util.Map;
3032
import java.util.stream.Collectors;
3133

3234
import static io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter.TYPE_STRIMZI_METRICS_REPORTER;
@@ -39,6 +41,7 @@
3941
* configuration file. This class is using the builder pattern to make it easy to test the different parts etc. To
4042
* generate the configuration file, it is using the PrintWriter.
4143
*/
44+
@SuppressWarnings("checkstyle:CyclomaticComplexity")
4245
public class KafkaBridgeConfigurationBuilder {
4346

4447
// placeholders expanded through config providers inside the bridge node
@@ -149,8 +152,22 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
149152
writer.println("kafka.ssl.keystore.location=/tmp/strimzi/bridge.keystore.p12");
150153
writer.println("kafka.ssl.keystore.password=" + PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR);
151154
writer.println("kafka.ssl.keystore.type=PKCS12");
152-
// otherwise SASL or OAuth is going to be used for authentication
153-
} else {
155+
} else if (authentication instanceof KafkaClientAuthenticationCustom customAuth) { // Configure custom authentication
156+
if (customAuth.isSasl()) {
157+
// If this authentication uses SASL, we need to update the security protocol to combine the SASL
158+
// flag with the SSL or PLAINTEXT flag.
159+
securityProtocol = securityProtocol.equals("SSL") ? "SASL_SSL" : "SASL_PLAINTEXT";
160+
}
161+
162+
Map<String, Object> customConfig = customAuth.getConfig();
163+
if (customConfig == null) {
164+
customConfig = Map.of();
165+
}
166+
167+
KafkaClientAuthenticationCustomConfiguration config = new KafkaClientAuthenticationCustomConfiguration(reconciliation, customConfig.entrySet());
168+
config.asOrderedProperties().asMap().forEach((key, value) -> writer.println(String.format("kafka.%s=%s", key, value)));
169+
writer.println();
170+
} else { // otherwise SASL or OAuth is going to be used for authentication
154171
securityProtocol = securityProtocol.equals("SSL") ? "SASL_SSL" : "SASL_PLAINTEXT";
155172
String saslMechanism = null;
156173
StringBuilder jaasConfig = new StringBuilder();
@@ -211,6 +228,7 @@ public KafkaBridgeConfigurationBuilder withAuthentication(KafkaClientAuthenticat
211228
writer.println();
212229
}
213230
}
231+
214232
return this;
215233
}
216234

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
6+
package io.strimzi.operator.cluster.model;
7+
8+
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationCustom;
9+
import io.strimzi.operator.common.Reconciliation;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
/**
15+
* Class for handling configuration of the custom client-side authentication
16+
*/
17+
public class KafkaClientAuthenticationCustomConfiguration extends AbstractConfiguration {
18+
private static final List<String> ALLOWED_PREFIXES;
19+
static {
20+
ALLOWED_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClientAuthenticationCustom.ALLOWED_PREFIXES);
21+
}
22+
23+
/**
24+
* Constructor
25+
*
26+
* @param reconciliation Reconciliation marker
27+
* @param jsonOptions Configuration options
28+
*/
29+
public KafkaClientAuthenticationCustomConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
30+
super(reconciliation, jsonOptions, ALLOWED_PREFIXES);
31+
}
32+
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.strimzi.api.kafka.model.common.GenericSecretSource;
99
import io.strimzi.api.kafka.model.common.PasswordSecretSource;
1010
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
11+
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationCustom;
1112
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
1213
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain;
1314
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram;
@@ -24,6 +25,7 @@
2425
import java.io.StringWriter;
2526
import java.util.ArrayList;
2627
import java.util.List;
28+
import java.util.Map;
2729
import java.util.stream.Collectors;
2830

2931
import static io.strimzi.operator.cluster.model.KafkaConnectCluster.OAUTH_SECRETS_BASE_VOLUME_MOUNT;
@@ -36,6 +38,7 @@
3638
* configuration file. This class is using the builder pattern to make it easy to test the different parts etc. To
3739
* generate the configuration file, it is using the PrintWriter.
3840
*/
41+
@SuppressWarnings("checkstyle:CyclomaticComplexity")
3942
public class KafkaConnectConfigurationBuilder {
4043
// the volume mounted secret file template includes: <volume_mount>/<secret_name>/<secret_key>
4144
private static final String PLACEHOLDER_VOLUME_MOUNTED_SECRET_TEMPLATE_CONFIG_PROVIDER_DIR = "${strimzidir:%s%s:%s}";
@@ -148,8 +151,28 @@ public KafkaConnectConfigurationBuilder withAuthentication(KafkaClientAuthentica
148151
writer.println("admin.ssl.keystore.certificate.chain=" + certConfigProviderValue);
149152
writer.println("admin.ssl.keystore.key=" + keyConfigProviderValue);
150153
writer.println("admin.ssl.keystore.type=PEM");
151-
// otherwise SASL or OAuth is going to be used for authentication
152-
} else {
154+
} else if (authentication instanceof KafkaClientAuthenticationCustom customAuth) { // Configure custom authentication
155+
if (customAuth.isSasl()) {
156+
// If this authentication uses SASL, we need to update the security protocol to combine the SASL
157+
// flag with the SSL or PLAINTEXT flag.
158+
securityProtocol = securityProtocol.equals("SSL") ? "SASL_SSL" : "SASL_PLAINTEXT";
159+
}
160+
161+
Map<String, Object> customConfig = customAuth.getConfig();
162+
if (customConfig == null) {
163+
customConfig = Map.of();
164+
}
165+
166+
KafkaClientAuthenticationCustomConfiguration config = new KafkaClientAuthenticationCustomConfiguration(reconciliation, customConfig.entrySet());
167+
config.asOrderedProperties().asMap().forEach((key, value) -> {
168+
writer.println(String.format("%s=%s", key, value));
169+
writer.println(String.format("producer.%s=%s", key, value));
170+
writer.println(String.format("consumer.%s=%s", key, value));
171+
writer.println(String.format("admin.%s=%s", key, value));
172+
});
173+
174+
writer.println();
175+
} else { // otherwise SASL or OAuth is going to be used for authentication
153176
securityProtocol = securityProtocol.equals("SSL") ? "SASL_SSL" : "SASL_PLAINTEXT";
154177
String saslMechanism = null;
155178
StringBuilder jaasConfig = new StringBuilder();
@@ -232,6 +255,7 @@ public KafkaConnectConfigurationBuilder withAuthentication(KafkaClientAuthentica
232255
writer.println();
233256
}
234257
}
258+
235259
return this;
236260
}
237261

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package io.strimzi.operator.cluster.model;
66

7+
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationCustom;
78
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
89
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationPlain;
910
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScram;
@@ -180,8 +181,8 @@ public List<KafkaConnector> generateConnectorDefinitions() {
180181
Map<String, Object> config = new HashMap<>(connector.getConfig());
181182

182183
// Source and target cluster configurations
183-
addClusterToMirrorMaker2ConnectorConfig(config, targetCluster, TARGET_CLUSTER_PREFIX);
184-
addClusterToMirrorMaker2ConnectorConfig(config, sourceCluster, SOURCE_CLUSTER_PREFIX);
184+
addClusterToMirrorMaker2ConnectorConfig(reconciliation, config, targetCluster, TARGET_CLUSTER_PREFIX);
185+
addClusterToMirrorMaker2ConnectorConfig(reconciliation, config, sourceCluster, SOURCE_CLUSTER_PREFIX);
185186

186187
// Topics pattern
187188
if (mirror.getTopicsPattern() != null) {
@@ -251,7 +252,7 @@ public List<KafkaConnector> generateConnectorDefinitions() {
251252
return config;
252253
}
253254

254-
/* test */ static void addClusterToMirrorMaker2ConnectorConfig(Map<String, Object> config, KafkaMirrorMaker2ClusterSpec cluster, String configPrefix) {
255+
/* test */ static void addClusterToMirrorMaker2ConnectorConfig(Reconciliation reconciliation, Map<String, Object> config, KafkaMirrorMaker2ClusterSpec cluster, String configPrefix) {
255256
config.put(configPrefix + "alias", cluster.getAlias());
256257
config.put(configPrefix + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
257258

@@ -284,6 +285,20 @@ public List<KafkaConnector> generateConnectorDefinitions() {
284285
config.put(configPrefix + SaslConfigs.SASL_JAAS_CONFIG,
285286
oauthJaasConfig(cluster, oauthAuthentication));
286287
config.put(configPrefix + SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler");
288+
} else if (cluster.getAuthentication() instanceof KafkaClientAuthenticationCustom customAuth) { // Configure custom authentication
289+
if (customAuth.isSasl()) {
290+
// If this authentication uses SASL, we need to update the security protocol to combine the SASL
291+
// flag with the SSL or PLAINTEXT flag.
292+
securityProtocol = securityProtocol.equals("SSL") ? "SASL_SSL" : "SASL_PLAINTEXT";
293+
}
294+
295+
Map<String, Object> customConfig = customAuth.getConfig();
296+
if (customConfig == null) {
297+
customConfig = Map.of();
298+
}
299+
300+
KafkaClientAuthenticationCustomConfiguration authConfig = new KafkaClientAuthenticationCustomConfiguration(reconciliation, customConfig.entrySet());
301+
authConfig.asOrderedProperties().asMap().forEach((key, value) -> config.put(configPrefix + key, value));
287302
}
288303
}
289304

0 commit comments

Comments
 (0)