From 5e46316ebe6a4b14bad8bb86b79dbb900e66bbca Mon Sep 17 00:00:00 2001 From: tianmaowei Date: Fri, 27 Dec 2024 18:39:25 +0800 Subject: [PATCH 1/2] KAFKA-18337: Add a close status for JmxReporter, that avoid re-registering after being closed, which could lead to memory leaks --- .../kafka/common/metrics/JmxReporter.java | 8 ++++ .../kafka/common/metrics/JmxReporterTest.java | 41 +++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 0612015d1a2fa..5c5f81a984b27 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -68,6 +68,7 @@ public class JmxReporter implements MetricsReporter { private String prefix; private final Map mbeans = new HashMap<>(); private Predicate mbeanPredicate = s -> true; + private boolean closed = false; public JmxReporter() { this.prefix = ""; @@ -191,6 +192,7 @@ static String getMBeanName(String prefix, MetricName metricName) { public void close() { synchronized (LOCK) { + closed = true; for (KafkaMbean mbean : this.mbeans.values()) unregister(mbean); } @@ -207,6 +209,12 @@ private void unregister(KafkaMbean mbean) { } private void reregister(KafkaMbean mbean) { + // avoid re-registering after being closed, which could lead to memory leaks + // See KAFKA-18337 for more detail. + if (closed) { + log.warn("JmxReporter has been closed!, cannot re-registering mbean {}", mbean); + return; + } unregister(mbean); try { ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name()); diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java index f10670f8ab26c..c8a0f2f72a582 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.Test; @@ -177,4 +178,44 @@ public void testJmxPrefix() throws Exception { metrics.close(); } } + + @Test + public void testClose() throws Exception { + Metrics metrics = new Metrics(); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + JmxReporter reporter = new JmxReporter(); + try { + metrics.addReporter(reporter); + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + + Sensor sensor = metrics.sensor("kafka.requests"); + sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg()); + sensor.add(metrics.metricName("pack.bean1.max", "grp1"), new Max()); + sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new CumulativeSum()); + + assertTrue(server.isRegistered(new ObjectName(":type=grp1"))); + assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg")); + assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.max")); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total")); + + MetricName metricName = metrics.metricName("pack.bean1.avg", "grp1"); + String mBeanName = JmxReporter.getMBeanName("", metricName); + assertTrue(reporter.containsMbean(mBeanName)); + + assertTrue(server.isRegistered(new ObjectName(":type=grp1"))); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + } finally { + metrics.close(); + } + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertFalse(server.isRegistered(new ObjectName(":type=grp2"))); + + MetricName metricName = metrics.metricName("pack.bean1.avg", "grp1"); + metrics.removeMetric(metricName); + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertFalse(server.isRegistered(new ObjectName(":type=grp2"))); + } } From e76a467a075f63a89e6e7a8c55395531a500281d Mon Sep 17 00:00:00 2001 From: tianmaowei Date: Sun, 26 Jan 2025 16:50:19 +0800 Subject: [PATCH 2/2] KAFKA-18337: fix the log grammar --- .../main/java/org/apache/kafka/common/metrics/JmxReporter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 5c5f81a984b27..803cdbc0b9c3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -212,7 +212,7 @@ private void reregister(KafkaMbean mbean) { // avoid re-registering after being closed, which could lead to memory leaks // See KAFKA-18337 for more detail. if (closed) { - log.warn("JmxReporter has been closed!, cannot re-registering mbean {}", mbean); + log.warn("JmxReporter has been closed. Cannot reregister mbean {}", mbean); return; } unregister(mbean);