diff --git a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java index bb0c68af30..4deafe38bc 100644 --- a/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java @@ -16,6 +16,7 @@ import kafka.log.stream.s3.telemetry.otel.OTelHistogramReporter; import kafka.server.KafkaConfig; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.server.ProcessRole; import org.apache.kafka.server.metrics.KafkaYammerMetrics; import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager; @@ -143,6 +144,25 @@ protected MetricsExporterURI buildMetricsExporterURI(String clusterId, KafkaConf } protected void initializeMetricsManager(Meter meter) { + S3StreamKafkaMetricsManager.setTruststoreCertsSupplier(() -> { + try { + Password truststoreCertsPassword = kafkaConfig.getPassword("ssl.truststore.certificates"); + return truststoreCertsPassword != null ? truststoreCertsPassword.value() : null; + } catch (Exception e) { + LOGGER.error("Failed to get truststore certs", e); + return null; + } + }); + + S3StreamKafkaMetricsManager.setCertChainSupplier(() -> { + try { + Password certChainPassword = kafkaConfig.getPassword("ssl.keystore.certificate.chain"); + return certChainPassword != null ? certChainPassword.value() : null; + } catch (Exception e) { + LOGGER.error("Failed to get cert chain", e); + return null; + } + }); S3StreamMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty(), kafkaConfig.s3ExporterReportIntervalMs())); S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index e769c58361..9034dcfac6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -20,8 +20,17 @@ import com.automq.stream.s3.metrics.wrapper.HistogramInstrument; import com.automq.stream.s3.metrics.wrapper.HistogramMetric; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.security.cert.CertificateException; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; import java.util.ArrayList; +import java.util.Base64; import java.util.Collections; +import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; @@ -35,6 +44,8 @@ public class S3StreamKafkaMetricsManager { + private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamKafkaMetricsManager.class); + private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); public static final List FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>(); @@ -52,6 +63,9 @@ public class S3StreamKafkaMetricsManager { private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamKafkaMetricsConstants.LABEL_BACK_PRESSURE_STATE); + // List to store all the observable long gauges for certificates + private static final List CERT_OBSERVABLE_LONG_GAUGES = new ArrayList<>(); + static { BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(S3_OBJECT_ATTRIBUTES); @@ -100,6 +114,15 @@ public class S3StreamKafkaMetricsManager { */ private static Supplier> backPressureStateSupplier = Collections::emptyMap; + /** + * supplier for truststoreCerts + */ + private static Supplier truststoreCertsSupplier = () -> null; + /** + * supplier for server cert chain + */ + private static Supplier certChainSupplier = () -> null; + public static void configure(MetricsConfig metricsConfig) { synchronized (BASE_ATTRIBUTES_LISTENERS) { S3StreamKafkaMetricsManager.metricsConfig = metricsConfig; @@ -116,6 +139,11 @@ public static void initMetrics(Meter meter, String prefix) { initLogAppendMetrics(meter, prefix); initPartitionStatusStatisticsMetrics(meter, prefix); initBackPressureMetrics(meter, prefix); + try { + initCertMetrics(meter, prefix); + } catch (Exception e) { + LOGGER.error("Failed to init cert metrics", e); + } } private static void initAutoBalancerMetrics(Meter meter, String prefix) { @@ -276,16 +304,109 @@ private static void initPartitionStatusStatisticsMetrics(Meter meter, String pre private static void initBackPressureMetrics(Meter meter, String prefix) { backPressureState = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.BACK_PRESSURE_STATE_METRIC_NAME) - .setDescription("Back pressure state") - .ofLongs() - .buildWithCallback(result -> { - if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { - Map states = backPressureStateSupplier.get(); - states.forEach((state, value) -> { - result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state)); - }); - } - }); + .setDescription("Back pressure state") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map states = backPressureStateSupplier.get(); + states.forEach((state, value) -> { + result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state)); + }); + } + }); + } + + /** + * Initialize the certificate metrics. + * + * @param meter The OpenTelemetry meter to use for creating metrics. + */ + public static void initCertMetrics(Meter meter, String prefix) throws CertificateException { + String truststoreCerts = truststoreCertsSupplier.get(); + String certChain = certChainSupplier.get(); + if (truststoreCerts == null || truststoreCerts.isEmpty()) { + return; + } + if (certChain == null || certChain.isEmpty()) { + return; + } + // Add TLS certificate metrics + addTlsMetrics(certChain, truststoreCerts, meter, prefix); + } + + /** + * Add TLS certificate metrics. + * + * @param certChain The certificate chain in PEM format. + * @param truststoreCerts The truststore certificates in PEM format. + * @param meter The OpenTelemetry meter to use for creating metrics. + * @param prefix The prefix for the metric names. + */ + private static void addTlsMetrics(String certChain, String truststoreCerts, Meter meter, String prefix) throws CertificateException { + // Parse and check the certificate expiration time + X509Certificate[] serverCerts = parseCertificates(certChain); + X509Certificate[] trustStoreCerts = parseCertificates(truststoreCerts); + + for (X509Certificate cert : serverCerts) { + registerCertMetrics(meter, cert, "server_cert", prefix); + } + for (X509Certificate cert : trustStoreCerts) { + registerCertMetrics(meter, cert, "truststore_cert", prefix); + } + } + + /** + * Register certificate metrics. + * + * @param meter The OpenTelemetry meter to use for creating metrics. + * @param cert The X509 certificate to register metrics for. + * @param certType The type of the certificate (e.g., "server_cert", "truststore_cert"). + * @param prefix The prefix for the metric names. + */ + private static void registerCertMetrics(Meter meter, X509Certificate cert, String certType, String prefix) { + String subject = cert.getSubjectX500Principal().getName(); + Date expiryDate = cert.getNotAfter(); + long daysRemaining = (expiryDate.getTime() - System.currentTimeMillis()) / (1000 * 3600 * 24); + + // Create and register Gauge metrics + Attributes attributes = Attributes.builder() + .put("cert_type", certType) + .put("cert_subject", subject) + .build(); + + ObservableLongGauge observableCertExpireMills = meter.gaugeBuilder(prefix + "cert_expiry_timestamp") + .setDescription("The expiry timestamp of the TLS certificate") + .setUnit("milliseconds") + .ofLongs() + .buildWithCallback(result -> result.record(expiryDate.getTime(), attributes)); + CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireMills); + + ObservableLongGauge observableCertExpireDays = meter.gaugeBuilder(prefix + "cert_days_remaining") + .setDescription("The remaining days until the TLS certificate expires") + .setUnit("days") + .ofLongs() + .buildWithCallback(result -> result.record(daysRemaining, attributes)); + CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireDays); + } + + /** + * Parse the PEM formatted certificate content into an array of X509 certificates. + * + * @param pemContent The PEM formatted certificate content. + * @return An array of X509 certificates. + * @throws CertificateException If there is an error parsing the certificates. + */ + private static X509Certificate[] parseCertificates(String pemContent) throws CertificateException { + String[] pemArray = pemContent.split("-----END CERTIFICATE-----"); + CertificateFactory factory = CertificateFactory.getInstance("X.509"); + X509Certificate[] certs = new X509Certificate[pemArray.length]; + + for (int i = 0; i < pemArray.length; i++) { + String pemPart = pemArray[i]; + byte[] certBytes = Base64.getDecoder().decode(pemPart.replace("-----BEGIN CERTIFICATE-----", "").replaceAll("\n", "")); + certs[i] = (X509Certificate) factory.generateCertificate(new ByteArrayInputStream(certBytes)); + } + return certs; } public static void setIsActiveSupplier(Supplier isActiveSupplier) { @@ -361,4 +482,12 @@ public static void setTopicPartitionCountMetricsSupplier(Supplier> backPressureStateSupplier) { S3StreamKafkaMetricsManager.backPressureStateSupplier = backPressureStateSupplier; } + + public static void setTruststoreCertsSupplier(Supplier truststoreCertsSupplier) { + S3StreamKafkaMetricsManager.truststoreCertsSupplier = truststoreCertsSupplier; + } + + public static void setCertChainSupplier(Supplier certChainSupplier) { + S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier; + } }