-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Strimzi Metrics Reporter integration with KafkaConnect and MirrorMaker2 #11570
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
9a08f2a
d498fde
c1b7ad0
e234750
baa66bb
2d08c6e
6d8868a
727f768
f51bc2c
ab0afc3
e1f9b9e
93a12c7
270ef6d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,7 @@ | |
| import io.strimzi.kafka.oauth.server.plain.ServerPlainConfig; | ||
| import io.strimzi.operator.cluster.model.cruisecontrol.CruiseControlMetricsReporter; | ||
| import io.strimzi.operator.cluster.model.metrics.MetricsModel; | ||
| import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterConfig; | ||
| import io.strimzi.operator.cluster.model.metrics.StrimziMetricsReporterModel; | ||
| import io.strimzi.operator.common.InvalidConfigurationException; | ||
| import io.strimzi.operator.common.Reconciliation; | ||
|
|
@@ -45,7 +46,6 @@ | |
| import java.util.Comparator; | ||
| import java.util.HashMap; | ||
| import java.util.LinkedHashMap; | ||
| import java.util.LinkedHashSet; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
|
|
@@ -66,8 +66,6 @@ public class KafkaBrokerConfigurationBuilder { | |
| private final static String PLACEHOLDER_CERT_STORE_PASSWORD_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:CERTS_STORE_PASSWORD}"; | ||
| private final static String PLACEHOLDER_OAUTH_CLIENT_SECRET_TEMPLATE_CONFIG_PROVIDER_ENV_VAR = "${strimzienv:STRIMZI_%s_OAUTH_CLIENT_SECRET}"; | ||
|
|
||
| private final static String KAFKA_JMX_REPORTER_CLASS = "org.apache.kafka.common.metrics.JmxReporter"; | ||
|
|
||
| private final StringWriter stringWriter = new StringWriter(); | ||
| private final PrintWriter writer = new PrintWriter(stringWriter); | ||
| private final Reconciliation reconciliation; | ||
|
|
@@ -145,16 +143,16 @@ public KafkaBrokerConfigurationBuilder withCruiseControl(String clusterName, Cru | |
| /** | ||
| * Configures the Strimzi Metrics Reporter. It is set only if user enables Strimzi Metrics Reporter. | ||
| * | ||
| * @param model Strimzi Metrics Reporter configuration | ||
| * @param model Strimzi Metrics Reporter configuration | ||
| * | ||
| * @return Returns the builder instance | ||
| */ | ||
| public KafkaBrokerConfigurationBuilder withStrimziMetricsReporter(MetricsModel model) { | ||
| public KafkaBrokerConfigurationBuilder withStrimziMetricsReporter(MetricsModel model) { | ||
| if (model instanceof StrimziMetricsReporterModel reporterModel) { | ||
| printSectionHeader("Strimzi Metrics Reporter configuration"); | ||
| writer.println("prometheus.metrics.reporter.listener.enable=true"); | ||
| writer.println("prometheus.metrics.reporter.listener=http://:" + StrimziMetricsReporterModel.METRICS_PORT); | ||
| writer.println("prometheus.metrics.reporter.allowlist=" + reporterModel.getAllowList()); | ||
| writer.println(StrimziMetricsReporterConfig.LISTENER_ENABLE + "=true"); | ||
|
katheris marked this conversation as resolved.
|
||
| writer.println(StrimziMetricsReporterConfig.LISTENER + "=http://:" + MetricsModel.METRICS_PORT); | ||
| writer.println(StrimziMetricsReporterConfig.ALLOW_LIST + "=" + reporterModel.getAllowList()); | ||
| writer.println(); | ||
| } | ||
| return this; | ||
|
|
@@ -829,7 +827,7 @@ private String getConfigProviderAliases(KafkaConfiguration userConfig) { | |
| /** | ||
| * Get the user provided Kafka configuration provider aliases, throwing an InvalidConfigurationException if any are found that would overwrite the Strimzi defined configuration providers | ||
| * | ||
| * @param strimziConfigProviders The Strimzi defined configuration providers | ||
| * @param strimziAliases The Strimzi defined configuration providers | ||
| * @param userConfig The user configuration to extract the possible user-provided config provider configuration from | ||
| * @return The user defined Kafka configuration provider aliases or empty string | ||
| */ | ||
|
|
@@ -854,10 +852,10 @@ private String getUserConfigProviderAliases(Collection<String> strimziAliases, K | |
| /** | ||
| * Adds the configurations passed by the user in the Kafka CR, injecting Strimzi configurations when needed. | ||
| * | ||
| * @param userConfig The User configuration - Kafka broker configuration options specified by the user in the Kafka custom resource | ||
| * @param injectCcMetricsReporter Inject the Cruise Control Metrics Reporter into the configuration | ||
| * @param userConfig The User configuration - Kafka broker configuration options specified by the user in the Kafka custom resource | ||
| * @param injectCcMetricsReporter Inject the Cruise Control Metrics Reporter into the configuration | ||
| * @param injectKafkaJmxReporter Flag to indicate if metrics are enabled. If they are we inject the JmxReporter into the configuration | ||
| * @param injectStrimziMetricsReporter Inject the Strimzi Metrics Reporter into the configuration | ||
| * @param injectKafkaJmxReporter Flag to indicate if metrics are enabled. If they are we inject the JmxReporter into the configuration | ||
| * | ||
| * @return Returns the builder instance | ||
| */ | ||
|
|
@@ -872,13 +870,9 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration | |
|
|
||
| configProviders(userConfig); | ||
|
|
||
| // Adds the Kafka metric.reporters to the user configuration. | ||
| maybeAddMetricReporters(userConfig, injectCcMetricsReporter, injectKafkaJmxReporter, injectStrimziMetricsReporter); | ||
|
|
||
| // Adds the Yammer kafka.metrics.reporters to the user configuration. | ||
| maybeAddYammerMetricsReporters(userConfig, injectStrimziMetricsReporter); | ||
| printMetricReporters(userConfig, injectCcMetricsReporter, injectKafkaJmxReporter, injectStrimziMetricsReporter); | ||
| printYammerReporters(userConfig, injectStrimziMetricsReporter); | ||
|
|
||
| // print user config with Strimzi injections | ||
| if (!userConfig.getConfiguration().isEmpty()) { | ||
| printSectionHeader("User provided configuration"); | ||
| writer.println(userConfig.getConfiguration()); | ||
|
|
@@ -888,35 +882,70 @@ public KafkaBrokerConfigurationBuilder withUserConfiguration(KafkaConfiguration | |
| return this; | ||
| } | ||
|
|
||
| /** | ||
| * Adds the Kafka metric.reporters to the user configuration. | ||
| * | ||
| * @param userConfig The user configuration to which the metric reporters will be added. | ||
| * @param injectCcMetricsReporter Flag indicating whether to inject the Cruise Control Metrics Reporter. | ||
| * @param injectKafkaJmxReporter Inject the JMX Reporter into the configuration | ||
| * @param injectStrimziMetricsReporter Flag indicating whether to inject the Strimzi Metrics Reporter. | ||
| */ | ||
| private void maybeAddMetricReporters(KafkaConfiguration userConfig, boolean injectCcMetricsReporter, boolean injectKafkaJmxReporter, boolean injectStrimziMetricsReporter) { | ||
| if (injectCcMetricsReporter) { | ||
| createOrAddListConfig(userConfig, "metric.reporters", CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER); | ||
| } | ||
| if (injectKafkaJmxReporter) { | ||
| createOrAddListConfig(userConfig, "metric.reporters", "org.apache.kafka.common.metrics.JmxReporter"); | ||
| } | ||
| if (injectStrimziMetricsReporter) { | ||
| createOrAddListConfig(userConfig, "metric.reporters", "io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter"); | ||
| private void printMetricReporters(KafkaConfiguration userConfig, | ||
|
scholzj marked this conversation as resolved.
|
||
| boolean injectCcMetricsReporter, | ||
| boolean injectKafkaJmxReporter, | ||
| boolean injectStrimziMetricsReporter) { | ||
| // Build a list of reporters to inject based on flags | ||
| List<String> reportersToInject = new ArrayList<>(); | ||
| if (injectCcMetricsReporter) reportersToInject.add(CruiseControlMetricsReporter.CRUISE_CONTROL_METRIC_REPORTER); | ||
| // Since Kafka 4 the JmxReporter is explicitly added when JmxPrometheusExporter is enabled | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does this comment exactly mean related to the next code line?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JmxPrometheusExporter depends on JmxReporter, comment updated. |
||
| if (injectKafkaJmxReporter) reportersToInject.add("org.apache.kafka.common.metrics.JmxReporter"); | ||
| if (injectStrimziMetricsReporter) reportersToInject.add(StrimziMetricsReporterConfig.KAFKA_CLASS); | ||
|
|
||
| if (!reportersToInject.isEmpty()) { | ||
| String configKey = "metric.reporters"; | ||
|
|
||
| if (userConfig != null && !userConfig.getConfiguration().isEmpty() && | ||
| userConfig.getConfigOption(configKey) != null) { | ||
| // handle user configuration if present and avoids duplicates | ||
| String configValue = userConfig.getConfigOption(configKey); | ||
|
|
||
| reportersToInject = reportersToInject.stream() | ||
| .filter(r -> !userConfig.getConfigOption(configKey).contains(r)) | ||
| .toList(); | ||
|
|
||
| if (!reportersToInject.isEmpty()) { | ||
| userConfig.removeConfigOption(configKey); | ||
|
|
||
| printSectionHeader(configKey + " configuration"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't it better "Metrics Reporters configuration" here?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Metrics Reporters is a bit overloaded in this context, so using the exact configuration name is better IMO.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's going to be the only section header with a property name while the section title should be something to group stuff and more readable ... at this point we can even remove it if we are not able to find a good section title.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would you propose considering Broker and Connect configurations?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I proposed "Metrics Reporters configuration". Why do you think it's overloaded here?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following your logic this is what you will get in the same config: Strimzi Metrics Reporter configuration This is what I mean with overloaded. That said, if you prefer like this, it is fine with me.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. in this end for the last two this is what
I am not saying I won't approve if you don't change. I don't want to start another long discussion with other maintainers. So it's up to you ... I am going to approve anyway. It's a minimal thing.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem here is in Kafka, where these configuration names are ambiguous. I just think that there is less space for confusion using verbatim configuration names. |
||
| writer.println("# " + configKey + " configured by the user and by Strimzi"); | ||
| writer.println(configKey + "=" + configValue + "," + String.join(",", reportersToInject)); | ||
| writer.println(); | ||
| } | ||
| } else { | ||
| printSectionHeader(configKey + " configuration"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto as above. |
||
| writer.println("# " + configKey + " configured by Strimzi"); | ||
| writer.println(configKey + "=" + String.join(",", reportersToInject)); | ||
| writer.println(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Adds the Yammer kafka.metrics.reporters to the user configuration if the Strimzi Metrics Reporter is enabled. | ||
| * | ||
| * @param userConfig The user configuration to which the Yammer metrics reporter will be added. | ||
| * @param injectStrimziMetricsReporter Flag indicating whether to inject the Strimzi Metrics Reporter. | ||
| */ | ||
| private void maybeAddYammerMetricsReporters(KafkaConfiguration userConfig, boolean injectStrimziMetricsReporter) { | ||
| private void printYammerReporters(KafkaConfiguration userConfig, | ||
|
scholzj marked this conversation as resolved.
|
||
| boolean injectStrimziMetricsReporter) { | ||
| if (injectStrimziMetricsReporter) { | ||
| createOrAddListConfig(userConfig, "kafka.metrics.reporters", "io.strimzi.kafka.metrics.YammerPrometheusMetricsReporter"); | ||
| String configKey = "kafka.metrics.reporters"; | ||
|
|
||
| if (userConfig != null | ||
| && !userConfig.getConfiguration().isEmpty() | ||
| && userConfig.getConfigOption(configKey) != null) { | ||
| // handle user configuration if present and avoids duplicates | ||
| String configValue = userConfig.getConfigOption(configKey); | ||
|
|
||
| if (!configValue.contains(StrimziMetricsReporterConfig.YAMMER_CLASS)) { | ||
| userConfig.removeConfigOption(configKey); | ||
| printSectionHeader(configKey + " configuration"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't it better "Kafka Metrics Reporters configuration" here? |
||
| writer.println("# " + configKey + " configured by the user and by Strimzi"); | ||
| writer.println(configKey + "=" + configValue + "," + StrimziMetricsReporterConfig.YAMMER_CLASS); | ||
| writer.println(); | ||
| } | ||
| } else { | ||
| printSectionHeader(configKey + " configuration"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ditto as above |
||
| writer.println("# " + configKey + " configured by Strimzi"); | ||
| writer.println(configKey + "=" + StrimziMetricsReporterConfig.YAMMER_CLASS); | ||
| writer.println(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -1061,45 +1090,6 @@ private void configureQuotasPluginStrimzi(String clusterName, QuotasPluginStrimz | |
| writer.println(String.format("client.quota.callback.static.excluded.principal.name.list=%s", String.join(";", excludedPrincipals))); | ||
| } | ||
|
|
||
| /** | ||
| * Append list configuration values or create a new list configuration if missing. | ||
| * A list configuration can contain a comma separated list of values. | ||
| * Duplicated values are removed. | ||
| * | ||
| * @param kafkaConfig Kafka configuration. | ||
| * @param key List configuration key. | ||
| * @param values List configuration values. | ||
| */ | ||
| static void createOrAddListConfig(AbstractConfiguration kafkaConfig, String key, String values) { | ||
| if (kafkaConfig == null) { | ||
| throw new IllegalArgumentException("Configuration is required"); | ||
| } | ||
| if (key == null || key.isBlank()) { | ||
| throw new IllegalArgumentException("Configuration key is required"); | ||
| } | ||
| if (values == null || values.isBlank()) { | ||
| throw new IllegalArgumentException("Configuration values are required"); | ||
| } | ||
|
|
||
| String existingConfig = kafkaConfig.getConfigOption(key); | ||
| // using an ordered set to preserve ordering of the existing kafkaConfig as values could potentially be user-provided. | ||
| Set<String> existingSet = existingConfig == null ? new LinkedHashSet<>() : | ||
| Arrays.stream(existingConfig.split(",")) | ||
| .map(String::trim) | ||
| .filter(s -> !s.isEmpty()) | ||
| .collect(Collectors.toCollection(LinkedHashSet::new)); | ||
| Set<String> newValues = Arrays.stream(values.split(",")) | ||
| .map(String::trim) | ||
| .filter(s -> !s.isEmpty()) | ||
| .collect(Collectors.toCollection(LinkedHashSet::new)); | ||
| // add only new values | ||
| boolean updated = existingSet.addAll(newValues); | ||
| if (updated) { | ||
| String updatedConfig = String.join(",", existingSet); | ||
| kafkaConfig.setConfigOption(key, updatedConfig); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Internal method which prints the section header into the configuration file. This makes it more human-readable | ||
| * when looking for issues in running pods etc. | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.