Skip to content

Commit 7fdada6

Browse files
authored
feat(metric): add cert metrics (#2431)
* feat: add cert metrics * feat: check cert null * feat: fix format * feat: adjust cert metrics position * feat: remove cert prefix
1 parent 21be8ef commit 7fdada6

File tree

2 files changed

+159
-10
lines changed

2 files changed

+159
-10
lines changed

core/src/main/scala/kafka/log/stream/s3/telemetry/TelemetryManager.java

+20
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import kafka.log.stream.s3.telemetry.otel.OTelHistogramReporter;
2525
import kafka.server.KafkaConfig;
2626

27+
import org.apache.kafka.common.config.types.Password;
2728
import org.apache.kafka.server.ProcessRole;
2829
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
2930
import org.apache.kafka.server.metrics.s3stream.S3StreamKafkaMetricsManager;
@@ -151,6 +152,25 @@ protected MetricsExporterURI buildMetricsExporterURI(String clusterId, KafkaConf
151152
}
152153

153154
protected void initializeMetricsManager(Meter meter) {
155+
S3StreamKafkaMetricsManager.setTruststoreCertsSupplier(() -> {
156+
try {
157+
Password truststoreCertsPassword = kafkaConfig.getPassword("ssl.truststore.certificates");
158+
return truststoreCertsPassword != null ? truststoreCertsPassword.value() : null;
159+
} catch (Exception e) {
160+
LOGGER.error("Failed to get truststore certs", e);
161+
return null;
162+
}
163+
});
164+
165+
S3StreamKafkaMetricsManager.setCertChainSupplier(() -> {
166+
try {
167+
Password certChainPassword = kafkaConfig.getPassword("ssl.keystore.certificate.chain");
168+
return certChainPassword != null ? certChainPassword.value() : null;
169+
} catch (Exception e) {
170+
LOGGER.error("Failed to get cert chain", e);
171+
return null;
172+
}
173+
});
154174
S3StreamMetricsManager.configure(new MetricsConfig(metricsLevel(), Attributes.empty(), kafkaConfig.s3ExporterReportIntervalMs()));
155175
S3StreamMetricsManager.initMetrics(meter, TelemetryConstants.KAFKA_METRICS_PREFIX);
156176

server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java

+139-10
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,17 @@
2828
import com.automq.stream.s3.metrics.wrapper.HistogramInstrument;
2929
import com.automq.stream.s3.metrics.wrapper.HistogramMetric;
3030

31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import java.io.ByteArrayInputStream;
35+
import java.security.cert.CertificateException;
36+
import java.security.cert.CertificateFactory;
37+
import java.security.cert.X509Certificate;
3138
import java.util.ArrayList;
39+
import java.util.Base64;
3240
import java.util.Collections;
41+
import java.util.Date;
3342
import java.util.List;
3443
import java.util.Map;
3544
import java.util.concurrent.CopyOnWriteArrayList;
@@ -43,6 +52,8 @@
4352

4453
public class S3StreamKafkaMetricsManager {
4554

55+
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamKafkaMetricsManager.class);
56+
4657
private static final List<ConfigListener> BASE_ATTRIBUTES_LISTENERS = new ArrayList<>();
4758

4859
public static final List<HistogramMetric> FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>();
@@ -60,6 +71,9 @@ public class S3StreamKafkaMetricsManager {
6071
private static final MultiAttributes<String> BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
6172
S3StreamKafkaMetricsConstants.LABEL_BACK_PRESSURE_STATE);
6273

74+
// List to store all the observable long gauges for certificates
75+
private static final List<ObservableLongGauge> CERT_OBSERVABLE_LONG_GAUGES = new ArrayList<>();
76+
6377
static {
6478
BASE_ATTRIBUTES_LISTENERS.add(BROKER_ATTRIBUTES);
6579
BASE_ATTRIBUTES_LISTENERS.add(S3_OBJECT_ATTRIBUTES);
@@ -108,6 +122,15 @@ public class S3StreamKafkaMetricsManager {
108122
*/
109123
private static Supplier<Map<String, Integer>> backPressureStateSupplier = Collections::emptyMap;
110124

125+
/**
126+
* supplier for truststoreCerts
127+
*/
128+
private static Supplier<String> truststoreCertsSupplier = () -> null;
129+
/**
130+
* supplier for server cert chain
131+
*/
132+
private static Supplier<String> certChainSupplier = () -> null;
133+
111134
public static void configure(MetricsConfig metricsConfig) {
112135
synchronized (BASE_ATTRIBUTES_LISTENERS) {
113136
S3StreamKafkaMetricsManager.metricsConfig = metricsConfig;
@@ -124,6 +147,11 @@ public static void initMetrics(Meter meter, String prefix) {
124147
initLogAppendMetrics(meter, prefix);
125148
initPartitionStatusStatisticsMetrics(meter, prefix);
126149
initBackPressureMetrics(meter, prefix);
150+
try {
151+
initCertMetrics(meter, prefix);
152+
} catch (Exception e) {
153+
LOGGER.error("Failed to init cert metrics", e);
154+
}
127155
}
128156

129157
private static void initAutoBalancerMetrics(Meter meter, String prefix) {
@@ -284,16 +312,109 @@ private static void initPartitionStatusStatisticsMetrics(Meter meter, String pre
284312

285313
private static void initBackPressureMetrics(Meter meter, String prefix) {
286314
backPressureState = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.BACK_PRESSURE_STATE_METRIC_NAME)
287-
.setDescription("Back pressure state")
288-
.ofLongs()
289-
.buildWithCallback(result -> {
290-
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
291-
Map<String, Integer> states = backPressureStateSupplier.get();
292-
states.forEach((state, value) -> {
293-
result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state));
294-
});
295-
}
296-
});
315+
.setDescription("Back pressure state")
316+
.ofLongs()
317+
.buildWithCallback(result -> {
318+
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
319+
Map<String, Integer> states = backPressureStateSupplier.get();
320+
states.forEach((state, value) -> {
321+
result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(state));
322+
});
323+
}
324+
});
325+
}
326+
327+
/**
328+
* Initialize the certificate metrics.
329+
*
330+
* @param meter The OpenTelemetry meter to use for creating metrics.
331+
*/
332+
public static void initCertMetrics(Meter meter, String prefix) throws CertificateException {
333+
String truststoreCerts = truststoreCertsSupplier.get();
334+
String certChain = certChainSupplier.get();
335+
if (truststoreCerts == null || truststoreCerts.isEmpty()) {
336+
return;
337+
}
338+
if (certChain == null || certChain.isEmpty()) {
339+
return;
340+
}
341+
// Add TLS certificate metrics
342+
addTlsMetrics(certChain, truststoreCerts, meter, prefix);
343+
}
344+
345+
/**
346+
* Add TLS certificate metrics.
347+
*
348+
* @param certChain The certificate chain in PEM format.
349+
* @param truststoreCerts The truststore certificates in PEM format.
350+
* @param meter The OpenTelemetry meter to use for creating metrics.
351+
* @param prefix The prefix for the metric names.
352+
*/
353+
private static void addTlsMetrics(String certChain, String truststoreCerts, Meter meter, String prefix) throws CertificateException {
354+
// Parse and check the certificate expiration time
355+
X509Certificate[] serverCerts = parseCertificates(certChain);
356+
X509Certificate[] trustStoreCerts = parseCertificates(truststoreCerts);
357+
358+
for (X509Certificate cert : serverCerts) {
359+
registerCertMetrics(meter, cert, "server_cert", prefix);
360+
}
361+
for (X509Certificate cert : trustStoreCerts) {
362+
registerCertMetrics(meter, cert, "truststore_cert", prefix);
363+
}
364+
}
365+
366+
/**
367+
* Register certificate metrics.
368+
*
369+
* @param meter The OpenTelemetry meter to use for creating metrics.
370+
* @param cert The X509 certificate to register metrics for.
371+
* @param certType The type of the certificate (e.g., "server_cert", "truststore_cert").
372+
* @param prefix The prefix for the metric names.
373+
*/
374+
private static void registerCertMetrics(Meter meter, X509Certificate cert, String certType, String prefix) {
375+
String subject = cert.getSubjectX500Principal().getName();
376+
Date expiryDate = cert.getNotAfter();
377+
long daysRemaining = (expiryDate.getTime() - System.currentTimeMillis()) / (1000 * 3600 * 24);
378+
379+
// Create and register Gauge metrics
380+
Attributes attributes = Attributes.builder()
381+
.put("cert_type", certType)
382+
.put("cert_subject", subject)
383+
.build();
384+
385+
ObservableLongGauge observableCertExpireMills = meter.gaugeBuilder(prefix + "cert_expiry_timestamp")
386+
.setDescription("The expiry timestamp of the TLS certificate")
387+
.setUnit("milliseconds")
388+
.ofLongs()
389+
.buildWithCallback(result -> result.record(expiryDate.getTime(), attributes));
390+
CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireMills);
391+
392+
ObservableLongGauge observableCertExpireDays = meter.gaugeBuilder(prefix + "cert_days_remaining")
393+
.setDescription("The remaining days until the TLS certificate expires")
394+
.setUnit("days")
395+
.ofLongs()
396+
.buildWithCallback(result -> result.record(daysRemaining, attributes));
397+
CERT_OBSERVABLE_LONG_GAUGES.add(observableCertExpireDays);
398+
}
399+
400+
/**
401+
* Parse the PEM formatted certificate content into an array of X509 certificates.
402+
*
403+
* @param pemContent The PEM formatted certificate content.
404+
* @return An array of X509 certificates.
405+
* @throws CertificateException If there is an error parsing the certificates.
406+
*/
407+
private static X509Certificate[] parseCertificates(String pemContent) throws CertificateException {
408+
String[] pemArray = pemContent.split("-----END CERTIFICATE-----");
409+
CertificateFactory factory = CertificateFactory.getInstance("X.509");
410+
X509Certificate[] certs = new X509Certificate[pemArray.length];
411+
412+
for (int i = 0; i < pemArray.length; i++) {
413+
String pemPart = pemArray[i];
414+
byte[] certBytes = Base64.getDecoder().decode(pemPart.replace("-----BEGIN CERTIFICATE-----", "").replaceAll("\n", ""));
415+
certs[i] = (X509Certificate) factory.generateCertificate(new ByteArrayInputStream(certBytes));
416+
}
417+
return certs;
297418
}
298419

299420
public static void setIsActiveSupplier(Supplier<Boolean> isActiveSupplier) {
@@ -369,4 +490,12 @@ public static void setTopicPartitionCountMetricsSupplier(Supplier<PartitionCount
369490
public static void setBackPressureStateSupplier(Supplier<Map<String, Integer>> backPressureStateSupplier) {
370491
S3StreamKafkaMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
371492
}
493+
494+
public static void setTruststoreCertsSupplier(Supplier<String> truststoreCertsSupplier) {
495+
S3StreamKafkaMetricsManager.truststoreCertsSupplier = truststoreCertsSupplier;
496+
}
497+
498+
public static void setCertChainSupplier(Supplier<String> certChainSupplier) {
499+
S3StreamKafkaMetricsManager.certChainSupplier = certChainSupplier;
500+
}
372501
}

0 commit comments

Comments
 (0)