diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 0612015d1a2fa..803cdbc0b9c3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -68,6 +68,7 @@ public class JmxReporter implements MetricsReporter { private String prefix; private final Map mbeans = new HashMap<>(); private Predicate mbeanPredicate = s -> true; + private boolean closed = false; public JmxReporter() { this.prefix = ""; @@ -191,6 +192,7 @@ static String getMBeanName(String prefix, MetricName metricName) { public void close() { synchronized (LOCK) { + closed = true; for (KafkaMbean mbean : this.mbeans.values()) unregister(mbean); } @@ -207,6 +209,12 @@ private void unregister(KafkaMbean mbean) { } private void reregister(KafkaMbean mbean) { + // avoid re-registering after being closed, which could lead to memory leaks + // See KAFKA-18337 for more detail. + if (closed) { + log.warn("JmxReporter has been closed. Cannot reregister mbean {}", mbean); + return; + } unregister(mbean); try { ManagementFactory.getPlatformMBeanServer().registerMBean(mbean, mbean.name()); diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java index f10670f8ab26c..c8a0f2f72a582 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.CumulativeSum; +import org.apache.kafka.common.metrics.stats.Max; import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.Test; @@ -177,4 +178,44 @@ public void testJmxPrefix() throws Exception { metrics.close(); } } + + @Test + public void testClose() throws Exception { + Metrics metrics = new Metrics(); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + JmxReporter reporter = new JmxReporter(); + try { + metrics.addReporter(reporter); + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + + Sensor sensor = metrics.sensor("kafka.requests"); + sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg()); + sensor.add(metrics.metricName("pack.bean1.max", "grp1"), new Max()); + sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new CumulativeSum()); + + assertTrue(server.isRegistered(new ObjectName(":type=grp1"))); + assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg")); + assertEquals(Double.NaN, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.max")); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total")); + + MetricName metricName = metrics.metricName("pack.bean1.avg", "grp1"); + String mBeanName = JmxReporter.getMBeanName("", metricName); + assertTrue(reporter.containsMbean(mBeanName)); + + assertTrue(server.isRegistered(new ObjectName(":type=grp1"))); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + } finally { + metrics.close(); + } + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertFalse(server.isRegistered(new ObjectName(":type=grp2"))); + + MetricName metricName = metrics.metricName("pack.bean1.avg", "grp1"); + metrics.removeMetric(metricName); + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertFalse(server.isRegistered(new ObjectName(":type=grp2"))); + } }