Merged

Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@
The `strimzi.io/node-pools` and `strimzi.io/kraft` annotations are not required anymore and will be ignored if set.
* Make properties `broker.session.timeout.ms`, `broker.heartbeat.interval.ms` and `controller.socket.timeout.ms` configurable
* Add monitoring of custom resources using [kubernetes-state-metrics (KSM)](https://github.com/kubernetes/kube-state-metrics) (see [Strimzi proposal 087](https://github.com/strimzi/proposals/blob/main/087-monitoring-of-custom-resources.md))
* Added support for Strimzi Metrics Reporter to the Kafka Bridge
Member
Can you please add Connect and MM2 here as well? Looks like that was missed in the last PR :-/.

You should also add the deprecation of `.spec.enableMetrics` (in `KafkaBridge`) to the section below.

Member
@fvaleri fvaleri Aug 5, 2025
Ok. Added.


### Major changes, deprecations and removals

KafkaBridgeSpec.java
@@ -11,6 +11,7 @@
import io.strimzi.api.kafka.model.common.ClientTls;
import io.strimzi.api.kafka.model.common.Constants;
import io.strimzi.api.kafka.model.common.HasConfigurableLogging;
import io.strimzi.api.kafka.model.common.HasConfigurableMetrics;
import io.strimzi.api.kafka.model.common.HasLivenessProbe;
import io.strimzi.api.kafka.model.common.HasReadinessProbe;
import io.strimzi.api.kafka.model.common.JvmOptions;
@@ -19,6 +20,7 @@
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.Spec;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
import io.strimzi.api.kafka.model.common.metrics.MetricsConfig;
import io.strimzi.api.kafka.model.common.tracing.Tracing;
import io.strimzi.crdgenerator.annotations.Description;
import io.strimzi.crdgenerator.annotations.DescriptionFile;
@@ -37,10 +39,10 @@
@JsonPropertyOrder({
"replicas", "image", "bootstrapServers", "tls", "authentication", "http", "adminClient", "consumer",
"producer", "resources", "jvmOptions", "logging", "clientRackInitImage", "rack",
"enableMetrics", "livenessProbe", "readinessProbe", "template", "tracing"})
"enableMetrics", "metricsConfig", "livenessProbe", "readinessProbe", "template", "tracing"})
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, HasLivenessProbe, HasReadinessProbe {
public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, HasConfigurableMetrics, HasLivenessProbe, HasReadinessProbe {
private static final int DEFAULT_REPLICAS = 1;

private int replicas = DEFAULT_REPLICAS;
@@ -57,6 +59,7 @@ public class KafkaBridgeSpec extends Spec implements HasConfigurableLogging, Has
private JvmOptions jvmOptions;
private Logging logging;
private boolean enableMetrics;
private MetricsConfig metricsConfig;
private Probe livenessProbe;
private Probe readinessProbe;
private KafkaBridgeTemplate template;
@@ -77,15 +80,28 @@ public void setReplicas(int replicas) {
}

@JsonInclude(JsonInclude.Include.NON_NULL)
@Description("Enable the metrics for the Kafka Bridge. Default is false.")
@Description("Enable the metrics for the Kafka Bridge. Default is false. Deprecated, use `spec.metricsConfig`.")
Member
I guess you should add the `@Deprecated`, `@DeprecatedProperty(description="...", movedToPath="...")`, and `@PresentInVersions` annotations as well. IIRC that should also automatically generate the docs, so you might want to revert the description change.

Member
Ok. Thanks.

public boolean getEnableMetrics() {
return enableMetrics;
}
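The precedence rule behind this deprecation — the old `enableMetrics` boolean keeps working, but only when the new `spec.metricsConfig` is absent — can be sketched in plain Java. The class and field names below are illustrative stand-ins, not the real Strimzi API:

```java
// Hypothetical sketch of the "old flag vs. new config" precedence rule;
// the names here are illustrative, not the actual Strimzi classes.
class MetricsCompat {
    @Deprecated
    private final boolean enableMetrics;    // legacy toggle, kept for compatibility
    private final String metricsConfigType; // new-style config; null = not configured

    MetricsCompat(boolean enableMetrics, String metricsConfigType) {
        this.enableMetrics = enableMetrics;
        this.metricsConfigType = metricsConfigType;
    }

    // The new config wins; the deprecated flag is only honored when no config is set.
    String effectiveMetrics() {
        if (metricsConfigType != null) {
            return metricsConfigType;
        }
        return enableMetrics ? "jmxPrometheusExporter" : "disabled";
    }
}
```

Under this sketch, setting the legacy flag has no effect once a `metricsConfig` is present, which matches the deprecation note in the description above.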

@Deprecated
Member
We generally deprecate only the getters, not the setters.

Member
Ok. Reverted.

public void setEnableMetrics(boolean enableMetrics) {
this.enableMetrics = enableMetrics;
}

@Description("Metrics configuration.")
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Override
public MetricsConfig getMetricsConfig() {
return metricsConfig;
}

@Override
public void setMetricsConfig(MetricsConfig metricsConfig) {
this.metricsConfig = metricsConfig;
}

@Description("Logging configuration for Kafka Bridge.")
@JsonInclude(value = JsonInclude.Include.NON_NULL)
@Override
KafkaBridgeCluster.java
@@ -35,6 +35,8 @@
import io.strimzi.api.kafka.model.common.JvmOptions;
import io.strimzi.api.kafka.model.common.Rack;
import io.strimzi.api.kafka.model.common.authentication.KafkaClientAuthentication;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
import io.strimzi.api.kafka.model.common.metrics.StrimziMetricsReporter;
import io.strimzi.api.kafka.model.common.template.ContainerTemplate;
import io.strimzi.api.kafka.model.common.template.DeploymentStrategy;
import io.strimzi.api.kafka.model.common.template.DeploymentTemplate;
@@ -46,6 +48,10 @@
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.model.logging.LoggingModel;
import io.strimzi.operator.cluster.model.logging.SupportsLogging;
import io.strimzi.operator.cluster.model.metrics.JmxPrometheusExporterModel;
import io.strimzi.operator.cluster.model.metrics.MetricsModel;
import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel;
import io.strimzi.operator.cluster.model.metrics.SupportsMetrics;
import io.strimzi.operator.cluster.model.securityprofiles.ContainerSecurityProviderContextImpl;
import io.strimzi.operator.cluster.model.securityprofiles.PodSecurityProviderContextImpl;
import io.strimzi.operator.common.Reconciliation;
@@ -65,7 +71,17 @@
* Kafka Bridge model class
*/
@SuppressWarnings({"checkstyle:ClassFanOutComplexity"})
public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging {
public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging, SupportsMetrics {
/**
* Default Strimzi Metrics Reporter allow list.
* Check example dashboards compatibility in case of changes to existing regexes.
*/
private static final List<String> DEFAULT_METRICS_ALLOW_LIST = List.of(
"kafka_consumer_consumer_metrics.*",
"kafka_producer_kafka_metrics_count_count",
"kafka_producer_producer_metrics.*"
);
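The allow list above is a set of regular expressions matched against metric names; only matching metrics are exposed. A minimal sketch of that filtering behavior (the helper class is hypothetical, not the reporter's actual implementation):

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of regex allow-list filtering; not the reporter's real code.
class AllowListFilter {
    private final List<Pattern> patterns;

    AllowListFilter(List<String> allowList) {
        // Compile each allow-list entry once up front.
        this.patterns = allowList.stream().map(Pattern::compile).toList();
    }

    // A metric is kept only when its full name matches at least one pattern.
    boolean isAllowed(String metricName) {
        return patterns.stream().anyMatch(p -> p.matcher(metricName).matches());
    }
}
```

This is why the comment warns about dashboard compatibility: tightening a regex such as `kafka_producer_producer_metrics.*` would silently drop every metric a dashboard panel queries under that prefix.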

/**
* HTTP port configuration
*/
@@ -77,19 +93,17 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
protected static final String PASSWORD_VOLUME_MOUNT = "/opt/strimzi/bridge-password/";
protected static final String ENV_VAR_KAFKA_INIT_INIT_FOLDER_KEY = "INIT_FOLDER";
private static final String KAFKA_BRIDGE_CONFIG_VOLUME_NAME = "kafka-bridge-configurations";
private static final String KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT = "/opt/strimzi/custom-config/";

// Cluster Operator environment variables for custom discovery labels and annotations
protected static final String CO_ENV_VAR_CUSTOM_SERVICE_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_SERVICE_LABELS";
protected static final String CO_ENV_VAR_CUSTOM_SERVICE_ANNOTATIONS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_SERVICE_ANNOTATIONS";

// Kafka Bridge configuration keys (EnvVariables)
protected static final String ENV_VAR_PREFIX = "KAFKA_BRIDGE_";
protected static final String ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED = "KAFKA_BRIDGE_METRICS_ENABLED";
protected static final String ENV_VAR_KAFKA_BRIDGE_TRUSTED_CERTS = "KAFKA_BRIDGE_TRUSTED_CERTS";
protected static final String OAUTH_TLS_CERTS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth-certs/";
protected static final String OAUTH_SECRETS_BASE_VOLUME_MOUNT = "/opt/strimzi/oauth/";

protected static final String KAFKA_BRIDGE_CONFIG_VOLUME_MOUNT = "/opt/strimzi/custom-config/";
protected static final String CO_ENV_VAR_CUSTOM_BRIDGE_POD_LABELS = "STRIMZI_CUSTOM_KAFKA_BRIDGE_LABELS";
protected static final String INIT_VOLUME_MOUNT = "/opt/strimzi/init";

@@ -108,6 +122,7 @@ public class KafkaBridgeCluster extends AbstractModel implements SupportsLogging
private KafkaBridgeProducerSpec kafkaBridgeProducer;
private boolean isMetricsEnabled = false;
private LoggingModel logging;
private MetricsModel metrics;

// Templates
private PodDisruptionBudgetTemplate templatePodDisruptionBudget;
@@ -147,7 +162,7 @@ private KafkaBridgeCluster(Reconciliation reconciliation, HasMetadata resource,
* Create the KafkaBridge model instance from the KafkaBridge custom resource
*
* @param reconciliation Reconciliation marker
* @param kafkaBridge KafkaBridge custom resource
* @param kafkaBridge KafkaBridge custom resource
* @param sharedEnvironmentProvider Shared environment provider
* @return KafkaBridgeCluster instance
*/
@@ -182,7 +197,14 @@ public static KafkaBridgeCluster fromCrd(Reconciliation reconciliation,

result.readinessProbeOptions = ProbeUtils.extractReadinessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
result.livenessProbeOptions = ProbeUtils.extractLivenessProbeOptionsOrDefault(spec, ProbeUtils.DEFAULT_HEALTHCHECK_OPTIONS);
result.isMetricsEnabled = spec.getEnableMetrics();

if (spec.getMetricsConfig() instanceof JmxPrometheusExporterMetrics) {
result.metrics = new JmxPrometheusExporterModel(spec);
} else if (spec.getMetricsConfig() instanceof StrimziMetricsReporter) {
result.metrics = new StrimziMetricsReporterModel(spec, DEFAULT_METRICS_ALLOW_LIST);
} else {
result.isMetricsEnabled = spec.getEnableMetrics();
Member
Should we bypass this and set it to JMX Exporter with default configuration directly from the cluster operator? Not a strong opinion, just a thought 🤔.

Update: maybe that makes no sense based on the later code. But at least we should rename isMetricsEnabled to something more descriptive of what it does, e.g. isLegacyMetricsConfigurationEnabled or something.

Member
Ok. Renamed.

}

result.setTls(spec.getTls() != null ? spec.getTls() : null);

@@ -220,7 +242,7 @@ private static void fromCrdTemplate(final KafkaBridgeCluster kafkaBridgeCluster,
}

/**
* @return Generates and returns the Kubernetes service for the Kafka Bridge
* @return Generates and returns the Kubernetes service for the Kafka Bridge
*/
public Service generateService() {
int port = DEFAULT_REST_API_PORT;
@@ -244,7 +266,7 @@ public Service generateService() {
/**
* Generates a JSON String with the discovery annotation for the bridge service
*
* @return JSON with discovery annotation
* @return JSON with discovery annotation
*/
/*test*/ Map<String, String> getDiscoveryAnnotation(int port) {
JsonObject discovery = new JsonObject();
@@ -312,7 +334,7 @@ protected List<VolumeMount> getVolumeMounts() {

return volumeMountList;
}

private List<VolumeMount> getInitContainerVolumeMounts() {
List<VolumeMount> volumeMountList = new ArrayList<>();
volumeMountList.add(VolumeUtils.createVolumeMount(INIT_VOLUME_NAME, INIT_VOLUME_MOUNT));
@@ -325,12 +347,11 @@ private List<VolumeMount> getInitContainerVolumeMounts() {
/**
* Generates the Bridge Kubernetes Deployment
*
* @param annotations Map with annotations
* @param isOpenShift Flag indicating if we are on OpenShift or not
* @param imagePullPolicy Image pull policy configuration
* @param imagePullSecrets List of image pull secrets
*
* @return Generated Kubernetes Deployment resource
* @param annotations Map with annotations
* @param isOpenShift Flag indicating if we are on OpenShift or not
* @param imagePullPolicy Image pull policy configuration
* @param imagePullSecrets List of image pull secrets
* @return Generated Kubernetes Deployment resource
*/
public Deployment generateDeployment(Map<String, String> annotations, boolean isOpenShift, ImagePullPolicy imagePullPolicy, List<LocalObjectReference> imagePullSecrets) {
return WorkloadUtils.createDeployment(
@@ -396,7 +417,6 @@ private Container createContainer(ImagePullPolicy imagePullPolicy) {

protected List<EnvVar> getEnvVars() {
List<EnvVar> varList = new ArrayList<>();
varList.add(ContainerUtils.createEnvVar(ENV_VAR_KAFKA_BRIDGE_METRICS_ENABLED, String.valueOf(isMetricsEnabled)));
varList.add(ContainerUtils.createEnvVar(ENV_VAR_STRIMZI_GC_LOG_ENABLED, String.valueOf(gcLoggingEnabled)));
JvmOptionUtils.javaOptions(varList, jvmOptions);

@@ -417,6 +437,7 @@ protected List<EnvVar> getEnvVars() {

/**
* Set the HTTP configuration
*
* @param kafkaBridgeHttpConfig HTTP configuration
*/
protected void setKafkaBridgeHttpConfig(KafkaBridgeHttpConfig kafkaBridgeHttpConfig) {
@@ -452,6 +473,7 @@ public PodDisruptionBudget generatePodDisruptionBudget() {

/**
* Set Kafka AdminClient's configuration
*
* @param kafkaBridgeAdminClient configuration
*/
protected void setKafkaAdminClientConfiguration(KafkaBridgeAdminClientSpec kafkaBridgeAdminClient) {
@@ -460,6 +482,7 @@ protected void setKafkaAdminClientConfiguration(KafkaBridgeAdminClientSpec kafka

/**
* Set Kafka consumer's configuration
*
* @param kafkaBridgeConsumer configuration
*/
protected void setKafkaConsumerConfiguration(KafkaBridgeConsumerSpec kafkaBridgeConsumer) {
@@ -468,6 +491,7 @@ protected void setKafkaConsumerConfiguration(KafkaBridgeConsumerSpec kafkaBridge

/**
* Set Kafka producer's configuration
*
* @param kafkaBridgeProducer configuration
*/
protected void setKafkaProducerConfiguration(KafkaBridgeProducerSpec kafkaBridgeProducer) {
@@ -476,14 +500,15 @@ protected void setKafkaProducerConfiguration(KafkaBridgeProducerSpec kafkaBridge

/**
* Set Bootstrap servers for connection to cluster
*
* @param bootstrapServers bootstrap servers
*/
protected void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

/**
* @return The HTTP configuration of the Bridge
* @return The HTTP configuration of the Bridge
*/
public KafkaBridgeHttpConfig getHttp() {
return this.http;
@@ -547,26 +572,37 @@ protected List<EnvVar> getInitContainerEnvVars() {
* It also generates the metrics and logging configuration. If this operand doesn't support logging
* or metrics, they will not be set.
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
*
* @param metricsAndLogging The external CMs with logging and metrics configuration
* @return The generated ConfigMap
*/
public ConfigMap generateBridgeConfigMap(MetricsAndLogging metricsAndLogging) {
// generate the ConfigMap data entries for the metrics and logging configuration
Map<String, String> data = ConfigMapUtils.generateMetricsAndLogConfigMapData(reconciliation, this, metricsAndLogging);
// add the ConfigMap data entry for the bridge HTTP and Kafka clients related configuration
data.put(
BRIDGE_CONFIGURATION_FILENAME,
KafkaBridgeConfigurationBuilder builder =
new KafkaBridgeConfigurationBuilder(reconciliation, cluster, bootstrapServers)
.withTracing(tracing)
.withTls(tls)
.withAuthentication(authentication)
.withKafkaAdminClient(kafkaBridgeAdminClient)
.withKafkaProducer(kafkaBridgeProducer)
.withKafkaConsumer(kafkaBridgeConsumer)
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer)
.build()
);
.withHttp(http, kafkaBridgeProducer, kafkaBridgeConsumer);

// mapping the old deprecated enableMetrics to JMX Exporter type
String metricsType = (metrics instanceof JmxPrometheusExporterModel) || isMetricsEnabled
? JmxPrometheusExporterMetrics.TYPE_JMX_EXPORTER
: metrics instanceof StrimziMetricsReporterModel
? StrimziMetricsReporter.TYPE_STRIMZI_METRICS_REPORTER
: "disabled";
Member
I do really love the ternary operator in Java, but just at one level. I think the code is not that readable once you start nesting one inside the other.

Member
Agreed. Refactored.


if (metricsType.equals(JmxPrometheusExporterMetrics.TYPE_JMX_EXPORTER)) {
builder.withJmxPrometheusExporter((JmxPrometheusExporterModel) metrics, isMetricsEnabled);
} else if (metricsType.equals(StrimziMetricsReporter.TYPE_STRIMZI_METRICS_REPORTER)) {
builder.withStrimziMetricsReporter((StrimziMetricsReporterModel) metrics);
}
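The readability point raised in the thread above can be illustrated with a self-contained comparison: both forms below compute the same metrics-type string, but the if/else chain makes one decision per branch. The type strings and method names are stand-ins for the Strimzi constants, not the actual code:

```java
// Illustrative comparison of nested ternary vs. flattened if/else dispatch;
// the strings stand in for the real TYPE_* constants.
class MetricsTypeDispatch {
    // Nested ternary, as originally written: harder to scan.
    static String typeTernary(boolean jmxModel, boolean legacyFlag, boolean strimziModel) {
        return jmxModel || legacyFlag
                ? "jmxPrometheusExporter"
                : strimziModel
                ? "strimziMetricsReporter"
                : "disabled";
    }

    // Flattened if/else chain: one condition per branch, same behavior.
    static String typeIfElse(boolean jmxModel, boolean legacyFlag, boolean strimziModel) {
        if (jmxModel || legacyFlag) {
            return "jmxPrometheusExporter";
        } else if (strimziModel) {
            return "strimziMetricsReporter";
        }
        return "disabled";
    }
}
```

The two methods are equivalent for every input combination; the refactor is purely about readability, not behavior.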

data.put(BRIDGE_CONFIGURATION_FILENAME, builder.build());

return ConfigMapUtils
.createConfigMap(
Expand All @@ -586,9 +622,16 @@ public int getReplicas() {
}

/**
* @return Logging Model instance for configuring logging
* @return Logging Model instance for configuring logging
Member
Having fewer unnecessary whitespace changes would make the review much easier. It is especially weird if you remove tabs from the Javadocs in one place and use them in new Javadocs in another place.

Member
Agreed. Reverted.

*/
public LoggingModel logging() {
public LoggingModel logging() {
return logging;
}

/**
* @return Metrics Model instance for configuring Prometheus metrics
*/
public MetricsModel metrics() {
return metrics;
}
}