Skip to content

Commit aa856d1

Browse files
authored
Load correct trusted certificates in Kafka Connect (#11971)
Signed-off-by: Jakub Scholz <www@scholzj.com>
1 parent 6eadcc9 commit aa856d1

9 files changed

Lines changed: 236 additions & 131 deletions

File tree

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConnectConfigurationBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public KafkaConnectConfigurationBuilder withTls(ClientTls tls, String clusterNam
101101

102102
if (tls.getTrustedCertificates() != null && !tls.getTrustedCertificates().isEmpty()) {
103103
printSectionHeader("TLS / SSL");
104-
String configProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName), "*.crt");
104+
String configProviderValue = String.format(PLACEHOLDER_SECRET_TEMPLATE_KUBE_CONFIG_PROVIDER, reconciliation.namespace(), KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName), "ca.crt");
105105
writer.println("ssl.truststore.certificates=" + configProviderValue);
106106
writer.println("ssl.truststore.type=PEM");
107107

@@ -226,7 +226,7 @@ public KafkaConnectConfigurationBuilder withAuthentication(KafkaClientAuthentica
226226

227227
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
228228
String oauthTrustedCertsSecret = KafkaConnectResources.internalOauthTrustedCertsSecretName(clusterName);
229-
String trustStorePath = OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT + oauthTrustedCertsSecret + "/" + oauthTrustedCertsSecret + ".crt";
229+
String trustStorePath = OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT + oauthTrustedCertsSecret + "/ca.crt";
230230
jaasConfig.append(" oauth.ssl.truststore.location=\"" + trustStorePath + "\" oauth.ssl.truststore.type=\"PEM\"");
231231
}
232232

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/AbstractConnectOperator.java

Lines changed: 30 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
1616
import io.fabric8.kubernetes.client.CustomResource;
1717
import io.fabric8.kubernetes.client.KubernetesClient;
18-
import io.strimzi.api.kafka.model.common.CertSecretSource;
19-
import io.strimzi.api.kafka.model.common.ClientTls;
2018
import io.strimzi.api.kafka.model.common.Condition;
2119
import io.strimzi.api.kafka.model.common.ConnectorState;
22-
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
2320
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationOAuth;
2421
import io.strimzi.api.kafka.model.connect.AbstractKafkaConnectSpec;
2522
import io.strimzi.api.kafka.model.connect.KafkaConnectResources;
@@ -78,13 +75,10 @@
7875
import java.io.IOException;
7976
import java.util.ArrayList;
8077
import java.util.HashMap;
81-
import java.util.HashSet;
8278
import java.util.List;
8379
import java.util.Map;
8480
import java.util.Optional;
85-
import java.util.Set;
8681
import java.util.TreeMap;
87-
import java.util.concurrent.ConcurrentHashMap;
8882
import java.util.function.BiFunction;
8983
import java.util.function.Function;
9084
import java.util.stream.Collectors;
@@ -284,38 +278,23 @@ protected Future<ReconcileResult<NetworkPolicy>> connectNetworkPolicy(Reconcilia
284278
* @return Future which completes when the reconciliation is done
285279
*/
286280
protected Future<Void> tlsTrustedCertsSecret(Reconciliation reconciliation, String namespace, KafkaConnectCluster connect) {
287-
ClientTls tls = connect.getTls();
288-
Set<String> secretsToCopy = new HashSet<>();
289-
290-
if (tls != null && tls.getTrustedCertificates() != null) {
291-
secretsToCopy.addAll(tls.getTrustedCertificates().stream().map(CertSecretSource::getSecretName).toList());
292-
}
293-
294-
if (secretsToCopy.isEmpty()) {
281+
if (connect.getTls() != null) {
282+
return ReconcilerUtils.trustedCertificates(reconciliation, secretOperations, connect.getTls().getTrustedCertificates())
283+
.compose(certificates -> {
284+
if (certificates != null) {
285+
return secretOperations.reconcile(
286+
reconciliation,
287+
namespace,
288+
KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster()),
289+
connect.generateTlsTrustedCertsSecret(Map.of("ca.crt", Util.encodeToBase64(certificates)), KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster())))
290+
.mapEmpty();
291+
} else {
292+
return Future.succeededFuture();
293+
}
294+
});
295+
} else {
295296
return Future.succeededFuture();
296297
}
297-
298-
ConcurrentHashMap<String, String> secretData = new ConcurrentHashMap<>();
299-
return Future.join(secretsToCopy.stream()
300-
.map(secretName -> secretOperations.getAsync(namespace, secretName)
301-
.compose(secret -> {
302-
if (secret == null) {
303-
return Future.failedFuture("Secret " + secretName + " not found");
304-
} else {
305-
secret.getData().entrySet().stream()
306-
.filter(e -> e.getKey().contains(".crt"))
307-
// In case secrets contain the same key, append the secret name into the key
308-
.forEach(e -> secretData.put(secretName + "-" + e.getKey(), e.getValue()));
309-
}
310-
return Future.succeededFuture();
311-
}))
312-
.collect(Collectors.toList()))
313-
.compose(ignore -> secretOperations.reconcile(
314-
reconciliation,
315-
namespace,
316-
KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster()),
317-
connect.generateTlsTrustedCertsSecret(secretData, KafkaConnectResources.internalTlsTrustedCertsSecretName(connect.getCluster())))
318-
.mapEmpty());
319298
}
320299

321300
/**
@@ -327,52 +306,22 @@ protected Future<Void> tlsTrustedCertsSecret(Reconciliation reconciliation, Stri
327306
*/
328307
@SuppressWarnings("deprecation") // OAuth authentication is deprecated
329308
protected Future<Void> oauthTrustedCertsSecret(Reconciliation reconciliation, String namespace, KafkaConnectCluster connect) {
330-
KafkaClientAuthentication authentication = connect.getAuthentication();
331-
Set<String> secretsToCopy = new HashSet<>();
332-
333-
if (authentication instanceof KafkaClientAuthenticationOAuth oauth && oauth.getTlsTrustedCertificates() != null) {
334-
secretsToCopy.addAll(oauth.getTlsTrustedCertificates().stream().map(CertSecretSource::getSecretName).toList());
335-
}
336-
337-
if (secretsToCopy.isEmpty()) {
338-
return Future.succeededFuture();
339-
}
340-
341-
List<String> certs = new ArrayList<>();
342-
String oauthSecret = KafkaConnectResources.internalOauthTrustedCertsSecretName(connect.getCluster());
343-
return Future.join(secretsToCopy.stream()
344-
.map(secretName -> secretOperations.getAsync(namespace, secretName)
345-
.compose(secret -> {
346-
if (secret == null) {
347-
return Future.failedFuture("Secret " + secretName + " not found");
348-
} else {
349-
secret.getData().entrySet().stream()
350-
.filter(e -> e.getKey().contains(".crt"))
351-
// In case secrets contain the same key, append the secret name into the key
352-
.forEach(e -> certs.add(e.getValue()));
353-
}
354-
return Future.succeededFuture();
355-
}))
356-
.collect(Collectors.toList()))
357-
.compose(ignore -> secretOperations.reconcile(
358-
reconciliation,
359-
namespace,
360-
oauthSecret,
361-
connect.generateTlsTrustedCertsSecret(Map.of(oauthSecret + ".crt", mergeAndEncodeCerts(certs)), oauthSecret))
362-
.mapEmpty());
363-
}
364-
365-
private String mergeAndEncodeCerts(List<String> certs) {
366-
if (certs.size() > 1) {
367-
String decodedAndMergedCerts = certs.stream()
368-
.map(Util::decodeFromBase64)
369-
.collect(Collectors.joining("\n"));
370-
371-
return Util.encodeToBase64(decodedAndMergedCerts);
372-
} else if (certs.size() < 1) {
373-
return "";
309+
if (connect.getAuthentication() instanceof KafkaClientAuthenticationOAuth oauth) {
310+
return ReconcilerUtils.trustedCertificates(reconciliation, secretOperations, oauth.getTlsTrustedCertificates())
311+
.compose(certificates -> {
312+
if (certificates != null) {
313+
return secretOperations.reconcile(
314+
reconciliation,
315+
namespace,
316+
KafkaConnectResources.internalOauthTrustedCertsSecretName(connect.getCluster()),
317+
connect.generateTlsTrustedCertsSecret(Map.of("ca.crt", Util.encodeToBase64(certificates)), KafkaConnectResources.internalOauthTrustedCertsSecretName(connect.getCluster())))
318+
.mapEmpty();
319+
} else {
320+
return Future.succeededFuture();
321+
}
322+
});
374323
} else {
375-
return certs.get(0);
324+
return Future.succeededFuture();
376325
}
377326
}
378327

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/ReconcilerUtils.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ public static Future<Integer> authTlsHash(SecretOperator secretOperations, Strin
470470
if (certSecretSources == null || certSecretSources.isEmpty()) {
471471
tlsFuture = Future.succeededFuture(0);
472472
} else {
473-
// get all TLS trusted certs, compute hash from each of them, sum hashes
473+
// get all TLS trusted certs, compute hash from them
474474
tlsFuture = Future.join(certSecretSources.stream().map(certSecretSource ->
475475
getCertificateAsync(secretOperations, namespace, certSecretSource)
476476
.compose(cert -> Future.succeededFuture(cert.hashCode()))).collect(Collectors.toList()))
@@ -512,6 +512,34 @@ public static Future<Integer> authTlsHash(SecretOperator secretOperations, Strin
512512
}
513513
}
514514

515+
/**
516+
* Gets trusted certificates from Secrets and merges them into a single String.
517+
*
518+
* @param reconciliation Reconciliation marker
519+
* @param secretOperations Secrets operator
520+
* @param certificateSources List of certificate sources
521+
*
522+
* @return Certificates extracted from the Secrets
523+
*/
524+
public static Future<String> trustedCertificates(Reconciliation reconciliation, SecretOperator secretOperations, List<CertSecretSource> certificateSources) {
525+
if (certificateSources != null && !certificateSources.isEmpty()) {
526+
return Future.join(certificateSources
527+
.stream()
528+
.map(certSecretSource -> ReconcilerUtils.getCertificateAsync(secretOperations, reconciliation.namespace(), certSecretSource))
529+
.toList())
530+
.compose(certificates -> {
531+
if (certificates.list().isEmpty()) {
532+
return Future.succeededFuture();
533+
} else {
534+
return Future.succeededFuture(String.join("\n", certificates.list()));
535+
}
536+
});
537+
} else {
538+
// No trusted certificates to extract.
539+
return Future.succeededFuture();
540+
}
541+
}
542+
515543
private static Future<Integer> addSecretHash(SecretOperator secretOperations, String namespace, GenericSecretSource genericSecretSource) {
516544
if (genericSecretSource != null) {
517545
return secretOperations.getAsync(namespace, genericSecretSource.getSecretName())
@@ -579,12 +607,12 @@ private static Future<String> getCertificateAsync(SecretOperator secretOperator,
579607
.compose(secret -> {
580608
if (certSecretSource.getCertificate() != null) {
581609
return validatedSecret(namespace, certSecretSource.getSecretName(), secret, certSecretSource.getCertificate())
582-
.compose(validatedSecret -> Future.succeededFuture(validatedSecret.getData().get(certSecretSource.getCertificate())));
610+
.compose(validatedSecret -> Future.succeededFuture(Util.decodeFromBase64(validatedSecret.getData().get(certSecretSource.getCertificate()))));
583611
} else if (certSecretSource.getPattern() != null) {
584612
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + certSecretSource.getPattern());
585613

586614
return validatedSecret(namespace, certSecretSource.getSecretName(), secret)
587-
.compose(validatedSecret -> Future.succeededFuture(validatedSecret.getData().entrySet().stream().filter(e -> matcher.matches(Paths.get(e.getKey()))).map(Map.Entry::getValue).sorted().collect(Collectors.joining())));
615+
.compose(validatedSecret -> Future.succeededFuture(validatedSecret.getData().entrySet().stream().filter(e -> matcher.matches(Paths.get(e.getKey()))).map(e -> Util.decodeFromBase64(e.getValue())).sorted().collect(Collectors.joining("\n"))));
588616
} else {
589617
throw new InvalidResourceException("Certificate source does not contain the certificate or the pattern.");
590618
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ public void testPodSetWithTls() {
464464
// Check config map
465465
ConfigMap configMap = kc.generateConnectConfigMap(new MetricsAndLogging(metricsCM, null));
466466
String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME);
467-
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":*.crt}"));
467+
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":ca.crt}"));
468468
assertThat(connectConfigurations, containsString("ssl.truststore.type=PEM"));
469469
assertThat(connectConfigurations, containsString("security.protocol=SSL"));
470470
assertThat(connectConfigurations, not(containsString("ssl.keystore.")));
@@ -496,7 +496,7 @@ public void testPodSetWithTlsAuth() {
496496
assertThat(connectConfigurations, containsString("ssl.keystore.certificate.chain=${strimzisecrets:namespace/user-secret:user.crt}"));
497497
assertThat(connectConfigurations, containsString("ssl.keystore.type=PEM"));
498498
assertThat(connectConfigurations, containsString("security.protocol=SSL"));
499-
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":*.crt}"));
499+
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":ca.crt}"));
500500
assertThat(connectConfigurations, containsString("ssl.truststore.type=PEM"));
501501
}
502502

@@ -592,7 +592,7 @@ public void testPodSetWithScramSha512AuthAndTLSSameSecret() {
592592
assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-512"));
593593
assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL"));
594594
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"${strimzidir:/opt/kafka/connect-password/my-secret:user1.password}\";"));
595-
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":*.crt}"));
595+
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":ca.crt}"));
596596
assertThat(connectConfigurations, containsString("ssl.truststore.type=PEM"));
597597

598598
// Check PodSet
@@ -679,7 +679,7 @@ public void testPodSetWithScramSha256AuthAndTLSSameSecret() {
679679
assertThat(connectConfigurations, containsString("sasl.mechanism=SCRAM-SHA-256"));
680680
assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL"));
681681
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user1\" password=\"${strimzidir:/opt/kafka/connect-password/my-secret:user1.password}\";"));
682-
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":*.crt}"));
682+
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":ca.crt}"));
683683
assertThat(connectConfigurations, containsString("ssl.truststore.type=PEM"));
684684

685685
// Check PodSet
@@ -766,7 +766,7 @@ public void testPodSetWithPlainAuthAndTLSSameSecret() {
766766
assertThat(connectConfigurations, containsString("security.protocol=SASL_SSL"));
767767
assertThat(connectConfigurations, containsString("sasl.mechanism=PLAIN"));
768768
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"${strimzidir:/opt/kafka/connect-password/my-secret:user1.password}\";"));
769-
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":*.crt}"));
769+
assertThat(connectConfigurations, containsString("ssl.truststore.certificates=${strimzisecrets:namespace/" + KafkaConnectResources.internalTlsTrustedCertsSecretName(clusterName) + ":ca.crt}"));
770770
assertThat(connectConfigurations, containsString("ssl.truststore.type=PEM"));
771771

772772
// Check PodSet
@@ -2046,7 +2046,7 @@ public void testPodSetWithOAuthWithTls() {
20462046
"oauth.token.endpoint.uri=\"http://my-oauth-server\" " +
20472047
"oauth.ssl.endpoint.identification.algorithm=\"\" " +
20482048
"oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\" " +
2049-
"oauth.ssl.truststore.location=\"/opt/kafka/oauth-certs/" + oauthSecret + "/" + oauthSecret + ".crt\" " +
2049+
"oauth.ssl.truststore.location=\"/opt/kafka/oauth-certs/" + oauthSecret + "/ca.crt\" " +
20502050
"oauth.ssl.truststore.type=\"PEM\";"));
20512051
assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));
20522052

0 commit comments

Comments
 (0)