Skip to content

feat(metric): add cert metrics #2431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import kafka.log.stream.s3.telemetry.otel.OTelHistogramReporter;
import kafka.server.KafkaConfig;

import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.server.ProcessRole;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
Expand Down Expand Up @@ -143,6 +144,25 @@ protected MetricsExporterURI buildMetricsExporterURI(String clusterId, KafkaConf
}

protected void initializeMetricsManager(Meter meter) {
S3StreamKafkaMetricsManager.setTruststoreCertsSupplier(() -> {
try {
Password truststoreCertsPassword = kafkaConfig.getPassword("ssl.truststore.certificates");
return truststoreCertsPassword != null ? truststoreCertsPassword.value() : null;
} catch (Exception e) {
LOGGER.error("Failed to get truststore certs", e);
return null;
}
});

S3StreamKafkaMetricsManager.setCertChainSupplier(() -> {
try {
Password certChainPassword = kafkaConfig.getPassword("ssl.keystore.certificate.chain");
return certChainPassword != null ? certChainPassword.value() : null;
} catch (Exception e) {
LOGGER.error("Failed to get cert chain", e);
return null;
}
});
S3StreamMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty(), kafkaConfig.s3ExporterReportIntervalMs()));
S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,17 @@
import com.automq.stream.s3.metrics.wrapper.HistogramInstrument;
import com.automq.stream.s3.metrics.wrapper.HistogramMetric;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
Expand All @@ -35,6 +44,8 @@

public class S3StreamKafkaMetricsManager {

private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamKafkaMetricsManager.class);

private static final List<ConfigListener> BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();

public static final List<HistogramMetric> FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>();
Expand All @@ -52,6 +63,9 @@ public class S3StreamKafkaMetricsManager {
private static final MultiAttributes<String> BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamKafkaMetricsConstants.LABEL_BACK_PRESSURE_STATE);

// List to store all the observable long gauges for certificates
private static final List<ObservableLongGauge> CERT_OBSERVABLE_LONG_GAUGES = new ArrayList<>();

static {
BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(S3_OBJECT_ATTRIBUTES);
Expand Down Expand Up @@ -100,6 +114,15 @@ public class S3StreamKafkaMetricsManager {
*/
private static Supplier<Map<String, Integer>> backPressureStateSupplier = Collections::emptyMap;

/**
* supplier for truststoreCerts
*/
private static Supplier<String> truststoreCertsSupplier = () -> null;
/**
* supplier for server cert chain
*/
private static Supplier<String> certChainSupplier = () -> null;

public static void configure(MetricsConfig metricsConfig) {
synchronized (BASE_ATTRIBUTES_LISTENERS) {
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
Expand All @@ -116,6 +139,11 @@ public static void initMetrics(Meter meter, String prefix) {
initLogAppendMetrics(meter, prefix);
initPartitionStatusStatisticsMetrics(meter, prefix);
initBackPressureMetrics(meter, prefix);
try {
initCertMetrics(meter, prefix);
} catch (Exception e) {
LOGGER.error("Failed to init cert metrics", e);
}
}

private static void initAutoBalancerMetrics(Meter meter, String prefix) {
Expand Down Expand Up @@ -276,16 +304,109 @@ private static void initPartitionStatusStatisticsMetrics(Meter meter, String pre

private static void initBackPressureMetrics(Meter meter, String prefix) {
backPressureState = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.BACK_PRESSURE_STATE_METRIC_NAME)
.setDescription("Back pressure state")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Map<String, Integer> states = backPressureStateSupplier.get();
states.forEach((state, value) -> {
result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state));
});
}
});
.setDescription("Back pressure state")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Map<String, Integer> states = backPressureStateSupplier.get();
states.forEach((state, value) -> {
result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state));
});
}
});
}

/**
* Initialize the certificate metrics.
*
* @param meter The OpenTelemetry meter to use for creating metrics.
*/
public static void initCertMetrics(Meter meter, String prefix) throws CertificateException {
String truststoreCerts = truststoreCertsSupplier.get();
String certChain = certChainSupplier.get();
if (truststoreCerts == null || truststoreCerts.isEmpty()) {
return;
}
if (certChain == null || certChain.isEmpty()) {
return;
}
// Add TLS certificate metrics
addTlsMetrics(certChain, truststoreCerts, meter, prefix);
}

/**
* Add TLS certificate metrics.
*
* @param certChain The certificate chain in PEM format.
* @param truststoreCerts The truststore certificates in PEM format.
* @param meter The OpenTelemetry meter to use for creating metrics.
* @param prefix The prefix for the metric names.
*/
private static void addTlsMetrics(String certChain, String truststoreCerts, Meter meter, String prefix) throws CertificateException {
// Parse and check the certificate expiration time
X509Certificate[] serverCerts = parseCertificates(certChain);
X509Certificate[] trustStoreCerts = parseCertificates(truststoreCerts);

for (X509Certificate cert : serverCerts) {
registerCertMetrics(meter, cert, "server_cert", prefix);
}
for (X509Certificate cert : trustStoreCerts) {
registerCertMetrics(meter, cert, "truststore_cert", prefix);
}
}

/**
* Register certificate metrics.
*
* @param meter The OpenTelemetry meter to use for creating metrics.
* @param cert The X509 certificate to register metrics for.
* @param certType The type of the certificate (e.g., "server_cert", "truststore_cert").
* @param prefix The prefix for the metric names.
*/
private static void registerCertMetrics(Meter meter, X509Certificate cert, String certType, String prefix) {
String subject = cert.getSubjectX500Principal().getName();
Date expiryDate = cert.getNotAfter();
long daysRemaining = (expiryDate.getTime() - System.currentTimeMillis()) / (1000 * 3600 * 24);

// Create and register Gauge metrics
Attributes attributes = Attributes.builder()
.put("cert_type", certType)
.put("cert_subject", subject)
.build();

ObservableLongGauge observableCertExpireMills = meter.gaugeBuilder(prefix + "cert_expiry_timestamp")
.setDescription("The expiry timestamp of the TLS certificate")
.setUnit("milliseconds")
.ofLongs()
.buildWithCallback(result -> result.record(expiryDate.getTime(), attributes));
CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireMills);

ObservableLongGauge observableCertExpireDays = meter.gaugeBuilder(prefix + "cert_days_remaining")
.setDescription("The remaining days until the TLS certificate expires")
.setUnit("days")
.ofLongs()
.buildWithCallback(result -> result.record(daysRemaining, attributes));
CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireDays);
}

/**
* Parse the PEM formatted certificate content into an array of X509 certificates.
*
* @param pemContent The PEM formatted certificate content.
* @return An array of X509 certificates.
* @throws CertificateException If there is an error parsing the certificates.
*/
private static X509Certificate[] parseCertificates(String pemContent) throws CertificateException {
String[] pemArray = pemContent.split("-----END CERTIFICATE-----");
CertificateFactory factory = CertificateFactory.getInstance("X.509");
X509Certificate[] certs = new X509Certificate[pemArray.length];

for (int i = 0; i < pemArray.length; i++) {
String pemPart = pemArray[i];
byte[] certBytes = Base64.getDecoder().decode(pemPart.replace("-----BEGIN CERTIFICATE-----", "").replaceAll("\n", ""));
certs[i] = (X509Certificate) factory.generateCertificate(new ByteArrayInputStream(certBytes));
}
return certs;
}

public static void setIsActiveSupplier(Supplier<Boolean> isActiveSupplier) {
Expand Down Expand Up @@ -361,4 +482,12 @@ public static void setTopicPartitionCountMetricsSupplier(Supplier<PartitionCount
public static void setBackPressureStateSupplier(Supplier<Map<String, Integer>> backPressureStateSupplier) {
S3StreamKafkaMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
}

public static void setTruststoreCertsSupplier(Supplier<String> truststoreCertsSupplier) {
S3StreamKafkaMetricsManager.truststoreCertsSupplier = truststoreCertsSupplier;
}

public static void setCertChainSupplier(Supplier<String> certChainSupplier) {
S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier;
}
}
Loading