Skip to content

Commit b624ffa

Browse files
authored
Integrate OAuth 0.17.0 grantType configuration (#11757)
Signed-off-by: Marko Strukelj <marko.strukelj@gmail.com>
1 parent b95804a commit b624ffa

File tree

24 files changed

+87
-11
lines changed

24 files changed

+87
-11
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
* Extend the EntityOperator, Cruise Control and KafkaExporter deployment to support PDB via the template section in the CR spec.
1717
* Added support for [KIP-1073](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1073:+Return+fenced+brokers+in+DescribeCluster+response)
1818
to get the list of the registered brokers by using the Kafka Admin API. It replaces the usage of the `.status.registeredNodeIds` field in Kafka.
19+
* Update OAuth library to 0.17.0.
20+
* Additional OAuth configuration options have been added for 'oauth' authentication on the listener and the client.
21+
On the listener `clientGrantType` has been added.
22+
On the client `grantType` has been added.
1923

2024
### Major changes, deprecations and removals
2125

api/src/main/java/io/strimzi/api/kafka/model/common/authentication/KafkaClientAuthenticationOAuth.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,14 @@
3232
"readTimeoutSeconds", "httpRetries", "httpRetryPauseMs", "clientSecret", "passwordSecret", "accessToken",
3333
"refreshToken", "tlsTrustedCertificates", "disableTlsHostnameVerification", "maxTokenExpirySeconds",
3434
"accessTokenIsJwt", "enableMetrics", "includeAcceptHeader", "accessTokenLocation",
35-
"clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions"})
35+
"clientAssertion", "clientAssertionLocation", "clientAssertionType", "saslExtensions", "grantType"})
3636
@EqualsAndHashCode(callSuper = true)
3737
@ToString(callSuper = true)
3838
public class KafkaClientAuthenticationOAuth extends KafkaClientAuthentication {
3939
public static final String TYPE_OAUTH = "oauth";
4040

4141
private String clientId;
42+
private String grantType;
4243
private String username;
4344
private String scope;
4445
private String audience;
@@ -80,6 +81,17 @@ public void setClientId(String clientId) {
8081
this.clientId = clientId;
8182
}
8283

84+
@Description("A custom OAuth grant type to use when authenticating against the authorization server with `clientId` and one of `clientSecret` or `clientAssertion`. "
85+
+ "The value defaults to `client_credentials` in these cases. This is optional configuration, only used with custom authorization server implementations.")
86+
@JsonInclude(JsonInclude.Include.NON_NULL)
87+
public String getGrantType() {
88+
return grantType;
89+
}
90+
91+
public void setGrantType(String grantType) {
92+
this.grantType = grantType;
93+
}
94+
8395
@Description("OAuth scope to use when authenticating against the authorization server. Some authorization servers require this to be set. "
8496
+ "The possible values depend on how authorization server is configured. By default `scope` is not specified when doing the token endpoint request.")
8597
@JsonInclude(JsonInclude.Include.NON_DEFAULT)

api/src/main/java/io/strimzi/api/kafka/model/kafka/listener/KafkaListenerAuthenticationOAuth.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
"accessTokenIsJwt", "tlsTrustedCertificates", "disableTlsHostnameVerification", "enableECDSA",
3636
"maxSecondsWithoutReauthentication", "enablePlain", "tokenEndpointUri", "enableOauthBearer", "customClaimCheck",
3737
"connectTimeoutSeconds", "readTimeoutSeconds", "httpRetries", "httpRetryPauseMs", "clientScope", "clientAudience",
38-
"enableMetrics", "failFast", "includeAcceptHeader", "serverBearerTokenLocation", "userNamePrefix"})
38+
"clientGrantType", "enableMetrics", "failFast", "includeAcceptHeader", "serverBearerTokenLocation", "userNamePrefix"})
3939
@EqualsAndHashCode(callSuper = true)
4040
@ToString(callSuper = true)
4141
public class KafkaListenerAuthenticationOAuth extends KafkaListenerAuthentication {
@@ -81,6 +81,7 @@ public class KafkaListenerAuthenticationOAuth extends KafkaListenerAuthenticatio
8181
private Integer httpRetryPauseMs;
8282
private String clientScope = null;
8383
private String clientAudience = null;
84+
private String clientGrantType = null;
8485
private boolean enableMetrics = false;
8586
private boolean failFast = true;
8687
private boolean includeAcceptHeader = true;
@@ -217,6 +218,18 @@ public void setClientAudience(String audience) {
217218
this.clientAudience = audience;
218219
}
219220

221+
@Description("The grant type to use when making requests to the authorization server's token endpoint. " +
222+
"Used for `OAuth over PLAIN` when `username` and `password` passed via SASL_PLAIN client authentication " +
223+
"are passed on to the authorization server as `clientId` and `secret`.")
224+
@JsonInclude(JsonInclude.Include.NON_NULL)
225+
public String getClientGrantType() {
226+
return clientGrantType;
227+
}
228+
229+
public void setClientGrantType(String grantType) {
230+
this.clientGrantType = grantType;
231+
}
232+
220233
@Description("URI of the JWKS certificate endpoint, which can be used for local JWT validation.")
221234
@JsonInclude(JsonInclude.Include.NON_NULL)
222235
public String getJwksEndpointUri() {

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/AuthenticationUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,7 @@ public static Map<String, String> oauthJaasOptions(KafkaClientAuthenticationOAut
374374
addOption(options, ClientConfig.OAUTH_CLIENT_ID, oauth.getClientId());
375375
addOption(options, ClientConfig.OAUTH_PASSWORD_GRANT_USERNAME, oauth.getUsername());
376376
addOption(options, ClientConfig.OAUTH_TOKEN_ENDPOINT_URI, oauth.getTokenEndpointUri());
377+
addOption(options, ClientConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, oauth.getGrantType());
377378
addOption(options, ClientConfig.OAUTH_SCOPE, oauth.getScope());
378379
addOption(options, ClientConfig.OAUTH_AUDIENCE, oauth.getAudience());
379380
if (oauth.isDisableTlsHostnameVerification()) {

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ private String getSecurityProtocol(boolean tls, boolean sasl) {
530530
addBooleanOptionIfFalse(options, ServerConfig.OAUTH_CHECK_ISSUER, oauth.isCheckIssuer());
531531
addBooleanOptionIfTrue(options, ServerConfig.OAUTH_CHECK_AUDIENCE, oauth.isCheckAudience());
532532
addOptionIfNotNull(options, ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, oauth.getCustomClaimCheck());
533+
addOptionIfNotNull(options, ServerConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, oauth.getClientGrantType());
533534
addOptionIfNotNull(options, ServerConfig.OAUTH_SCOPE, oauth.getClientScope());
534535
addOptionIfNotNull(options, ServerConfig.OAUTH_AUDIENCE, oauth.getClientAudience());
535536
addOptionIfNotNull(options, ServerConfig.OAUTH_JWKS_ENDPOINT_URI, oauth.getJwksEndpointUri());

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeClusterTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,6 +1062,7 @@ public void testGenerateDeploymentWithOAuthWithClientSecret() {
10621062
.withTokenEndpointUri("http://my-oauth-server")
10631063
.withAudience("kafka")
10641064
.withScope("all")
1065+
.withGrantType("custom_client_credentials")
10651066
.withNewClientSecret()
10661067
.withSecretName("my-secret-secret")
10671068
.withKey("my-secret-key")
@@ -1077,6 +1078,7 @@ public void testGenerateDeploymentWithOAuthWithClientSecret() {
10771078
assertThat(bridgeConfigurations, containsString("kafka.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
10781079
"oauth.client.id=\"my-client-id\" " +
10791080
"oauth.token.endpoint.uri=\"http://my-oauth-server\" " +
1081+
"oauth.client.credentials.grant.type=\"custom_client_credentials\" " +
10801082
"oauth.scope=\"all\" " +
10811083
"oauth.audience=\"kafka\" " +
10821084
"oauth.client.secret=\"${strimzidir:/opt/strimzi/oauth/my-secret-secret:my-secret-key}\";"));

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1977,6 +1977,7 @@ public void testOauthConfiguration() {
19771977
.withMaxSecondsWithoutReauthentication(3600)
19781978
.withJwksMinRefreshPauseSeconds(5)
19791979
.withEnablePlain(true)
1980+
.withClientGrantType("custom_client_credentials")
19801981
.withTokenEndpointUri("http://token")
19811982
.withConnectTimeoutSeconds(30)
19821983
.withReadTimeoutSeconds(30)
@@ -2012,9 +2013,9 @@ public void testOauthConfiguration() {
20122013
"ssl.endpoint.identification.algorithm=HTTPS",
20132014
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
20142015
"listener.name.plain-9092.oauthbearer.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.JaasServerOauthValidatorCallbackHandler",
2015-
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
2016+
"listener.name.plain-9092.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"thePrincipalName\" oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\";",
20162017
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
2017-
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
2018+
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.server.bearer.token.location=\"/var/run/secrets/kubernetes.io/serviceaccount/token\" oauth.username.claim=\"preferred_username\" oauth.username.prefix=\"user-\" oauth.fallback.username.claim=\"client_id\" oauth.fallback.username.prefix=\"service-account-\" oauth.groups.claim=\"$.groups\" oauth.groups.claim.delimiter=\";\" oauth.connect.timeout.seconds=\"30\" oauth.read.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.include.accept.header=\"false\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
20182019
"listener.name.plain-9092.sasl.enabled.mechanisms=OAUTHBEARER,PLAIN",
20192020
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));
20202021
}
@@ -2037,6 +2038,7 @@ public void testOauthConfigurationWithPlainOnly() {
20372038
.withTokenEndpointUri("http://token")
20382039
.withClientAudience("kafka")
20392040
.withClientScope("messaging")
2041+
.withClientGrantType("custom_client_credentials")
20402042
.withConnectTimeoutSeconds(30)
20412043
.withEnableMetrics(true)
20422044
.endKafkaListenerAuthenticationOAuth()
@@ -2069,7 +2071,7 @@ public void testOauthConfigurationWithPlainOnly() {
20692071
"ssl.endpoint.identification.algorithm=HTTPS",
20702072
"principal.builder.class=io.strimzi.kafka.oauth.server.OAuthKafkaPrincipalBuilder",
20712073
"listener.name.plain-9092.plain.sasl.server.callback.handler.class=io.strimzi.kafka.oauth.server.plain.JaasServerOauthOverPlainValidatorCallbackHandler",
2072-
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.scope=\"messaging\" oauth.audience=\"kafka\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.connect.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
2074+
"listener.name.plain-9092.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required oauth.valid.issuer.uri=\"http://valid-issuer\" oauth.client.credentials.grant.type=\"custom_client_credentials\" oauth.scope=\"messaging\" oauth.audience=\"kafka\" oauth.jwks.endpoint.uri=\"http://jwks\" oauth.jwks.refresh.min.pause.seconds=\"5\" oauth.username.claim=\"preferred_username\" oauth.connect.timeout.seconds=\"30\" oauth.enable.metrics=\"true\" oauth.config.id=\"PLAIN-9092\" oauth.token.endpoint.uri=\"http://token\";",
20732075
"listener.name.plain-9092.sasl.enabled.mechanisms=PLAIN",
20742076
"listener.name.plain-9092.connections.max.reauth.ms=3600000"));
20752077
}
@@ -2252,6 +2254,7 @@ public void testOAuthOptions() {
22522254
.withReadTimeoutSeconds(60)
22532255
.withHttpRetries(2)
22542256
.withHttpRetryPauseMs(500)
2257+
.withClientGrantType("custom_client_credentials")
22552258
.withClientAudience("kafka")
22562259
.withClientScope("messaging")
22572260
.withEnableMetrics(true)
@@ -2265,6 +2268,7 @@ public void testOAuthOptions() {
22652268
expectedOptions.put(ServerConfig.OAUTH_CHECK_ISSUER, String.valueOf(false));
22662269
expectedOptions.put(ServerConfig.OAUTH_CHECK_AUDIENCE, String.valueOf(true));
22672270
expectedOptions.put(ServerConfig.OAUTH_CUSTOM_CLAIM_CHECK, "@.aud && @.aud == 'something'");
2271+
expectedOptions.put(ServerConfig.OAUTH_CLIENT_CREDENTIALS_GRANT_TYPE, "custom_client_credentials");
22682272
expectedOptions.put(ServerConfig.OAUTH_SCOPE, "messaging");
22692273
expectedOptions.put(ServerConfig.OAUTH_AUDIENCE, "kafka");
22702274
expectedOptions.put(ServerConfig.OAUTH_JWKS_ENDPOINT_URI, "http://jwks-endpoint");

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConnectClusterTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1725,6 +1725,7 @@ public void testPodSetWithOAuthWithClientSecret() {
17251725
.withTokenEndpointUri("http://my-oauth-server")
17261726
.withAudience("kafka")
17271727
.withScope("all")
1728+
.withGrantType("custom-client-credentials")
17281729
.withNewClientSecret()
17291730
.withSecretName("my-secret-secret")
17301731
.withKey("my-secret-key")
@@ -1740,7 +1741,7 @@ public void testPodSetWithOAuthWithClientSecret() {
17401741
String connectConfigurations = configMap.getData().get(KafkaConnectCluster.KAFKA_CONNECT_CONFIGURATION_FILENAME);
17411742
assertThat(connectConfigurations, containsString("security.protocol=SASL_PLAINTEXT"));
17421743
assertThat(connectConfigurations, containsString("sasl.mechanism=OAUTHBEARER"));
1743-
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"my-client-id\" oauth.token.endpoint.uri=\"http://my-oauth-server\" oauth.scope=\"all\" oauth.audience=\"kafka\" oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));
1744+
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required oauth.client.id=\"my-client-id\" oauth.token.endpoint.uri=\"http://my-oauth-server\" oauth.client.credentials.grant.type=\"custom-client-credentials\" oauth.scope=\"all\" oauth.audience=\"kafka\" oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));
17441745
assertThat(connectConfigurations, containsString("sasl.login.callback.handler.class=io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler"));
17451746
}
17461747

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ClusterTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1816,6 +1816,8 @@ public void testPodSetWithOAuthWithClientSecret() {
18161816
new KafkaClientAuthenticationOAuthBuilder()
18171817
.withClientId("my-client-id")
18181818
.withTokenEndpointUri("http://my-oauth-server")
1819+
.withScope("all")
1820+
.withGrantType("custom_client_credentials")
18191821
.withNewClientSecret()
18201822
.withSecretName("my-secret-secret")
18211823
.withKey("my-secret-key")
@@ -1837,6 +1839,8 @@ public void testPodSetWithOAuthWithClientSecret() {
18371839
assertThat(connectConfigurations, containsString("sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required " +
18381840
"oauth.client.id=\"my-client-id\" " +
18391841
"oauth.token.endpoint.uri=\"http://my-oauth-server\" " +
1842+
"oauth.client.credentials.grant.type=\"custom_client_credentials\" " +
1843+
"oauth.scope=\"all\" " +
18401844
"oauth.client.secret=\"${strimzidir:/opt/kafka/oauth/my-secret-secret:my-secret-key}\";"));
18411845

18421846
// Check PodSet

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaMirrorMaker2ConnectorsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,8 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
755755
.withKey("clientAssertionKey")
756756
.withSecretName("clientAssertionSecretName")
757757
.endClientAssertion()
758+
.withScope("all")
759+
.withGrantType("custom_client_credentials")
758760
.withTlsTrustedCertificates(new CertSecretSourceBuilder().withCertificate("ca.crt").withSecretName("my-oauth-secret").build())
759761
.endKafkaClientAuthenticationOAuth()
760762
.build();
@@ -770,6 +772,8 @@ public void testAddClusterToMirrorMaker2ConnectorConfigWithOauth() {
770772
"oauth.refresh.token", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/refreshTokenSecretName:refreshTokenKey}",
771773
"oauth.password.grant.password", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/passwordSecretSecretName:passwordSecretPassword}",
772774
"oauth.client.assertion", "${strimzidir:/opt/kafka/mm2-oauth/sourceClusterAlias/clientAssertionSecretName:clientAssertionKey}",
775+
"oauth.scope", "all",
776+
"oauth.client.credentials.grant.type", "custom_client_credentials",
773777
"oauth.ssl.truststore.location", "/tmp/kafka/clusters/sourceClusterAlias-oauth.truststore.p12",
774778
"oauth.ssl.truststore.type", "PKCS12",
775779
"oauth.ssl.truststore.password", PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR)));

0 commit comments

Comments
 (0)