diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java index 275fb3f841..07653a5974 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java @@ -25,7 +25,7 @@ public static boolean configureKafkaSQLOauth(ApicurioRegistry3 primary, Map resources = Serialization .unmarshal(AuthITTest.class.getResourceAsStream(keycloakResource)); @@ -34,11 +45,5 @@ protected static ApicurioRegistry3 prepareInfra(String keycloakResource, String createKeycloakDNSResolution("simple-keycloak.apps.cluster.example", "keycloak." + namespace + ".svc.cluster.local"); - // Deploy Registry - var registry = deserialize(apicurioResource, ApicurioRegistry3.class); - - registry.getMetadata().setNamespace(namespace); - - return registry; } } diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java index b44e80853c..8e6235a310 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java @@ -10,6 +10,8 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.networking.v1.NetworkPolicy; import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudget; +import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; +import io.fabric8.kubernetes.api.model.rbac.RoleBinding; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; import io.fabric8.kubernetes.client.KubernetesClient; @@ -33,6 +35,7 @@ import org.slf4j.LoggerFactory; import java.io.*; +import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; @@ -71,6 +74,8 @@ public enum OperatorDeployment { protected static String deploymentTarget; protected static String namespace; protected static boolean cleanup; + + protected static boolean strimziInstalled = false; private static Operator operator; @BeforeAll @@ -307,6 +312,28 @@ static void createKeycloakDNSResolution(String ingressHostname, String keycloakS client.apps().deployments().inNamespace(systemNamespace).withName("coredns").rolling().restart(); } + static void applyStrimziResources() throws IOException { + try (BufferedInputStream in = new BufferedInputStream( + new URL("https://strimzi.io/install/latest").openStream())) { + List resources = Serialization.unmarshal(in); + resources.forEach(r -> { + if (r.getKind().equals("ClusterRoleBinding") && r instanceof ClusterRoleBinding) { + var crb = (ClusterRoleBinding) r; + crb.getSubjects().forEach(s -> s.setNamespace(namespace)); + } else if (r.getKind().equals("RoleBinding") && r instanceof RoleBinding) { + var crb = (RoleBinding) r; + crb.getSubjects().forEach(s -> s.setNamespace(namespace)); + } + log.info("Creating Strimzi resource kind {} in namespace {}", r.getKind(), namespace); + client.resource(r).inNamespace(namespace).createOrReplace(); + await().atMost(Duration.ofMinutes(2)).ignoreExceptions().until(() -> { + assertThat(client.resource(r).inNamespace(namespace).get()).isNotNull(); + return true; + }); + }); + } + } + static void createNamespace(KubernetesClient client, String namespace) { log.info("Creating Namespace {}", namespace); client.resource( @@ -320,8 +347,8 @@ static String calculateNamespace() { } static void setDefaultAwaitilityTimings() { - Awaitility.setDefaultPollInterval(Duration.ofSeconds(5)); - Awaitility.setDefaultTimeout(Duration.ofSeconds(5 * 60)); + Awaitility.setDefaultPollInterval(Duration.ofSeconds(1)); + Awaitility.setDefaultTimeout(Duration.ofSeconds(30)); } static void createResources(List resources, String resourceType) { diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java index a9eb359c78..bc68f3526d 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlITTest.java @@ -1,22 +1,13 @@ package io.apicurio.registry.operator.it; import io.apicurio.registry.operator.api.v1.ApicurioRegistry3; -import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.PodCondition; -import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; -import io.fabric8.kubernetes.api.model.rbac.RoleBinding; -import io.fabric8.kubernetes.client.utils.Serialization; import io.quarkus.test.junit.QuarkusTest; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.net.URL; -import java.util.List; - import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -28,7 +19,9 @@ public class KafkaSqlITTest extends ITBase { @BeforeAll public static void beforeAll() throws Exception { - applyStrimziResources(); + if (!strimziInstalled) { + applyStrimziResources(); + } } @Test @@ -67,26 +60,4 @@ void testKafkaSQLPlain() { return true; }); } - - static void applyStrimziResources() throws IOException { - try (BufferedInputStream in = new BufferedInputStream( - new URL("https://strimzi.io/install/latest").openStream())) { - List resources = Serialization.unmarshal(in); - resources.forEach(r -> { - if (r.getKind().equals("ClusterRoleBinding") && r instanceof ClusterRoleBinding) { - var crb = (ClusterRoleBinding) r; - crb.getSubjects().forEach(s -> s.setNamespace(namespace)); - } else if (r.getKind().equals("RoleBinding") && r instanceof RoleBinding) { - var crb = (RoleBinding) r; - crb.getSubjects().forEach(s -> s.setNamespace(namespace)); - } - log.info("Creating Strimzi resource kind {} in namespace {}", r.getKind(), namespace); - client.resource(r).inNamespace(namespace).createOrReplace(); - await().ignoreExceptions().until(() -> { - assertThat(client.resource(r).inNamespace(namespace).get()).isNotNull(); - return true; - }); - }); - } - } } diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java index 6751f2ab76..fbd96b7a3e 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java @@ -8,38 +8,41 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources; import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @QuarkusTest -public class KafkaSqlOAuthITTest extends ITBase { +public class KafkaSqlOAuthITTest extends BaseAuthITTest { private static final Logger log = LoggerFactory.getLogger(KafkaSqlOAuthITTest.class); @BeforeAll public static void beforeAll() throws Exception { - applyStrimziResources(); + if (!strimziInstalled) { + applyStrimziResources(); + } } @Test void testKafkaSQLTLS() { + installKeycloak("/k8s/examples/auth/keycloak.yaml"); + client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml")) .create(); final var clusterName = "oauth-example-cluster"; await().ignoreExceptions().untilAsserted(() -> - // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods - assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus() - .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus) - .containsOnly("True")); + // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods + assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus() + .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus) + .containsOnly("True")); // We're guessing the value here to avoid using Strimzi Java model, and relying on retries below. var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093"; var registry = deserialize( - "k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml.yaml", + "k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml", ApicurioRegistry3.class); registry.getMetadata().setNamespace(namespace); registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers); diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlTLSITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlTLSITTest.java index 6821825ffe..4fedaddb75 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlTLSITTest.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlTLSITTest.java @@ -8,7 +8,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources; import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; @@ -20,7 +19,9 @@ public class KafkaSqlTLSITTest extends ITBase { @BeforeAll public static void beforeAll() throws Exception { - applyStrimziResources(); + if (!strimziInstalled) { + applyStrimziResources(); + } } @Test @@ -30,10 +31,10 @@ void testKafkaSQLTLS() { final var clusterName = "example-cluster"; await().ignoreExceptions().untilAsserted(() -> - // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods - assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus() - .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus) - .containsOnly("True")); + // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods + assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus() + .getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus) + .containsOnly("True")); client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/tls/apicurio.kafkauser.yaml")) .inNamespace(namespace).create(); diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml index f6ea1d8517..f09ed66096 100644 --- a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml +++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml @@ -1,3 +1,10 @@ +apiVersion: v1 +kind: Secret +metadata: + name: client-credentials +data: + client-secret: dGVzdDE= +--- # IMPORTANT: This resource should only be used for development or testing purposes. apiVersion: kafka.strimzi.io/v1beta2 kind: Kafka @@ -16,9 +23,8 @@ spec: type: oauth clientId: admin-client clientSecret: - clientSecret: - key: client-secret - secretName: client-credentials + key: client-secret + secretName: client-credentials validIssuerUri: https://simple-keycloak.apps.cluster.example/realms/registry jwksEndpointUri: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/certs userNameClaim: preferred_username diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml index bb472eaee1..07eeeecc82 100644 --- a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml +++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml @@ -1,10 +1,3 @@ -apiVersion: v1 -kind: Secret -metadata: - name: client-credentials -data: - client-secret: dGVzdDE= ---- apiVersion: registry.apicur.io/v1 kind: ApicurioRegistry3 metadata: @@ -17,10 +10,6 @@ spec: bootstrapServers: "..svc:9092" # Try using Strimzi/Red Hat AMQ Streams Operator! tls: - keystoreSecretRef: - name: apicurio - keystorePasswordSecretRef: - name: apicurio truststoreSecretRef: name: oauth-example-cluster-cluster-ca-cert truststorePasswordSecretRef: diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java index b96e41eaa4..b1e8b91fd9 100644 --- a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java +++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java @@ -21,7 +21,7 @@ @JsonDeserialize(using = JsonDeserializer.None.class) @JsonInclude(NON_NULL) -@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecret", "tokenEndpoint", +@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecretRef", "tokenEndpoint", "loginHandlerClass" }) @NoArgsConstructor @AllArgsConstructor(access = PRIVATE) @@ -50,11 +50,11 @@ public class KafkaSqlAuthSpec { @JsonSetter(nulls = SKIP) private String clientId; - @JsonProperty("clientSecret") + @JsonProperty("clientSecretRef") @JsonPropertyDescription(""" The client secret used to authenticate to Kafka.""") @JsonSetter(nulls = SKIP) - private SecretKeyRef clientSecret; + private SecretKeyRef clientSecretRef; @JsonProperty("tokenEndpoint") @JsonPropertyDescription("""