From 98eeca1abf4079c58c2c8886645ef300d6584665 Mon Sep 17 00:00:00 2001 From: Carles Arnal Date: Mon, 3 Feb 2025 07:37:57 +0100 Subject: [PATCH] Add kafka ouath --- .../operator/EnvironmentVariables.java | 8 +++ .../registry/operator/feat/KafkaSql.java | 6 +- .../registry/operator/feat/KafkaSqlAuth.java | 60 ++++++++++++++++ .../operator/it/KafkaSqlOAuthITTest.java | 61 ++++++++++++++++ .../kafkasql/oauth/oauth-example-cluster.yaml | 40 +++++++++++ ...xample-kafkasql-tls.apicurioregistry3.yaml | 41 +++++++++++ .../api/v1/spec/KafkaSqlAuthSpec.java | 70 +++++++++++++++++++ .../operator/api/v1/spec/KafkaSqlSpec.java | 23 +++++- 8 files changed, 306 insertions(+), 3 deletions(-) create mode 100644 operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java create mode 100644 operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java create mode 100644 operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml create mode 100644 operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml create mode 100644 operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java b/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java index 2496fbfe97..a9f2539dc5 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/EnvironmentVariables.java @@ -21,6 +21,14 @@ public class EnvironmentVariables { public static final String KAFKASQL_SSL_TRUSTSTORE_LOCATION = KAFKA_PREFIX + "SSL_TRUSTSTORE_LOCATION"; public static final String KAFKASQL_SSL_TRUSTSTORE_PASSWORD = KAFKA_PREFIX + "SSL_TRUSTSTORE_PASSWORD"; + //KafkaSQL oauth + public static final String APICURIO_KAFKASQL_SECURITY_SASL_ENABLED = "APICURIO_KAFKASQL_SECURITY_SASL_ENABLED"; + public static final String APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM = "APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM"; + public static final String APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID = "APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID"; + public static final String APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET = "APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET"; + public static final String APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT = "APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT"; + public static final String APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS"; + // Auth related environment variables public static final String APICURIO_REGISTRY_AUTH_ENABLED = "QUARKUS_OIDC_TENANT_ENABLED"; public static final String APICURIO_REGISTRY_APP_CLIENT_ID = "QUARKUS_OIDC_CLIENT_ID"; diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java index fd463f18af..3c7a50c0e6 100644 --- a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSql.java @@ -25,7 +25,7 @@ public class KafkaSql { public static String ENV_KAFKASQL_BOOTSTRAP_SERVERS = "APICURIO_KAFKASQL_BOOTSTRAP_SERVERS"; public static void configureKafkaSQL(ApicurioRegistry3 primary, Deployment deployment, - Map env) { + Map env) { ofNullable(primary.getSpec()).map(ApicurioRegistry3Spec::getApp).map(AppSpec::getStorage) .map(StorageSpec::getKafkasql).ifPresent(kafkasql -> { if (!isBlank(kafkasql.getBootstrapServers())) { @@ -38,6 +38,10 @@ public static void configureKafkaSQL(ApicurioRegistry3 primary, Deployment deplo env)) { log.info("KafkaSQL storage with TLS security configured."); } + + if (KafkaSqlAuth.configureKafkaSQLOauth(primary, env)) { + log.info("KafkaSQL storage with Oauth security configured."); + } } }); } diff --git a/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java new file mode 100644 index 0000000000..616286e4a3 --- /dev/null +++ b/operator/controller/src/main/java/io/apicurio/registry/operator/feat/KafkaSqlAuth.java @@ -0,0 +1,60 @@ +package io.apicurio.registry.operator.feat; + +import io.apicurio.registry.operator.api.v1.ApicurioRegistry3; +import io.apicurio.registry.operator.api.v1.ApicurioRegistry3Spec; +import io.apicurio.registry.operator.api.v1.spec.AppSpec; +import io.apicurio.registry.operator.api.v1.spec.KafkaSqlAuthSpec; +import io.apicurio.registry.operator.api.v1.spec.KafkaSqlSpec; +import io.apicurio.registry.operator.api.v1.spec.StorageSpec; +import io.apicurio.registry.operator.utils.SecretKeyRefTool; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.apps.Deployment; + +import java.util.Map; +import java.util.Optional; + +import static io.apicurio.registry.operator.EnvironmentVariables.*; +import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar; +import static java.util.Optional.ofNullable; + +public class KafkaSqlAuth { + + /** + * KafkaSQL must be already configured. + */ + public static boolean configureKafkaSQLOauth(ApicurioRegistry3 primary, Map env) { + + // spotless:off + var clientSecret = new SecretKeyRefTool(getKafkaSqlAuthSpec(primary) + .map(KafkaSqlAuthSpec::getClientSecret) + .orElse(null), "client-secret"); + + if (clientSecret.isValid()) { + + getKafkaSqlAuthSpec(primary) + .filter(KafkaSqlAuthSpec::getEnabled) + .ifPresent(kafkaSqlAuthSpec -> { + addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_ENABLED, kafkaSqlAuthSpec.getEnabled().toString()); + addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM, kafkaSqlAuthSpec.getMechanism()); + addEnvVar(env, APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID, kafkaSqlAuthSpec.getClientId()); + addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET, new SecretKeyRefTool(kafkaSqlAuthSpec.getClientSecret(), "client-secret").getSecretVolumeKeyPath()); + addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT, kafkaSqlAuthSpec.getTokenEndpoint()); + addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS, kafkaSqlAuthSpec.getLoginHandlerClass()); + }); + + return true; + } + return false; + } + + private static Optional getKafkaSqlAuthSpec(ApicurioRegistry3 primary) { + // spotless:off + return ofNullable(primary) + .map(ApicurioRegistry3::getSpec) + .map(ApicurioRegistry3Spec::getApp) + .map(AppSpec::getStorage) + .map(StorageSpec::getKafkasql) + .map(KafkaSqlSpec::getAuth); + // spotless:on + } +} diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java new file mode 100644 index 0000000000..8b221e6ce5 --- /dev/null +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/KafkaSqlOAuthITTest.java @@ -0,0 +1,61 @@ +package io.apicurio.registry.operator.it; + +import io.apicurio.registry.operator.api.v1.ApicurioRegistry3; +import io.fabric8.kubernetes.api.model.PodCondition; +import io.quarkus.test.junit.QuarkusTest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources; +import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +@QuarkusTest +public class KafkaSqlOAuthITTest extends ITBase { + + private static final Logger log = LoggerFactory.getLogger(KafkaSqlOAuthITTest.class); + + @BeforeAll + public static void beforeAll() throws Exception { + applyStrimziResources(); + } + + @Test + void testKafkaSQLTLS() { + client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml")) + .create(); + final var clusterName = "oauth-example-cluster"; + + await().ignoreExceptions().untilAsserted(() -> + // Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods + assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus() + .getConditions()).filteredOn(c -> "Ready" .equals(c.getType())).map(PodCondition::getStatus) + .containsOnly("True")); + + // We're guessing the value here to avoid using Strimzi Java model, and relying on retries below. + var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093"; + + var registry = deserialize("k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml.yaml", + ApicurioRegistry3.class); + registry.getMetadata().setNamespace(namespace); + registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers); + + client.resource(registry).create(); + + await().ignoreExceptions().until(() -> { + assertThat(client.apps().deployments().inNamespace(namespace) + .withName(registry.getMetadata().getName() + "-app-deployment").get().getStatus() + .getReadyReplicas().intValue()).isEqualTo(1); + var podName = client.pods().inNamespace(namespace).list().getItems().stream() + .map(pod -> pod.getMetadata().getName()) + .filter(podN -> podN.startsWith(registry.getMetadata().getName() + "-app-deployment")) + .findFirst().get(); + assertThat(client.pods().inNamespace(namespace).withName(podName).getLog()) + .contains("Using Kafka-SQL artifactStore"); + return true; + }); + } +} diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml new file mode 100644 index 0000000000..7e924a6cdd --- /dev/null +++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml @@ -0,0 +1,40 @@ +# IMPORTANT: This resource should only be used for development or testing purposes. +apiVersion: kafka.strimzi.io/v1beta2 +kind: Kafka +metadata: + name: oauth-example-cluster +spec: + kafka: + version: 3.8.0 + replicas: 1 + listeners: + - name: tls + type: internal + port: 9093 + tls: true + authentication: + type: tls + authorization: + type: oauth + clientId: admin-client + clientSecret: + clientSecret: + key: client-secret + secretName: client-credentials + validIssuerUri: https://simple-keycloak.apps.cluster.example/realms/registry + jwksEndpointUri: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/certs + userNameClaim: preferred_username + tlsTrustedCertificates: + - secretName: keycloak-tls + certificate: tls.crt + config: + inter.broker.protocol.version: "3.8" + offsets.topic.replication.factor: 1 + storage: + type: ephemeral + zookeeper: + replicas: 1 + storage: + type: ephemeral + entityOperator: + userOperator: { } diff --git a/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml new file mode 100644 index 0000000000..bb472eaee1 --- /dev/null +++ b/operator/controller/src/test/resources/k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml @@ -0,0 +1,41 @@ +apiVersion: v1 +kind: Secret +metadata: + name: client-credentials +data: + client-secret: dGVzdDE= +--- +apiVersion: registry.apicur.io/v1 +kind: ApicurioRegistry3 +metadata: + name: example-kafkasql-tls +spec: + app: + storage: + type: kafkasql + kafkasql: + bootstrapServers: "..svc:9092" + # Try using Strimzi/Red Hat AMQ Streams Operator! + tls: + keystoreSecretRef: + name: apicurio + keystorePasswordSecretRef: + name: apicurio + truststoreSecretRef: + name: oauth-example-cluster-cluster-ca-cert + truststorePasswordSecretRef: + name: oauth-example-cluster-cluster-ca-cert + auth: + enabled: true + mechanism: "OAUTHBEARER" + clientId: "admin-client" + clientSecretRef: + name: client-credentials + key: client-secret + tokenEndpoint: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/token + loginHandlerClass: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler + ingress: + host: example-kafkasql-tls-app.apps.cluster.example + ui: + ingress: + host: example-kafkasql-tls-ui.apps.cluster.example diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java new file mode 100644 index 0000000000..7f3d0b838c --- /dev/null +++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlAuthSpec.java @@ -0,0 +1,70 @@ +package io.apicurio.registry.operator.api.v1.spec; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; +import static com.fasterxml.jackson.annotation.Nulls.SKIP; +import static lombok.AccessLevel.PRIVATE; + +@JsonDeserialize(using = JsonDeserializer.None.class) +@JsonInclude(NON_NULL) +@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecret", "tokenEndpoint", "loginHandlerClass" }) +@NoArgsConstructor +@AllArgsConstructor(access = PRIVATE) +@SuperBuilder(toBuilder = true) +@Getter +@Setter +@EqualsAndHashCode +@ToString +public class KafkaSqlAuthSpec { + + @JsonProperty("enabled") + @JsonPropertyDescription(""" + Enables SASL OAuth authentication for Apicurio Registry storage in Kafka. You must set this variable to true for the other variables to have effect.""") + @JsonSetter(nulls = SKIP) + private Boolean enabled; + + @JsonProperty("mechanism") + @JsonPropertyDescription(""" + The mechanism used to authenticate to Kafka.""") + @JsonSetter(nulls = SKIP) + private String mechanism; + + @JsonProperty("clientId") + @JsonPropertyDescription(""" + The client ID used to authenticate to Kafka.""") + @JsonSetter(nulls = SKIP) + private String clientId; + + @JsonProperty("clientSecret") + @JsonPropertyDescription(""" + The client secret used to authenticate to Kafka.""") + @JsonSetter(nulls = SKIP) + private SecretKeyRef clientSecret; + + @JsonProperty("tokenEndpoint") + @JsonPropertyDescription(""" + The URL of the OAuth identity server.""") + @JsonSetter(nulls = SKIP) + private String tokenEndpoint; + + @JsonProperty("loginHandlerClass") + @JsonPropertyDescription(""" + The login class to be used for login.""") + @JsonSetter(nulls = SKIP) + private String loginHandlerClass; +} diff --git a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlSpec.java b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlSpec.java index 34c2002c7e..56459769ac 100644 --- a/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlSpec.java +++ b/operator/model/src/main/java/io/apicurio/registry/operator/api/v1/spec/KafkaSqlSpec.java @@ -1,9 +1,19 @@ package io.apicurio.registry.operator.api.v1.spec; -import com.fasterxml.jackson.annotation.*; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.annotation.JsonSetter; +import com.fasterxml.jackson.annotation.Nulls; import com.fasterxml.jackson.databind.JsonDeserializer.None; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import lombok.*; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; import lombok.experimental.SuperBuilder; import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; @@ -43,4 +53,13 @@ public class KafkaSqlSpec { Configure KafkaSQL storage when the access to the Kafka cluster is secured using TLS.""") @JsonSetter(nulls = Nulls.SKIP) private KafkaSqlTLSSpec tls; + + /** + * Configure KafkaSQL storage when the access to the Kafka cluster is secured using TLS. + */ + @JsonProperty("auth") + @JsonPropertyDescription(""" + Configure KafkaSQL storage authentication.""") + @JsonSetter(nulls = Nulls.SKIP) + private KafkaSqlAuthSpec auth; }