Skip to content

Commit 4817202

Browse files
OwenCorrigan76fvaleri
authored andcommitted
Add support for Strimzi Metrics Reporter to the Strimzi Kafka Bridge (strimzi#11708)
Signed-off-by: OwenCorrigan76 <owencorrigan76@gmail.com> Signed-off-by: Federico Valeri <fedevaleri@gmail.com> Co-authored-by: Federico Valeri <fedevaleri@gmail.com>
1 parent db007cc commit 4817202

File tree

16 files changed

+3611
-44
lines changed

16 files changed

+3611
-44
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
The `strimzi.io/node-pools` and `strimzi.io/kraft` annotations are not required anymore and will be ignored if set.
77
* Make properties `broker.session.timeout.ms`, `broker.heartbeat.interval.ms` and `controller.socket.timeout.ms` configurable
88
* Add monitoring of custom resources using [kubernetes-state-metrics (KSM)](https://github.com/kubernetes/kube-state-metrics) (see [Strimzi proposal 087](https://github.com/strimzi/proposals/blob/main/087-monitoring-of-custom-resources.md))
9+
* Added support for Strimzi Metrics Reporter to Kafka Connect, Mirror Maker 2 and Kafka Bridge.
910

1011
### Major changes, deprecations and removals
1112

@@ -14,6 +15,7 @@
1415
* Disable Cruise Control network resource goals when resource capacities are not set.
1516
* The `strimzi_resource_state` metric in the Cluster Operator is deprecated and is planned to be removed in Strimzi 0.51.
1617
Use kube-state-metrics based metrics from the [examples](https://github.com/strimzi/strimzi-kafka-operator/tree/main/examples/metrics/kube-state-metrics) as a replacement.
18+
* The field `.spec.enableMetrics` in KafkaBridge is now deprecated and replaced by `.spec.metricsConfig`.
1719

1820
## 0.47.0
1921

api/src/main/java/io/strimzi/api/kafka/model/bridge/KafkaBridgeSpec.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
import com.fasterxml.jackson.annotation.JsonProperty;
99
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
1010
import io.fabric8.kubernetes.api.model.ResourceRequirements;
11+
import io.strimzi.api.annotations.DeprecatedProperty;
1112
import io.strimzi.api.kafka.model.common.ClientTls;
1213
import io.strimzi.api.kafka.model.common.Constants;
1314
import io.strimzi.api.kafka.model.common.HasConfigurableLogging;
15+
import io.strimzi.api.kafka.model.common.HasConfigurableMetrics;
1416
import io.strimzi.api.kafka.model.common.HasLivenessProbe;
1517
import io.strimzi.api.kafka.model.common.HasReadinessProbe;
1618
import io.strimzi.api.kafka.model.common.JvmOptions;
@@ -19,11 +21,13 @@
1921
import io.strimzi.api.kafka.model.common.Rack;
2022
import io.strimzi.api.kafka.model.common.Spec;
2123
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
24+
import io.strimzi.api.kafka.model.common.metrics.MetricsConfig;
2225
import io.strimzi.api.kafka.model.common.tracing.Tracing;
2326
import io.strimzi.crdgenerator.annotations.Description;
2427
import io.strimzi.crdgenerator.annotations.DescriptionFile;
2528
import io.strimzi.crdgenerator.annotations.KubeLink;
2629
import io.strimzi.crdgenerator.annotations.Minimum;
30+
import io.strimzi.crdgenerator.annotations.PresentInVersions;
2731
import io.sundr.builder.annotations.Buildable;
2832
import lombok.EqualsAndHashCode;
2933
import lombok.ToString;
@@ -37,10 +41,10 @@
3741
@JsonPropertyOrder({
3842
"replicas", "image", "bootstrapServers", "tls", "authentication", "http", "adminClient", "consumer",
3943
"producer", "resources", "jvmOptions", "logging", "clientRackInitImage", "rack",
40-
"enableMetrics", "livenessProbe", "readinessProbe", "template", "tracing"})
44+
"enableMetrics", "metricsConfig", "livenessProbe", "readinessProbe", "template", "tracing"})
4145
@EqualsAndHashCode(callSuper = true)
4246
@ToString(callSuper = true)
43-
public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, HasLivenessProbe, HasReadinessProbe {
47+
public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, HasConfigurableMetrics, HasLivenessProbe, HasReadinessProbe {
4448
private static final int DEFAULT_REPLICAS = 1;
4549

4650
private int replicas = DEFAULT_REPLICAS;
@@ -57,6 +61,7 @@ public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, Has
5761
private JvmOptions jvmOptions;
5862
private Logging logging;
5963
private boolean enableMetrics;
64+
private MetricsConfig metricsConfig;
6065
private Probe livenessProbe;
6166
private Probe readinessProbe;
6267
private KafkaBridgeTemplate template;
@@ -76,6 +81,10 @@ public void setReplicas(int replicas) {
7681
this.replicas = replicas;
7782
}
7883

84+
@Deprecated
85+
@DeprecatedProperty(movedToPath = ".spec.metricsConfig",
86+
description = "The `enableMetrics` configuration is deprecated and will be removed in the future.")
87+
@PresentInVersions("v1alpha1-v1beta2")
7988
@JsonInclude(JsonInclude.Include.NON_NULL)
8089
@Description("Enable the metrics for the Kafka Bridge. Default is false.")
8190
public boolean getEnableMetrics() {
@@ -86,6 +95,18 @@ public void setEnableMetrics(boolean enableMetrics) {
8695
this.enableMetrics = enableMetrics;
8796
}
8897

98+
@Description("Metrics configuration.")
99+
@JsonInclude(JsonInclude.Include.NON_EMPTY)
100+
@Override
101+
public MetricsConfig getMetricsConfig() {
102+
return metricsConfig;
103+
}
104+
105+
@Override
106+
public void setMetricsConfig(MetricsConfig metricsConfig) {
107+
this.metricsConfig = metricsConfig;
108+
}
109+
89110
@Description("Logging configuration for Kafka Bridge.")
90111
@JsonInclude(value = JsonInclude.Include.NON_NULL)
91112
@Override

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeCluster.java

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import io.strimzi.api.kafka.model.common.JvmOptions;
3636
import io.strimzi.api.kafka.model.common.Rack;
3737
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
38+
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
39+
import io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter;
3840
import io.strimzi.api.kafka.model.common.template.ContainerTemplate;
3941
import io.strimzi.api.kafka.model.common.template.DeploymentStrategy;
4042
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
@@ -46,6 +48,10 @@
4648
import io.strimzi.operator.cluster.ClusterOperatorConfig;
4749
import io.strimzi.operator.cluster.model.logging.LoggingModel;
4850
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
51+
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
52+
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
53+
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
54+
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
4955
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
5056
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
5157
import io.strimzi.operator.common.Reconciliation;
@@ -65,7 +71,17 @@
6571
* Kafka Bridge model class
6672
*/
6773
@SuppressWarnings({"checkstyle:ClassFanOutComplexity"})
68-
public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging {
74+
public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging, SupportsMetrics {
75+
/**
76+
* Default Strimzi Metrics Reporter allow list.
77+
* Check example dashboards compatibility in case of changes to existing regexes.
78+
*/
79+
private static final List<String> DEFAULT_METRICS_ALLOW_LIST = List.of(
80+
"kafka_consumer_consumer_metrics.*",
81+
"kafka_producer_kafka_metrics_count_count",
82+
"kafka_producer_producer_metrics.*"
83+
);
84+
6985
/**
7086
* HTTP port configuration
7187
*/
@@ -77,19 +93,17 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
7793
protected static final String PASSWORD_VOLUME_MOUNT = "/opt/strimzi/bridge-password/";
7894
protected static final String ENV_VAR_KAFKA_INIT_INIT_FOLDER_KEY = "INIT_FOLDER";
7995
private static final String KAFKA_BRIDGE_CONFIG_VOLUME_NAME = "kafka-bridge-configurations";
80-
private static final String KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT = "/opt/strimzi/custom-config/";
8196

8297
// Cluster Operator environment variables for custom discovery labels and annotations
8398
protected static final String CO_ENV_VAR_CUSTOM_SERVICE_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_SERVICE_LABELS";
8499
protected static final String CO_ENV_VAR_CUSTOM_SERVICE_ANNOTATIONS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_SERVICE_ANNOTATIONS";
85100

86101
// Kafka Bridge configuration keys (EnvVariables)
87102
protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
88-
protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
89103
protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
90104
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
91105
protected static final String OAUTH_SECRETS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth/";
92-
106+
protected static final String KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT = "/opt/strimzi/custom-config/";
93107
protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
94108
protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";
95109

@@ -106,8 +120,9 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
106120
private KafkaBridgeAdminClientSpec kafkaBridgeAdminClient;
107121
private KafkaBridgeConsumerSpec kafkaBridgeConsumer;
108122
private KafkaBridgeProducerSpec kafkaBridgeProducer;
109-
private boolean isMetricsEnabled = false;
123+
private boolean isLegacyMetricsConfigEnabled = false;
110124
private LoggingModel logging;
125+
private MetricsModel metrics;
111126

112127
// Templates
113128
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;
@@ -151,7 +166,7 @@ private KafkaBridgeCluster(Reconciliation reconciliation, HasMetadata resource,
151166
* @param sharedEnvironmentProvider Shared environment provider
152167
* @return KafkaBridgeCluster instance
153168
*/
154-
@SuppressWarnings({"checkstyle:NPathComplexity"})
169+
@SuppressWarnings({"checkstyle:NPathComplexity", "deprecation"})
155170
public static KafkaBridgeCluster fromCrd(Reconciliation reconciliation,
156171
KafkaBridge kafkaBridge,
157172
SharedEnvironmentProvider sharedEnvironmentProvider) {
@@ -182,7 +197,14 @@ public static KafkaBridgeCluster fromCrd(Reconciliation reconciliation,
182197

183198
result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
184199
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
185-
result.isMetricsEnabled = spec.getEnableMetrics();
200+
201+
if (spec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
202+
result.metrics = new JmxPrometheusExporterModel(spec);
203+
} else if (spec.getMetricsConfig() instanceof StrimziMetricsReporter) {
204+
result.metrics = new StrimziMetricsReporterModel(spec, DEFAULT_METRICS_ALLOW_LIST);
205+
} else {
206+
result.isLegacyMetricsConfigEnabled = spec.getEnableMetrics();
207+
}
186208

187209
result.setTls(spec.getTls() != null ? spec.getTls() : null);
188210

@@ -312,7 +334,7 @@ protected List<VolumeMount> getVolumeMounts() {
312334

313335
return volumeMountList;
314336
}
315-
337+
316338
private List<VolumeMount> getInitContainerVolumeMounts() {
317339
List<VolumeMount> volumeMountList = new ArrayList<>();
318340
volumeMountList.add(VolumeUtils.createVolumeMount(INIT_VOLUME_NAME, INIT_VOLUME_MOUNT));
@@ -396,7 +418,6 @@ private Container createContainer(ImagePullPolicy imagePullPolicy) {
396418

397419
protected List<EnvVar> getEnvVars() {
398420
List<EnvVar> varList = new ArrayList<>();
399-
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED, String.valueOf(isMetricsEnabled)));
400421
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
401422
JvmOptionUtils.javaOptions(varList, jvmOptions);
402423

@@ -555,18 +576,23 @@ public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
555576
// generate the ConfigMap data entries for the metrics and logging configuration
556577
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
557578
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
558-
data.put(
559-
BRIDGE_CONFIGURATION_FILENAME,
579+
KafkaBridgeConfigurationBuilder builder =
560580
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
561581
.withTracing(tracing)
562582
.withTls(tls)
563583
.withAuthentication(authentication)
564584
.withKafkaAdminClient(kafkaBridgeAdminClient)
565585
.withKafkaProducer(kafkaBridgeProducer)
566586
.withKafkaConsumer(kafkaBridgeConsumer)
567-
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
568-
.build()
569-
);
587+
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer);
588+
589+
if ((metrics instanceof JmxPrometheusExporterModel) || isLegacyMetricsConfigEnabled) {
590+
builder.withJmxPrometheusExporter((JmxPrometheusExporterModel) metrics, isLegacyMetricsConfigEnabled);
591+
} else if (metrics instanceof StrimziMetricsReporterModel) {
592+
builder.withStrimziMetricsReporter((StrimziMetricsReporterModel) metrics);
593+
}
594+
595+
data.put(BRIDGE_CONFIGURATION_FILENAME, builder.build());
570596

571597
return ConfigMapUtils
572598
.createConfigMap(
@@ -591,4 +617,11 @@ public int getReplicas() {
591617
public LoggingModel logging() {
592618
return logging;
593619
}
620+
621+
/**
622+
* @return Metrics Model instance for configuring Prometheus metrics
623+
*/
624+
public MetricsModel metrics() {
625+
return metrics;
626+
}
594627
}

cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBridgeConfigurationBuilder.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,19 @@
1818
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha256;
1919
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationScramSha512;
2020
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthenticationTls;
21+
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
2122
import io.strimzi.api.kafka.model.common.tracing.Tracing;
23+
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
24+
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterConfig;
25+
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
2226
import io.strimzi.operator.common.Reconciliation;
2327

2428
import java.io.PrintWriter;
2529
import java.io.StringWriter;
2630
import java.util.stream.Collectors;
2731

32+
import static io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter.TYPE_STRIMZI_METRICS_REPORTER;
33+
import static io.strimzi.operator.cluster.model.KafkaBridgeCluster.KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT;
2834
import static io.strimzi.operator.cluster.model.KafkaBridgeCluster.OAUTH_SECRETS_BASE_VOLUME_MOUNT;
2935

3036
/**
@@ -346,6 +352,53 @@ public KafkaBridgeConfigurationBuilder withHttp(KafkaBridgeHttpConfig http, Kafk
346352
return this;
347353
}
348354

355+
/**
356+
* Configures the Strimzi Metrics Reporter. It is set only if user enables Strimzi Metrics Reporter.
357+
*
358+
* @param model Strimzi Metrics Reporter configuration
359+
*
360+
* @return Returns the builder instance
361+
*/
362+
public KafkaBridgeConfigurationBuilder withStrimziMetricsReporter(StrimziMetricsReporterModel model) {
363+
if (model != null) {
364+
printSectionHeader("Strimzi Metrics Reporter configuration");
365+
writer.println("bridge.metrics=" + TYPE_STRIMZI_METRICS_REPORTER);
366+
// the kafka. prefix is required by the Bridge to pass Kafka client configurations
367+
writer.println("kafka.metric.reporters=" + StrimziMetricsReporterConfig.KAFKA_CLASS);
368+
writer.println("kafka." + StrimziMetricsReporterConfig.LISTENER_ENABLE + "=false");
369+
writer.println("kafka." + StrimziMetricsReporterConfig.ALLOW_LIST + "=" + model.getAllowList());
370+
writer.println();
371+
}
372+
return this;
373+
}
374+
375+
/**
376+
* Configures the JMX Prometheus Metrics Exporter.
377+
*
378+
* @param model JMX Prometheus Metrics Exporter configuration
379+
* @param isLegacyMetricsConfigEnabled Flag which indicates whether the metrics are enabled or not in legacy mode.
380+
*
381+
* @return Returns the builder instance
382+
*/
383+
public KafkaBridgeConfigurationBuilder withJmxPrometheusExporter(
384+
JmxPrometheusExporterModel model, boolean isLegacyMetricsConfigEnabled) {
385+
if (model != null || isLegacyMetricsConfigEnabled) {
386+
printSectionHeader("Prometheus JMX Exporter configuration");
387+
writer.println("bridge.metrics=" + JmxPrometheusExporterMetrics.TYPE_JMX_EXPORTER);
388+
389+
// if isLegacyMetricsConfigEnabled is not used, we pass the path of the config file.
390+
// If it is used, the Bridge will use the fallback config.
391+
if (!isLegacyMetricsConfigEnabled) {
392+
writer.println("bridge.metrics.exporter.config.path="
393+
+ KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT + JmxPrometheusExporterModel.CONFIG_MAP_KEY);
394+
}
395+
396+
writer.println();
397+
}
398+
399+
return this;
400+
}
401+
349402
/**
350403
* Prints the file header which is on the beginning of the configuration file.
351404
*/

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaBridgeAssemblyOperator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ protected Future<KafkaBridgeStatus> createOrUpdate(Reconciliation reconciliation
103103
.compose(i -> bridgeInitClusterRoleBinding(reconciliation, initCrbName, initCrb))
104104
.compose(i -> deploymentOperations.scaleDown(reconciliation, namespace, bridge.getComponentName(), bridge.getReplicas(), operationTimeoutMs))
105105
.compose(scale -> serviceOperations.reconcile(reconciliation, namespace, KafkaBridgeResources.serviceName(bridge.getCluster()), bridge.generateService()))
106-
.compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), null))
106+
.compose(i -> MetricsAndLoggingUtils.metricsAndLogging(reconciliation, configMapOperations, bridge.logging(), bridge.metrics()))
107107
.compose(metricsAndLogging -> {
108108
ConfigMap configMap = bridge.generateBridgeConfigMap(metricsAndLogging);
109109
podAnnotations.put(Annotations.ANNO_STRIMZI_IO_CONFIGURATION_HASH, Util.hashStub(configMap.getData().get(KafkaBridgeCluster.BRIDGE_CONFIGURATION_FILENAME)));

0 commit comments

Comments
 (0)