Skip to content

Commit 04fc3db

Browse files
authored
Added support for Bridge internal configuration (#12598)
Signed-off-by: Paolo Patierno <ppatierno@live.com>
1 parent 9048130 commit 04fc3db

File tree

13 files changed

+256
-2
lines changed

13 files changed

+256
-2
lines changed

api/src/main/java/io/strimzi/api/kafka/model/bridge/KafkaBridgeSpec.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
import lombok.EqualsAndHashCode;
3434
import lombok.ToString;
3535

36+
import java.util.HashMap;
37+
import java.util.Map;
38+
3639
@DescriptionFile
3740
@Buildable(
3841
editableEnabled = false,
@@ -42,10 +45,13 @@
4245
@JsonPropertyOrder({
4346
"replicas", "image", "bootstrapServers", "tls", "authentication", "http", "adminClient", "consumer",
4447
"producer", "resources", "jvmOptions", "logging", "clientRackInitImage", "rack",
45-
"enableMetrics", "metricsConfig", "livenessProbe", "readinessProbe", "template", "tracing"})
48+
"enableMetrics", "metricsConfig", "livenessProbe", "readinessProbe", "template", "tracing", "config"})
4649
@EqualsAndHashCode(callSuper = true)
4750
@ToString(callSuper = true)
4851
public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, HasConfigurableMetrics, HasLivenessProbe, HasReadinessProbe {
52+
public static final String FORBIDDEN_PREFIXES = "kafka., http., bridge.metrics.";
53+
public static final String FORBIDDEN_OPTIONS = "bridge.id, bridge.tracing, bridge.metrics";
54+
4955
private static final int DEFAULT_REPLICAS = 1;
5056

5157
private int replicas = DEFAULT_REPLICAS;
@@ -69,6 +75,7 @@ public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, Has
6975
private Tracing tracing;
7076
private String clientRackInitImage;
7177
private Rack rack;
78+
private Map<String, Object> config = new HashMap<>(0);
7279

7380
@Description("The number of pods in the `Deployment`. " +
7481
"Required in the `v1` version of the Strimzi API. " +
@@ -287,4 +294,16 @@ public Rack getRack() {
287294
public void setRack(Rack rack) {
288295
this.rack = rack;
289296
}
297+
298+
@Description("Additional configuration for the HTTP bridge. " +
299+
"The following prefixes cannot be set: " + FORBIDDEN_PREFIXES + " " +
300+
"The following options cannot be set: " + FORBIDDEN_OPTIONS)
301+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
302+
public Map<String, Object> getConfig() {
303+
return config;
304+
}
305+
306+
public void setConfig(Map<String, Object> config) {
307+
this.config = config;
308+
}
290309
}

api/src/test/resources/crds/v1/046-Crd-kafkabridge.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,10 @@ spec:
14931493
required:
14941494
- type
14951495
description: The configuration of tracing in HTTP Bridge.
1496+
config:
1497+
x-kubernetes-preserve-unknown-fields: true
1498+
type: object
1499+
description: "Additional configuration for the HTTP bridge. The following prefixes cannot be set: kafka., http., bridge.metrics. The following options cannot be set: bridge.id, bridge.tracing, bridge.metrics."
14961500
required:
14971501
- replicas
14981502
- bootstrapServers

api/src/test/resources/crds/v1beta2/046-Crd-kafkabridge.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,6 +1345,9 @@ spec:
13451345
- opentelemetry
13461346
required:
13471347
- type
1348+
config:
1349+
x-kubernetes-preserve-unknown-fields: true
1350+
type: object
13481351
required:
13491352
- replicas
13501353
- bootstrapServers
@@ -2807,6 +2810,9 @@ spec:
28072810
- opentelemetry
28082811
required:
28092812
- type
2813+
config:
2814+
x-kubernetes-preserve-unknown-fields: true
2815+
type: object
28102816
required:
28112817
- bootstrapServers
28122818
status:

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
122122
private KafkaBridgeProducerSpec kafkaBridgeProducer;
123123
private LoggingModel logging;
124124
private MetricsModel metrics;
125+
private KafkaBridgeConfiguration configuration;
125126

126127
// Templates
127128
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;
@@ -198,6 +199,10 @@ public static KafkaBridgeCluster fromCrd(Reconciliation reconciliation,
198199
result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
199200
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
200201

202+
if (spec.getConfig() != null) {
203+
result.configuration = new KafkaBridgeConfiguration(reconciliation, spec.getConfig().entrySet());
204+
}
205+
201206
if (spec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
202207
result.metrics = new JmxPrometheusExporterModel(spec);
203208
} else if (spec.getMetricsConfig() instanceof StrimziMetricsReporter) {
@@ -613,7 +618,8 @@ public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
613618
.withKafkaAdminClient(kafkaBridgeAdminClient)
614619
.withKafkaProducer(kafkaBridgeProducer)
615620
.withKafkaConsumer(kafkaBridgeConsumer)
616-
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer);
621+
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
622+
.withUserConfiguration(configuration);
617623

618624
if (metrics instanceof JmxPrometheusExporterModel jmxPrometheusExporterModel) {
619625
builder.withJmxPrometheusExporter(jmxPrometheusExporterModel);
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
6+
package io.strimzi.operator.cluster.model;
7+
8+
import io.strimzi.api.kafka.model.bridge.KafkaBridgeSpec;
9+
import io.strimzi.operator.common.Reconciliation;
10+
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
/**
15+
* Class for handling Kafka Bridge configuration passed by the user
16+
*/
17+
public class KafkaBridgeConfiguration extends AbstractConfiguration {
18+
19+
private static final List<String> FORBIDDEN_PREFIXES;
20+
private static final List<String> FORBIDDEN_OPTIONS;
21+
22+
static {
23+
FORBIDDEN_PREFIXES = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeSpec.FORBIDDEN_PREFIXES);
24+
FORBIDDEN_OPTIONS = AbstractConfiguration.splitPrefixesOrOptionsToList(KafkaBridgeSpec.FORBIDDEN_OPTIONS);
25+
}
26+
27+
/**
28+
* Constructor used to instantiate this class from JsonObject. Should be used to create configuration from
29+
* ConfigMap / CRD.
30+
*
31+
* @param reconciliation The reconciliation
32+
* @param jsonOptions Json object with configuration options as key ad value pairs.
33+
*/
34+
public KafkaBridgeConfiguration(Reconciliation reconciliation, Iterable<Map.Entry<String, Object>> jsonOptions) {
35+
super(reconciliation, jsonOptions, FORBIDDEN_PREFIXES, List.of(), FORBIDDEN_OPTIONS, Map.of());
36+
}
37+
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,22 @@ public KafkaBridgeConfigurationBuilder withJmxPrometheusExporter(JmxPrometheusEx
380380
return this;
381381
}
382382

383+
/**
384+
* Adds the configurations passed by the user in the KafkaBridge CR
385+
*
386+
* @param userConfig The Kafka bridge configuration options specified by the user in the KafkaBridge custom resource
387+
* @return Returns the builder instance
388+
*/
389+
public KafkaBridgeConfigurationBuilder withUserConfiguration(KafkaBridgeConfiguration userConfig) {
390+
if (userConfig != null && !userConfig.getConfiguration().isEmpty()) {
391+
printSectionHeader("User provided configuration");
392+
writer.println(userConfig.getConfiguration());
393+
writer.println();
394+
}
395+
396+
return this;
397+
}
398+
383399
/**
384400
* Prints the file header which is on the beginning of the configuration file.
385401
*/

cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilderTest.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -717,4 +717,104 @@ public void testWithPrometheusJmxExporter() {
717717
"kafka.security.protocol=PLAINTEXT"
718718
));
719719
}
720+
721+
@Test
722+
public void testUserConfigurationEmpty() {
723+
// test with null user configuration
724+
String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
725+
.withUserConfiguration(null)
726+
.build();
727+
assertThat(configuration, isEquivalent(
728+
"bridge.id=my-bridge",
729+
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
730+
"kafka.security.protocol=PLAINTEXT"
731+
));
732+
733+
// test with empty user configuration
734+
KafkaBridgeConfiguration userConfig = new KafkaBridgeConfiguration(Reconciliation.DUMMY_RECONCILIATION, Map.<String, Object>of().entrySet());
735+
configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
736+
.withUserConfiguration(userConfig)
737+
.build();
738+
assertThat(configuration, isEquivalent(
739+
"bridge.id=my-bridge",
740+
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
741+
"kafka.security.protocol=PLAINTEXT"
742+
));
743+
}
744+
745+
@Test
746+
public void testUserConfigurationValid() {
747+
// test with valid custom user configuration
748+
Map<String, Object> map = Map.of(
749+
"bridge.executor.pool.size", "20",
750+
"bridge.executor.queue.size", "1000"
751+
);
752+
KafkaBridgeConfiguration userConfig = new KafkaBridgeConfiguration(Reconciliation.DUMMY_RECONCILIATION, map.entrySet());
753+
754+
String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
755+
.withUserConfiguration(userConfig)
756+
.build();
757+
758+
assertThat(configuration, isEquivalent(
759+
"bridge.id=my-bridge",
760+
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
761+
"kafka.security.protocol=PLAINTEXT",
762+
"bridge.executor.pool.size=20",
763+
"bridge.executor.queue.size=1000"
764+
));
765+
}
766+
767+
@Test
768+
public void testUserConfigurationForbiddenPrefixes() {
769+
// test forbidden prefixes kafka.*, http.*, bridge.metrics.* are filtered out
770+
Map<String, Object> map = Map.of(
771+
"bridge.executor.pool.size", "20",
772+
"kafka.bootstrap.servers", "should-be-filtered",
773+
"http.port", "9999",
774+
"bridge.metrics.enabled", "true"
775+
);
776+
KafkaBridgeConfiguration userConfig = new KafkaBridgeConfiguration(Reconciliation.DUMMY_RECONCILIATION, map.entrySet());
777+
778+
String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
779+
.withUserConfiguration(userConfig)
780+
.build();
781+
782+
assertThat(configuration, isEquivalent(
783+
"bridge.id=my-bridge",
784+
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
785+
"kafka.security.protocol=PLAINTEXT",
786+
"bridge.executor.pool.size=20"
787+
));
788+
// verify forbidden properties are not in the configuration
789+
assertThat(configuration, not(containsString("kafka.bootstrap.servers=should-be-filtered")));
790+
assertThat(configuration, not(containsString("http.port=9999")));
791+
assertThat(configuration, not(containsString("bridge.metrics.enabled=true")));
792+
}
793+
794+
@Test
795+
public void testUserConfigurationForbiddenExactOptions() {
796+
// test forbidden exact options are filtered out but similar properties are allowed
797+
Map<String, Object> map = Map.of(
798+
"bridge.executor.pool.size", "20",
799+
"bridge.id", "should-be-filtered",
800+
"bridge.tracing", "should-be-filtered",
801+
"bridge.metrics", "should-be-filtered"
802+
);
803+
KafkaBridgeConfiguration userConfig = new KafkaBridgeConfiguration(Reconciliation.DUMMY_RECONCILIATION, map.entrySet());
804+
805+
String configuration = new KafkaBridgeConfigurationBuilder(Reconciliation.DUMMY_RECONCILIATION, BRIDGE_CLUSTER, BRIDGE_BOOTSTRAP_SERVERS)
806+
.withUserConfiguration(userConfig)
807+
.build();
808+
809+
assertThat(configuration, isEquivalent(
810+
"bridge.id=my-bridge",
811+
"kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092",
812+
"kafka.security.protocol=PLAINTEXT",
813+
"bridge.executor.pool.size=20"
814+
));
815+
// verify forbidden exact options are not in the configuration
816+
assertThat(configuration, not(containsString("bridge.id=should-be-filtered")));
817+
assertThat(configuration, not(containsString("bridge.tracing=should-be-filtered")));
818+
assertThat(configuration, not(containsString("bridge.metrics=should-be-filtered")));
819+
}
720820
}

documentation/assemblies/configuring/assembly-config.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,8 @@ include::../../modules/configuring/proc-config-http-bridge-user-authorization.ad
168168
include::../../modules/configuring/con-config-http-bridge-http-endpoints.adoc[leveloffset=+2]
169169
// Tuning consumer and producer settings
170170
include::../../modules/configuring/con-config-http-bridge-client-settings.adoc[leveloffset=+2]
171+
// Tuning internal bridge configuration
172+
include::../../modules/configuring/con-config-http-bridge-settings.adoc[leveloffset=+2]
171173
// Running multiple HTTP Bridge instances
172174
include::../../modules/configuring/con-config-http-bridge-multiple-instances.adoc[leveloffset=+2]
173175

documentation/modules/appendix_crds.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2988,6 +2988,9 @@ include::../api/io.strimzi.api.kafka.model.bridge.KafkaBridgeSpec.adoc[leveloffs
29882988
|tracing
29892989
|xref:type-OpenTelemetryTracing-{context}[`OpenTelemetryTracing`]
29902990
|The configuration of tracing in HTTP Bridge.
2991+
|config
2992+
|map
2993+
|Additional configuration for the HTTP bridge. The following prefixes cannot be set: kafka., http., bridge.metrics. The following options cannot be set: bridge.id, bridge.tracing, bridge.metrics.
29912994
|====
29922995

29932996
[id='type-KafkaBridgeHttpConfig-{context}']
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
:_mod-docs-content-type: CONCEPT
2+
3+
// Module included in the following assemblies:
4+
//
5+
// assembly-config.adoc
6+
7+
[id='con-config-http-bridge-settings-{context}']
8+
= Tuning internal HTTP Bridge settings
9+
10+
[role="_abstract"]
11+
Tune how the HTTP Bridge processes requests internally by configuring bridge-specific properties.
12+
13+
The HTTP Bridge uses an internal executor service to handle HTTP requests asynchronously.
14+
15+
You can adjust this behavior by setting properties in the `spec.config` section of the `KafkaBridge` resource.
16+
17+
Certain properties are managed directly by Strimzi and cannot be configured in `spec.config`.
18+
For example, properties with the `kafka.`, `http.`, and `bridge.metrics.` prefixes are configured using other properties in the `KafkaBridge` resource.
19+
Additionally, the specific `bridge.id`, `bridge.tracing`, and `bridge.metrics` options cannot be overridden.
20+
For more information, see the link:{BookURLConfiguring}#type-KafkaBridgeSpec-reference[`KafkaBridgeSpec` schema reference^].
21+
22+
== Configuring executor service settings
23+
24+
Configure executor service behavior using `spec.config`.
25+
26+
The executor service manages the thread pool that processes HTTP requests.
27+
28+
Configuration is defined as key-value pairs under `spec.config`.
29+
30+
.Example executor service configuration
31+
[source,yaml,subs="+quotes,attributes"]
32+
----
33+
spec:
34+
config:
35+
bridge.executor.pool.size: "20"
36+
bridge.executor.queue.size: "1000"
37+
----
38+
39+
* `spec.config` defines HTTP Bridge properties used to configure internal behavior.
40+
* `bridge.executor.pool.size` controls the maximum number of threads in the executor service pool.
41+
If not specified, the default is twice the number of available CPUs, with a minimum of 4 (`max(4, <available CPUs> * 2)`).
42+
* `bridge.executor.queue.size` controls the maximum number of requests that can be queued when all threads are busy.
43+
If not specified, the default is `1000`.
44+
45+
These are example properties.
46+
You can configure other HTTP Bridge properties as required.
47+
For the full list of supported options, see the link:{BookURLBridge}#_configuration_properties[HTTP Bridge configuration documentation^].
48+
49+
Choose values that match your expected load and performance requirements.
50+
51+
Executor service tuning is workload-dependent.
52+
Test changes in a non-production environment before applying them to production and monitor the impact.

0 commit comments

Comments
 (0)