Skip to content

feat(metric): add cert metrics #2431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public class TelemetryConstants {
public static final String CONTROLLER_JMX_YAML_CONFIG_PATH = "/jmx/rules/controller.yaml";
public static final String TELEMETRY_SCOPE_NAME = "automq_for_kafka";
public static final String KAFKA_METRICS_PREFIX = "kafka_stream_";
public static final String KAFKA_CERT_METRICS_PREFIX = "kafka_cert_";
public static final String KAFKA_WAL_METRICS_PREFIX = "kafka_wal_";
public static final AttributeKey<Long> STREAM_ID_NAME = AttributeKey.longKey("streamId");
public static final AttributeKey<Long> START_OFFSET_NAME = AttributeKey.longKey("startOffset");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import kafka.log.stream.s3.telemetry.otel.OTelHistogramReporter;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.server.ProcessRole;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.cert.CertKafkaMetricsManager;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;

import com.automq.stream.s3.metrics.MetricsConfig;
Expand Down Expand Up @@ -151,6 +153,17 @@ protected void initializeMetricsManager(Meter meter) {

// kraft controller may not have s3WALPath config.
ObjectWALMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_WAL_METRICS_PREFIX);
// Obtain the certificate chain and truststore certificates.
try {
Password certChainPassword = kafkaConfig.getPassword("ssl.keystore.certificate.chain");
Password truststoreCertsPassword = kafkaConfig.getPassword("ssl.truststore.certificates");

String certChain = certChainPassword != null ? certChainPassword.value() : null;
String truststoreCerts = truststoreCertsPassword != null ? truststoreCertsPassword.value() : null;
CertKafkaMetricsManager.initMetrics(meter, truststoreCerts, certChain, TelemetryConstants.KAFKA_CERT_METRICS_PREFIX);
} catch (Exception e) {
LOGGER.error("Failed to initialize cert metrics", e);
}
this.oTelHistogramReporter.start(meter);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.server.metrics.cert;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Date;
import java.util.List;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;

/**
 * Registers OpenTelemetry gauges that expose the expiry of TLS certificates supplied as PEM text.
 *
 * <p>For every certificate found, two gauges are registered under the given name prefix:
 * {@code expiry_timestamp} (the certificate's not-after time in epoch milliseconds) and
 * {@code days_remaining} (whole days until expiry, evaluated each time the gauge is observed).
 */
public class CertKafkaMetricsManager {
    // Logger for logging messages related to this class
    private static final Logger LOGGER = LoggerFactory.getLogger(CertKafkaMetricsManager.class);

    // PEM armor lines delimiting a single certificate
    private static final String PEM_BEGIN_MARKER = "-----BEGIN CERTIFICATE-----";
    private static final String PEM_END_MARKER = "-----END CERTIFICATE-----";

    // Number of milliseconds in one day, used to convert the remaining lifetime into days
    private static final long MILLIS_PER_DAY = 24L * 60L * 60L * 1000L;

    // List to store all the observable long gauges for certificates
    private static final List<ObservableLongGauge> CERT_OBSERVABLE_LONG_GAUGES = new ArrayList<>();

    /**
     * Initialize the certificate metrics.
     *
     * <p>The truststore and the certificate chain are handled independently: when one of them is
     * missing a warning is logged and metrics are still registered for the other one. Any failure
     * is logged and never propagated to the caller.
     *
     * @param meter           The OpenTelemetry meter to use for creating metrics.
     * @param truststoreCerts The truststore certificates in PEM format (may be null or empty).
     * @param certChain       The certificate chain in PEM format (may be null or empty).
     * @param prefix          The prefix for the metric names.
     */
    public static void initMetrics(Meter meter, String truststoreCerts, String certChain, String prefix) {
        try {
            boolean hasTruststore = truststoreCerts != null && !truststoreCerts.isEmpty();
            boolean hasCertChain = certChain != null && !certChain.isEmpty();

            if (!hasTruststore) {
                LOGGER.warn("Truststore certificates are empty or null");
            }
            if (!hasCertChain) {
                LOGGER.warn("Certificate chain is empty or null");
            }
            if (!hasTruststore && !hasCertChain) {
                return;
            }
            // Add TLS certificate metrics for whichever inputs are present
            addTlsMetrics(hasCertChain ? certChain : null, hasTruststore ? truststoreCerts : null, meter, prefix);
        } catch (Exception e) {
            LOGGER.error("Failed to initialize cert metrics", e);
        }
    }

    /**
     * Add TLS certificate metrics.
     *
     * <p>Server certificates are registered first, then truststore certificates. A failure while
     * processing one of the two inputs does not prevent the other from being registered.
     *
     * @param certChain       The certificate chain in PEM format, or null to skip it.
     * @param truststoreCerts The truststore certificates in PEM format, or null to skip them.
     * @param meter           The OpenTelemetry meter to use for creating metrics.
     * @param prefix          The prefix for the metric names.
     */
    private static void addTlsMetrics(String certChain, String truststoreCerts, Meter meter, String prefix) {
        registerPemMetrics(meter, certChain, "server_cert", prefix);
        registerPemMetrics(meter, truststoreCerts, "truststore_cert", prefix);
    }

    /**
     * Parse one PEM bundle and register metrics for every certificate it contains.
     *
     * @param meter      The OpenTelemetry meter to use for creating metrics.
     * @param pemContent The PEM formatted certificates, or null/empty to do nothing.
     * @param certType   The type of the certificates (e.g., "server_cert", "truststore_cert").
     * @param prefix     The prefix for the metric names.
     */
    private static void registerPemMetrics(Meter meter, String pemContent, String certType, String prefix) {
        if (pemContent == null || pemContent.isEmpty()) {
            return;
        }
        try {
            for (X509Certificate cert : parseCertificates(pemContent)) {
                registerCertMetrics(meter, cert, certType, prefix);
            }
        } catch (Exception e) {
            // Best effort: a broken bundle must not take down metrics initialization
            LOGGER.error("Failed to add TLS metrics for {}", certType, e);
        }
    }

    /**
     * Register certificate metrics.
     *
     * @param meter    The OpenTelemetry meter to use for creating metrics.
     * @param cert     The X509 certificate to register metrics for.
     * @param certType The type of the certificate (e.g., "server_cert", "truststore_cert").
     * @param prefix   The prefix for the metric names.
     */
    private static void registerCertMetrics(Meter meter, X509Certificate cert, String certType, String prefix) {
        String subject = cert.getSubjectX500Principal().getName();
        // The expiry instant is fixed for a given certificate, so it is captured once
        long expiryMillis = cert.getNotAfter().getTime();

        // Create and register Gauge metrics
        Attributes attributes = Attributes.builder()
            .put("cert_type", certType)
            .put("cert_subject", subject)
            .build();

        ObservableLongGauge observableCertExpireMills = meter.gaugeBuilder(prefix + "expiry_timestamp")
            .setDescription("The expiry timestamp of the TLS certificate")
            .setUnit("milliseconds")
            .ofLongs()
            .buildWithCallback(result -> result.record(expiryMillis, attributes));
        CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireMills);

        // The remaining days are computed inside the callback so that the value follows the
        // wall clock on every observation instead of being frozen at registration time.
        ObservableLongGauge observableCertExpireDays = meter.gaugeBuilder(prefix + "days_remaining")
            .setDescription("The remaining days until the TLS certificate expires")
            .setUnit("days")
            .ofLongs()
            .buildWithCallback(result ->
                result.record((expiryMillis - System.currentTimeMillis()) / MILLIS_PER_DAY, attributes));
        CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireDays);
    }

    /**
     * Parse the PEM formatted certificate content into an array of X509 certificates.
     *
     * <p>The content is split on the END marker. Pieces that contain nothing but whitespace (for
     * example the text after the last END marker) are skipped, and all whitespace, including
     * carriage returns, is removed before Base64 decoding.
     *
     * @param pemContent The PEM formatted certificate content.
     * @return An array of X509 certificates, empty if the content holds no certificate.
     * @throws CertificateException If there is an error parsing the certificates.
     */
    private static X509Certificate[] parseCertificates(String pemContent) throws CertificateException {
        CertificateFactory factory = CertificateFactory.getInstance("X.509");
        List<X509Certificate> certs = new ArrayList<>();

        for (String pemPart : pemContent.split(PEM_END_MARKER)) {
            String base64Body = pemPart.replace(PEM_BEGIN_MARKER, "").replaceAll("\\s", "");
            if (base64Body.isEmpty()) {
                // Trailing newline or blank separator between certificates
                continue;
            }
            byte[] certBytes;
            try {
                certBytes = Base64.getDecoder().decode(base64Body);
            } catch (IllegalArgumentException e) {
                throw new CertificateException("Certificate content is not valid Base64", e);
            }
            certs.add((X509Certificate) factory.generateCertificate(new ByteArrayInputStream(certBytes)));
        }
        return certs.toArray(new X509Certificate[0]);
    }
}
Loading