Skip to content

Commit df3985e

Browse files
authored
Fix wrong escaping of OAuth configuration in Mirror Maker 2 (#9799)
Signed-off-by: Jakub Scholz <[email protected]>
1 parent 73dba1b commit df3985e

File tree

3 files changed

+201
-3
lines changed

3 files changed

+201
-3
lines changed

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2Connectors.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -282,8 +282,8 @@ private static String oauthJaasConfig(KafkaMirrorMaker2ClusterSpec cluster, Kafk
282282
}
283283

284284
if (oauth.getTlsTrustedCertificates() != null && !oauth.getTlsTrustedCertificates().isEmpty()) {
285-
jaasOptions.put("oauth.ssl.truststore.location", "/tmp/kafka/clusters/\"" + cluster.getAlias() + "\"-oauth.truststore.p12");
286-
jaasOptions.put("oauth.ssl.truststore.password", "\"${file:" + CONNECTORS_CONFIG_FILE + ":oauth.ssl.truststore.password}\"");
285+
jaasOptions.put("oauth.ssl.truststore.location", "/tmp/kafka/clusters/" + cluster.getAlias() + "-oauth.truststore.p12");
286+
jaasOptions.put("oauth.ssl.truststore.password", "${file:" + CONNECTORS_CONFIG_FILE + ":oauth.ssl.truststore.password}");
287287
jaasOptions.put("oauth.ssl.truststore.type", "PKCS12");
288288
}
289289

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package io.strimzi.operator.cluster.model;
66

77
import io.strimzi.api.kafka.model.common.CertAndKeySecretSourceBuilder;
8+
import io.strimzi.api.kafka.model.common.CertSecretSourceBuilder;
89
import io.strimzi.api.kafka.model.common.ConnectorState;
910
import io.strimzi.api.kafka.model.connector.KafkaConnector;
1011
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
@@ -615,6 +616,7 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
615616
.withKey("refreshTokenKey")
616617
.withSecretName("refreshTokenSecretName")
617618
.endRefreshToken()
619+
.withTlsTrustedCertificates(new CertSecretSourceBuilder().withCertificate("ca.crt").withSecretName("my-oauth-secret").build())
618620
.endKafkaClientAuthenticationOAuth()
619621
.build();
620622

@@ -627,7 +629,10 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
627629
is(Map.of("oauth.client.secret", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.client.secret}",
628630
"oauth.access.token", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.access.token}",
629631
"oauth.refresh.token", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.refresh.token}",
630-
"oauth.password.grant.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.password.grant.password}")));
632+
"oauth.password.grant.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:sourceClusterAlias.oauth.password.grant.password}",
633+
"oauth.ssl.truststore.location", "/tmp/kafka/clusters/sourceClusterAlias-oauth.truststore.p12",
634+
"oauth.ssl.truststore.type", "PKCS12",
635+
"oauth.ssl.truststore.password", "${file:/tmp/strimzi-mirrormaker2-connector.properties:oauth.ssl.truststore.password}")));
631636

632637
assertThat(config,
633638
is(Map.of("prefix.alias", "sourceClusterAlias",

systemtest/src/test/java/io/strimzi/systemtest/security/oauth/OauthTlsST.java

+193
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
1414
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
1515
import io.strimzi.api.kafka.model.mirrormaker.KafkaMirrorMakerResources;
16+
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
17+
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2ClusterSpec;
18+
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2ClusterSpecBuilder;
1619
import io.strimzi.operator.common.model.Labels;
1720
import io.strimzi.systemtest.Environment;
1821
import io.strimzi.systemtest.TestConstants;
@@ -31,6 +34,7 @@
3134
import io.strimzi.systemtest.storage.TestStorage;
3235
import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates;
3336
import io.strimzi.systemtest.templates.crd.KafkaConnectTemplates;
37+
import io.strimzi.systemtest.templates.crd.KafkaMirrorMaker2Templates;
3438
import io.strimzi.systemtest.templates.crd.KafkaMirrorMakerTemplates;
3539
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
3640
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
@@ -41,21 +45,29 @@
4145
import io.strimzi.systemtest.utils.kafkaUtils.KafkaConnectUtils;
4246
import io.strimzi.systemtest.utils.kafkaUtils.KafkaConnectorUtils;
4347
import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils;
48+
import io.strimzi.systemtest.utils.kubeUtils.controllers.JobUtils;
49+
import io.strimzi.test.TestUtils;
50+
import io.strimzi.test.WaitException;
51+
import io.strimzi.test.k8s.KubeClusterResource;
4452
import io.vertx.core.cli.annotations.Description;
4553
import org.apache.kafka.clients.consumer.ConsumerConfig;
4654
import org.apache.kafka.clients.producer.ProducerConfig;
55+
import org.apache.logging.log4j.Level;
4756
import org.apache.logging.log4j.LogManager;
4857
import org.apache.logging.log4j.Logger;
4958
import org.junit.jupiter.api.BeforeAll;
5059
import org.junit.jupiter.api.Tag;
5160

61+
import java.time.Duration;
62+
5263
import static io.strimzi.systemtest.TestConstants.ACCEPTANCE;
5364
import static io.strimzi.systemtest.TestConstants.ARM64_UNSUPPORTED;
5465
import static io.strimzi.systemtest.TestConstants.BRIDGE;
5566
import static io.strimzi.systemtest.TestConstants.CONNECT;
5667
import static io.strimzi.systemtest.TestConstants.CONNECT_COMPONENTS;
5768
import static io.strimzi.systemtest.TestConstants.HTTP_BRIDGE_DEFAULT_PORT;
5869
import static io.strimzi.systemtest.TestConstants.MIRROR_MAKER;
70+
import static io.strimzi.systemtest.TestConstants.MIRROR_MAKER2;
5971
import static io.strimzi.systemtest.TestConstants.NODEPORT_SUPPORTED;
6072
import static io.strimzi.systemtest.TestConstants.OAUTH;
6173
import static io.strimzi.systemtest.TestConstants.REGRESSION;
@@ -412,6 +424,187 @@ void testMirrorMaker() {
412424
ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount());
413425
}
414426

427+
@Description("As a OAuth MirrorMaker 2, I am able to replicate Topic data using using encrypted communication")
428+
@IsolatedTest("Using more tha one Kafka cluster in one Namespace")
429+
@Tag(MIRROR_MAKER2)
430+
@Tag(NODEPORT_SUPPORTED)
431+
@SuppressWarnings({"checkstyle:MethodLength"})
432+
void testMirrorMaker2() {
433+
// Nodeport needs cluster wide rights to work properly which is not possible with STRIMZI_RBAC_SCOPE=NAMESPACE
434+
assumeFalse(Environment.isNamespaceRbacScope());
435+
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
436+
437+
String producerName = OAUTH_PRODUCER_NAME + "-" + testStorage.getClusterName();
438+
String consumerName = OAUTH_CONSUMER_NAME + "-" + testStorage.getClusterName();
439+
440+
resourceManager.createResourceWithWait(KafkaTopicTemplates.topic(oauthClusterName, testStorage.getTopicName(), Environment.TEST_SUITE_NAMESPACE).build());
441+
442+
KafkaOauthClients oauthExampleClients = new KafkaOauthClientsBuilder()
443+
.withNamespaceName(Environment.TEST_SUITE_NAMESPACE)
444+
.withProducerName(producerName)
445+
.withConsumerName(consumerName)
446+
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(oauthClusterName))
447+
.withTopicName(testStorage.getTopicName())
448+
.withMessageCount(testStorage.getMessageCount())
449+
.withOauthClientId(OAUTH_CLIENT_NAME)
450+
.withOauthClientSecret(OAUTH_CLIENT_SECRET)
451+
.withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri())
452+
.build();
453+
454+
resourceManager.createResourceWithWait(oauthExampleClients.producerStrimziOauthTls(oauthClusterName));
455+
ClientUtils.waitForClientSuccess(producerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount());
456+
457+
resourceManager.createResourceWithWait(oauthExampleClients.consumerStrimziOauthTls(oauthClusterName));
458+
ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount());
459+
460+
String targetKafkaCluster = oauthClusterName + "-target";
461+
String kafkaSourceClusterName = oauthClusterName;
462+
463+
resourceManager.createResourceWithWait(
464+
NodePoolsConverter.convertNodePoolsIfNeeded(
465+
KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getTargetBrokerPoolName(), targetKafkaCluster, 1).build(),
466+
KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getTargetControllerPoolName(), targetKafkaCluster, 1).build()
467+
)
468+
);
469+
resourceManager.createResourceWithWait(KafkaTemplates.kafkaEphemeral(targetKafkaCluster, 1, 1)
470+
.editMetadata()
471+
.withNamespace(Environment.TEST_SUITE_NAMESPACE)
472+
.endMetadata()
473+
.editSpec()
474+
.editKafka()
475+
.withListeners(OauthAbstractST.BUILD_OAUTH_TLS_LISTENER.apply(keycloakInstance),
476+
new GenericKafkaListenerBuilder()
477+
.withName(TestConstants.EXTERNAL_LISTENER_DEFAULT_NAME)
478+
.withPort(9094)
479+
.withType(KafkaListenerType.NODEPORT)
480+
.withTls(true)
481+
.withNewKafkaListenerAuthenticationOAuth()
482+
.withValidIssuerUri(keycloakInstance.getValidIssuerUri())
483+
.withJwksExpirySeconds(keycloakInstance.getJwksExpireSeconds())
484+
.withJwksRefreshSeconds(keycloakInstance.getJwksRefreshSeconds())
485+
.withJwksEndpointUri(keycloakInstance.getJwksEndpointUri())
486+
.withUserNameClaim(keycloakInstance.getUserNameClaim())
487+
.withTlsTrustedCertificates(
488+
new CertSecretSourceBuilder()
489+
.withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME)
490+
.withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT)
491+
.build())
492+
.withDisableTlsHostnameVerification(true)
493+
.endKafkaListenerAuthenticationOAuth()
494+
.build())
495+
.endKafka()
496+
.endSpec()
497+
.build());
498+
499+
// Deploy MirrorMaker2 with OAuth
500+
KafkaMirrorMaker2ClusterSpec sourceClusterWithOauth = new KafkaMirrorMaker2ClusterSpecBuilder()
501+
.withAlias(kafkaSourceClusterName)
502+
.withConfig(connectorConfig)
503+
.withBootstrapServers(KafkaResources.tlsBootstrapAddress(oauthClusterName))
504+
// this is for kafka tls connection
505+
.withNewTls()
506+
.withTrustedCertificates(new CertSecretSourceBuilder()
507+
.withCertificate("ca.crt")
508+
.withSecretName(KafkaResources.clusterCaCertificateSecretName(oauthClusterName))
509+
.build())
510+
.endTls()
511+
.withNewKafkaClientAuthenticationOAuth()
512+
.withTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri())
513+
.withClientId("kafka-mirror-maker-2")
514+
.withNewClientSecret()
515+
.withSecretName(MIRROR_MAKER_2_OAUTH_SECRET)
516+
.withKey(OAUTH_KEY)
517+
.endClientSecret()
518+
.withConnectTimeoutSeconds(CONNECT_TIMEOUT_S)
519+
.withReadTimeoutSeconds(READ_TIMEOUT_S)
520+
// this is for authorization server tls connection
521+
.withTlsTrustedCertificates(new CertSecretSourceBuilder()
522+
.withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME)
523+
.withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT)
524+
.build())
525+
.withDisableTlsHostnameVerification(true)
526+
.endKafkaClientAuthenticationOAuth()
527+
.build();
528+
529+
KafkaMirrorMaker2ClusterSpec targetClusterWithOauth = new KafkaMirrorMaker2ClusterSpecBuilder()
530+
.withAlias(testStorage.getTargetClusterName())
531+
.withConfig(connectorConfig)
532+
.withBootstrapServers(KafkaResources.tlsBootstrapAddress(targetKafkaCluster))
533+
// this is for kafka tls connection
534+
.withNewTls()
535+
.withTrustedCertificates(new CertSecretSourceBuilder()
536+
.withCertificate("ca.crt")
537+
.withSecretName(KafkaResources.clusterCaCertificateSecretName(targetKafkaCluster))
538+
.build())
539+
.endTls()
540+
.withNewKafkaClientAuthenticationOAuth()
541+
.withTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri())
542+
.withClientId("kafka-mirror-maker-2")
543+
.withNewClientSecret()
544+
.withSecretName(MIRROR_MAKER_2_OAUTH_SECRET)
545+
.withKey(OAUTH_KEY)
546+
.endClientSecret()
547+
.withConnectTimeoutSeconds(CONNECT_TIMEOUT_S)
548+
.withReadTimeoutSeconds(READ_TIMEOUT_S)
549+
// this is for authorization server tls connection
550+
.withTlsTrustedCertificates(new CertSecretSourceBuilder()
551+
.withSecretName(KeycloakInstance.KEYCLOAK_SECRET_NAME)
552+
.withCertificate(KeycloakInstance.KEYCLOAK_SECRET_CERT)
553+
.build())
554+
.withDisableTlsHostnameVerification(true)
555+
.endKafkaClientAuthenticationOAuth()
556+
.build();
557+
558+
String kafkaTargetClusterTopicName = kafkaSourceClusterName + "." + testStorage.getTopicName();
559+
560+
resourceManager.createResourceWithWait(KafkaMirrorMaker2Templates.kafkaMirrorMaker2(oauthClusterName, testStorage.getTargetClusterName(), kafkaSourceClusterName, 1, false)
561+
.editMetadata()
562+
.withNamespace(Environment.TEST_SUITE_NAMESPACE)
563+
.endMetadata()
564+
.editSpec()
565+
.withClusters(sourceClusterWithOauth, targetClusterWithOauth)
566+
.editFirstMirror()
567+
.withSourceCluster(kafkaSourceClusterName)
568+
.endMirror()
569+
.endSpec()
570+
.build());
571+
572+
final String kafkaMirrorMaker2PodName = kubeClient().listPods(Environment.TEST_SUITE_NAMESPACE, oauthClusterName, Labels.STRIMZI_KIND_LABEL, KafkaMirrorMaker2.RESOURCE_KIND).get(0).getMetadata().getName();
573+
final String kafkaMirrorMaker2Logs = KubeClusterResource.cmdKubeClient(Environment.TEST_SUITE_NAMESPACE).execInCurrentNamespace(Level.DEBUG, "logs", kafkaMirrorMaker2PodName).out();
574+
verifyOauthConfiguration(kafkaMirrorMaker2Logs);
575+
576+
TestUtils.waitFor("MirrorMaker2 to copy messages from " + kafkaSourceClusterName + " to " + testStorage.getTargetClusterName(),
577+
Duration.ofSeconds(30).toMillis(), TestConstants.TIMEOUT_FOR_MIRROR_MAKER_COPY_MESSAGES_BETWEEN_BROKERS,
578+
() -> {
579+
LOGGER.info("Deleting Job: {}/{}", Environment.TEST_SUITE_NAMESPACE, consumerName);
580+
JobUtils.deleteJobWithWait(Environment.TEST_SUITE_NAMESPACE, consumerName);
581+
582+
LOGGER.info("Creating new client with new consumer-group and also to point on {} cluster", testStorage.getTargetClusterName());
583+
584+
KafkaOauthClients kafkaOauthClientJob = new KafkaOauthClientsBuilder()
585+
.withNamespaceName(Environment.TEST_SUITE_NAMESPACE)
586+
.withProducerName(producerName)
587+
.withConsumerName(consumerName)
588+
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(targetKafkaCluster))
589+
.withTopicName(kafkaTargetClusterTopicName)
590+
.withMessageCount(testStorage.getMessageCount())
591+
.withOauthClientId(OAUTH_CLIENT_NAME)
592+
.withOauthClientSecret(OAUTH_CLIENT_SECRET)
593+
.withOauthTokenEndpointUri(keycloakInstance.getOauthTokenEndpointUri())
594+
.build();
595+
596+
resourceManager.createResourceWithWait(kafkaOauthClientJob.consumerStrimziOauthTls(targetKafkaCluster));
597+
598+
try {
599+
ClientUtils.waitForClientSuccess(consumerName, Environment.TEST_SUITE_NAMESPACE, testStorage.getMessageCount());
600+
return true;
601+
} catch (WaitException e) {
602+
LOGGER.error("Failed while waiting for consumer to succeed", e);
603+
return false;
604+
}
605+
});
606+
}
607+
415608
@ParallelTest
416609
void testIntrospectionEndpoint() {
417610
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

0 commit comments

Comments
 (0)