Skip to content

Commit 2d31a2b

Browse files
authored
Allow per listener configurations (#12239)
Signed-off-by: Gantigmaa Selenge <tina.selenge@gmail.com>
1 parent 88f6e8f commit 2d31a2b

File tree

10 files changed

+101
-7
lines changed

10 files changed

+101
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## 0.51.0
44

5-
* Nothing here yet, but we will surely develop something new pretty soon 😉
5+
* Allow setting the following configurations with the listener prefix (e.g. `listener.name.listener1-9900.`): `connections.max.reauth.ms`, `max.connections*` and `max.connection.creation.rate`.
66

77
### Major changes, deprecations, and removals
88

api/src/main/java/io/strimzi/api/kafka/model/kafka/KafkaClusterSpec.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
@EqualsAndHashCode
5959
@ToString
6060
public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurableLogging, HasJmxOptions, HasReadinessProbe, HasLivenessProbe, UnknownPropertyPreserving {
61+
/* When FORBIDDEN_PREFIXES is updated, documentation/api/io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.adoc must be updated accordingly. */
6162
public static final String FORBIDDEN_PREFIXES = "listeners, advertised., broker., listener., host.name, port, "
6263
+ "inter.broker.listener.name, sasl., ssl., security., password., log.dir, "
6364
+ "zookeeper.connect, zookeeper.set.acl, zookeeper.ssl, zookeeper.clientCnxnSocket, authorizer., super.user, "
@@ -67,6 +68,7 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurable
6768
+ "client.quota.callback.static.storage.per.volume.limit.min.available., client.quota.callback.static.excluded.principal.name.list, "
6869
+ "prometheus.metrics.reporter.";
6970

71+
/* When FORBIDDEN_PREFIX_EXCEPTIONS is updated, documentation/api/io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.adoc must be updated accordingly. */
7072
public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "zookeeper.connection.timeout.ms, sasl.server.max.receive.size, "
7173
+ "ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols, ssl.secure.random.implementation, "
7274
+ "cruise.control.metrics.topic.num.partitions, cruise.control.metrics.topic.replication.factor, cruise.control.metrics.topic.retention.ms, "
@@ -75,6 +77,9 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurable
7577
+ "broker.session.timeout.ms, broker.heartbeat.interval.ms, controller.socket.timeout.ms, "
7678
+ "controller.quorum.election.backoff.max.ms, controller.quorum.election.timeout.ms, controller.quorum.fetch.timeout.ms"; // KRaft options
7779

80+
/* When ALLOWED_PER_LISTENER_CONFIGS is updated, documentation/api/io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.adoc must be updated accordingly. */
81+
public static final String ALLOWED_PER_LISTENER_CONFIGS = "connections.max.reauth.ms, max.connections, max.connection.creation.rate";
82+
7883
private Storage storage;
7984
private String version;
8085
private String metadataVersion;
@@ -119,7 +124,7 @@ public void setMetadataVersion(String metadataVersion) {
119124
this.metadataVersion = metadataVersion;
120125
}
121126

122-
@Description("Kafka broker config properties with the following prefixes cannot be set: " + FORBIDDEN_PREFIXES + " (with the exception of: " + FORBIDDEN_PREFIX_EXCEPTIONS + ").")
127+
@Description("Kafka broker config properties with certain prefixes cannot be set unless it is in the exception list. Consult the documentation for the list of forbidden prefixes and exceptions")
123128
@JsonInclude(JsonInclude.Include.NON_EMPTY)
124129
public Map<String, Object> getConfig() {
125130
return config;

api/src/test/resources/crds/v1/040-Crd-kafka.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ spec:
379379
config:
380380
x-kubernetes-preserve-unknown-fields: true
381381
type: object
382-
description: "Kafka broker config properties with the following prefixes cannot be set: listeners, advertised., broker., listener., host.name, port, inter.broker.listener.name, sasl., ssl., security., password., log.dir, zookeeper.connect, zookeeper.set.acl, zookeeper.ssl, zookeeper.clientCnxnSocket, authorizer., super.user, cruise.control.metrics.topic, cruise.control.metrics.reporter.bootstrap.servers, node.id, process.roles, controller., metadata.log.dir, zookeeper.metadata.migration.enable, client.quota.callback.static.kafka.admin., client.quota.callback.static.produce, client.quota.callback.static.fetch, client.quota.callback.static.storage.per.volume.limit.min.available., client.quota.callback.static.excluded.principal.name.list, prometheus.metrics.reporter. (with the exception of: zookeeper.connection.timeout.ms, sasl.server.max.receive.size, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols, ssl.secure.random.implementation, cruise.control.metrics.topic.num.partitions, cruise.control.metrics.topic.replication.factor, cruise.control.metrics.topic.retention.ms, cruise.control.metrics.topic.auto.create.retries, cruise.control.metrics.topic.auto.create.timeout.ms, cruise.control.metrics.topic.min.insync.replicas, broker.session.timeout.ms, broker.heartbeat.interval.ms, controller.socket.timeout.ms, controller.quorum.election.backoff.max.ms, controller.quorum.election.timeout.ms, controller.quorum.fetch.timeout.ms)."
382+
description: Kafka broker config properties with certain prefixes cannot be set unless it is in the exception list. Consult the documentation for the list of forbidden prefixes and exceptions.
383383
authorization:
384384
type: object
385385
properties:

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,9 +345,10 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
345345
result.ownerReference,
346346
kafkaClusterSpec
347347
);
348+
List<GenericKafkaListener> listeners = kafkaClusterSpec.getListeners();
348349

349350
// Handle Kafka broker configuration
350-
KafkaConfiguration configuration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet());
351+
KafkaConfiguration configuration = new KafkaConfiguration(reconciliation, kafkaClusterSpec.getConfig().entrySet(), listeners);
351352
validateConfiguration(reconciliation, kafka, result.kafkaVersion, configuration);
352353

353354
if (kafkaClusterSpec.getQuotas() != null) {
@@ -364,7 +365,6 @@ public static KafkaCluster fromCrd(Reconciliation reconciliation,
364365
LOGGER.errorCr(reconciliation, "The required field .spec.kafka.listeners is missing");
365366
throw new InvalidResourceException("The required field .spec.kafka.listeners is missing");
366367
}
367-
List<GenericKafkaListener> listeners = kafkaClusterSpec.getListeners();
368368
ListenersValidator.validate(reconciliation, result.brokerNodes(), listeners);
369369
result.listeners = listeners;
370370

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaConfiguration.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import io.strimzi.api.kafka.model.kafka.KafkaClusterSpec;
9+
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
910
import io.strimzi.kafka.config.model.ConfigModel;
1011
import io.strimzi.kafka.config.model.ConfigModels;
1112
import io.strimzi.operator.common.Reconciliation;
@@ -30,13 +31,16 @@ public class KafkaConfiguration extends AbstractConfiguration {
3031
*/
3132
public static final String DEFAULT_REPLICATION_FACTOR = "default.replication.factor";
3233

34+
private static final String PER_LISTENER_CONFIG_FORMAT = "listener.name.%s.%s";
3335
private static final List<String> FORBIDDEN_PREFIXES;
3436
private static final List<String> FORBIDDEN_PREFIX_EXCEPTIONS;
37+
private static final List<String> ALLOWED_PER_LISTENER_CONFIGURATIONS;
3538
private static final Map<String, String> DEFAULTS;
3639

3740
static {
3841
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIXES);
3942
FORBIDDEN_PREFIX_EXCEPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.FORBIDDEN_PREFIX_EXCEPTIONS);
43+
ALLOWED_PER_LISTENER_CONFIGURATIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaClusterSpec.ALLOWED_PER_LISTENER_CONFIGS);
4044

4145
DEFAULTS = new HashMap<>(1);
4246
// when users remove "min.insync.replicas" from the Kafka custom resource, the operator is going to force the
@@ -65,11 +69,31 @@ public KafkaConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<Stri
6569
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, FORBIDDEN_PREFIX_EXCEPTIONS, List.of(), DEFAULTS);
6670
}
6771

72+
/**
73+
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
74+
* ConfigMap / CRD. It also modifies the FORBIDDEN_PREFIX_EXCEPTIONS with the given listener names.
75+
*
76+
* @param reconciliation The reconciliation
77+
* @param jsonOptions Json object with configuration options as key ad value pairs.
78+
* @param listeners Listener names to add to the exception list
79+
*/
80+
public KafkaConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions, List<GenericKafkaListener> listeners) {
81+
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, modifyWithListeners(listeners), List.of(), DEFAULTS);
82+
}
83+
6884
private KafkaConfiguration(Reconciliation reconciliation, String configuration, List<String> forbiddenPrefixes) {
6985
super(reconciliation, configuration, forbiddenPrefixes, List.of(), List.of(), DEFAULTS);
7086
}
7187

7288

89+
/* test */ static List<String> modifyWithListeners(List<GenericKafkaListener> listeners) {
90+
List<String> exceptions = new ArrayList<>();
91+
listeners.stream().map(ListenersUtils::identifier).forEach(l -> ALLOWED_PER_LISTENER_CONFIGURATIONS.forEach(c -> exceptions.add(String.format(PER_LISTENER_CONFIG_FORMAT, l, c))));
92+
exceptions.addAll(FORBIDDEN_PREFIX_EXCEPTIONS);
93+
return exceptions;
94+
}
95+
96+
7397
/**
7498
* Returns a KafkaConfiguration created without forbidden option filtering.
7599
*

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaRebalanceAssemblyOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,6 @@ private void handleUserTaskStatusResponse(Reconciliation reconciliation, CruiseC
918918
if (taskStatusJson.has(CruiseControlRebalanceKeys.LOAD_BEFORE_OPTIMIZATION.getKey()) &&
919919
taskStatusJson.has(CruiseControlRebalanceKeys.LOAD_AFTER_OPTIMIZATION.getKey())) {
920920
LOGGER.infoCr(reconciliation, "Rebalance ({}) optimization proposal is now ready", sessionId);
921-
System.out.println("Status" + taskStatusJson);
922921
p.complete(buildRebalanceStatus(kafkaRebalance, sessionId, KafkaRebalanceState.ProposalReady, taskStatusJson, conditions));
923922
break;
924923
}

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1891,6 +1891,45 @@ public void testReplicasAndRelatedOptionsValidationOk() {
18911891
});
18921892
}
18931893

1894+
@Test
1895+
public void testPerListenerConfigurations() {
1896+
Kafka kafkaAssembly = new KafkaBuilder(KAFKA)
1897+
.editSpec()
1898+
.editKafka()
1899+
.withConfig(Map.of("listener.name.tls-9093.max.connections", 1000,
1900+
"listener.name.tls-9093.connections.max.reauth.ms", 1000,
1901+
"listener.name.tls-9093.connections.max.idle.ms", 100000,
1902+
"listener.name.plain-9092.max.connections", 1000,
1903+
"listener.name.plain-9092.connections.max.reauth.ms", 1000,
1904+
"listener.name.plain-9092.connections.max.idle.ms", 100000,
1905+
"listener.name.CONTROLPLANE-9090.max.connections", 1000,
1906+
"listener.name.REPLICATION-9091.connections.max.reauth.ms", 1000,
1907+
"listener.name.does-not-exist.connections.max.reauth.ms", 1000))
1908+
.endKafka()
1909+
.endSpec()
1910+
.build();
1911+
1912+
List<KafkaPool> pools = NodePoolUtils.createKafkaPools(Reconciliation.DUMMY_RECONCILIATION, kafkaAssembly, List.of(POOL_CONTROLLERS, POOL_MIXED, POOL_BROKERS), Map.of(), KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, SHARED_ENV_PROVIDER);
1913+
KafkaCluster cluster = KafkaCluster.fromCrd(Reconciliation.DUMMY_RECONCILIATION, kafkaAssembly, pools, VERSIONS, KafkaVersionTestUtils.DEFAULT_KRAFT_VERSION_CHANGE, null, SHARED_ENV_PROVIDER);
1914+
1915+
// configurations that are in the exception list
1916+
assertThat(cluster.configuration.getConfiguration(), CoreMatchers.containsString("listener.name.tls-9093.connections.max.reauth.ms=1000"));
1917+
assertThat(cluster.configuration.getConfiguration(), CoreMatchers.containsString("listener.name.tls-9093.max.connections=1000"));
1918+
assertThat(cluster.configuration.getConfiguration(), CoreMatchers.containsString("listener.name.plain-9092.connections.max.reauth.ms=1000"));
1919+
assertThat(cluster.configuration.getConfiguration(), CoreMatchers.containsString("listener.name.plain-9092.max.connections=1000"));
1920+
1921+
// configuration that is not in the exception
1922+
assertThat(cluster.configuration.getConfiguration(), not(CoreMatchers.containsString("listener.name.plain-9092.connections.max.idle.ms")));
1923+
assertThat(cluster.configuration.getConfiguration(), not(CoreMatchers.containsString("listener.name.tls-9093.connections.max.idle.ms")));
1924+
1925+
// configuration for a listener that does not exist
1926+
assertThat(cluster.configuration.getConfiguration(), not(CoreMatchers.containsString("listener.name.does-not-exist.connections.max.reauth.ms")));
1927+
1928+
// configurations for internal interfaces
1929+
assertThat(cluster.configuration.getConfiguration(), not(CoreMatchers.containsString("listener.name.REPLICATION-9091.connections.max.reauth.ms")));
1930+
assertThat(cluster.configuration.getConfiguration(), not(CoreMatchers.containsString("listener.name.CONTROLPLANE-9090.max.connections")));
1931+
}
1932+
18941933
@Test
18951934
public void testCruiseControl() {
18961935
Kafka kafkaAssembly = new KafkaBuilder(KAFKA)

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaConfigurationTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,20 @@
44
*/
55
package io.strimzi.operator.cluster.model;
66

7+
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListener;
8+
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
9+
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
710
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
811
import io.strimzi.operator.common.Reconciliation;
912
import org.junit.jupiter.api.Assertions;
1013
import org.junit.jupiter.api.Test;
1114

15+
import java.util.List;
16+
1217
import static java.util.Collections.emptyList;
1318
import static java.util.Collections.singletonList;
1419
import static java.util.Collections.singletonMap;
20+
import static org.hamcrest.CoreMatchers.hasItems;
1521
import static org.hamcrest.CoreMatchers.is;
1622
import static org.hamcrest.CoreMatchers.nullValue;
1723
import static org.hamcrest.MatcherAssert.assertThat;
@@ -140,4 +146,19 @@ public void testRemoteStorageExpirationThreadPoolSize() {
140146
assertConfigError("remote.log.manager.expiration.thread.pool.size", "0", "remote.log.manager.expiration.thread.pool.size has value 0 which less than the minimum value 1");
141147
assertConfigError("remote.log.manager.expiration.thread.pool.size", "-5", "remote.log.manager.expiration.thread.pool.size has value -5 which less than the minimum value 1");
142148
}
149+
150+
@Test
151+
public void testModifyWithListeners() {
152+
GenericKafkaListener listener1 = new GenericKafkaListenerBuilder()
153+
.withName("listener1")
154+
.withPort(9900)
155+
.withType(KafkaListenerType.INTERNAL)
156+
.build();
157+
158+
List<String> modifiedExceptionList = KafkaConfiguration.modifyWithListeners(List.of(listener1));
159+
160+
assertThat(modifiedExceptionList, hasItems("listener.name.listener1-9900.connections.max.reauth.ms",
161+
"listener.name.listener1-9900.max.connections",
162+
"listener.name.listener1-9900.max.connection.creation.rate"));
163+
}
143164
}

documentation/api/io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ All other supported options are forwarded to Kafka, including the following exce
8484
** `controller.quorum.election.backoff.max.ms`
8585
** `controller.quorum.election.timeout.ms`
8686
** `controller.quorum.fetch.timeout.ms`
87+
* Listener properties (which are configured in the following format: `listener.name.listener1-9900.connections.max.reauth.ms` where `listener1-9900` is the listener name and port joined by `-`):
88+
** `connections.max.reauth.ms`
89+
** `max.connections`
90+
** `max.connections.per.ip`
91+
** `max.connections.per.ip.overrides`
92+
** `max.connection.creation.rate`
8793
8894
[id='property-kafka-brokerRackInitImage-{context}']
8995
= Configuring rack awareness and init container images

documentation/modules/appendix_crds.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ include::../api/io.strimzi.api.kafka.model.kafka.KafkaClusterSpec.adoc[leveloffs
8989
|Configures listeners to provide access to Kafka brokers.
9090
|config
9191
|map
92-
|Kafka broker config properties with the following prefixes cannot be set: listeners, advertised., broker., listener., host.name, port, inter.broker.listener.name, sasl., ssl., security., password., log.dir, zookeeper.connect, zookeeper.set.acl, zookeeper.ssl, zookeeper.clientCnxnSocket, authorizer., super.user, cruise.control.metrics.topic, cruise.control.metrics.reporter.bootstrap.servers, node.id, process.roles, controller., metadata.log.dir, zookeeper.metadata.migration.enable, client.quota.callback.static.kafka.admin., client.quota.callback.static.produce, client.quota.callback.static.fetch, client.quota.callback.static.storage.per.volume.limit.min.available., client.quota.callback.static.excluded.principal.name.list, prometheus.metrics.reporter. (with the exception of: zookeeper.connection.timeout.ms, sasl.server.max.receive.size, ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols, ssl.secure.random.implementation, cruise.control.metrics.topic.num.partitions, cruise.control.metrics.topic.replication.factor, cruise.control.metrics.topic.retention.ms, cruise.control.metrics.topic.auto.create.retries, cruise.control.metrics.topic.auto.create.timeout.ms, cruise.control.metrics.topic.min.insync.replicas, broker.session.timeout.ms, broker.heartbeat.interval.ms, controller.socket.timeout.ms, controller.quorum.election.backoff.max.ms, controller.quorum.election.timeout.ms, controller.quorum.fetch.timeout.ms).
92+
|Kafka broker config properties with certain prefixes cannot be set unless it is in the exception list. Consult the documentation for the list of forbidden prefixes and exceptions.
9393
|storage
9494
|xref:type-EphemeralStorage-{context}[`EphemeralStorage`], xref:type-PersistentClaimStorage-{context}[`PersistentClaimStorage`], xref:type-JbodStorage-{context}[`JbodStorage`]
9595
|**The `storage` property has been deprecated.** Use `KafkaNodePool` resources. Storage is now configured in the `KafkaNodePool` resources and this option is ignored.

0 commit comments

Comments
 (0)