diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java index 2278718a1cc11..8946faced2859 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java @@ -16,10 +16,6 @@ */ package org.apache.kafka.clients.producer; -import kafka.network.SocketServer; -import kafka.server.KafkaBroker; - -import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.test.ClusterInstance; import org.apache.kafka.common.test.api.ClusterConfigProperty; @@ -28,10 +24,12 @@ import org.apache.kafka.common.test.api.Type; import org.apache.kafka.network.SocketServerConfigs; import org.apache.kafka.server.IntegrationTestUtils; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import com.yammer.metrics.core.Gauge; import java.io.EOFException; import java.io.InputStream; -import java.lang.reflect.Field; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; @@ -96,16 +94,15 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable); } - // This test uses reflection to read the SocketServer memoryPool availableMemory. - // The metric "MemoryPoolAvailable" from Yammer Metrics default registry - // can be overwritten in a @ClusterTest as the registry is a singleton. 
-    long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws Exception {
-        KafkaBroker broker = clusterInstance.aliveBrokers().values().iterator().next();
-        SocketServer socketServer = broker.socketServer();
-        Field memoryPoolField = socketServer.getClass().getDeclaredField("memoryPool");
-        memoryPoolField.setAccessible(true);
-        MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
-        return memoryPool.availableMemory();
+    // Returns the first SocketServer "MemoryPoolAvailable" gauge tagged listenerType=BROKER in the default Yammer registry (clusterInstance is not consulted); the value is read via Number, so no unchecked Gauge cast is needed.
+    long getMemoryPoolAvailable(ClusterInstance clusterInstance) {
+        return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
+            .filter(e -> e.getKey().getType().equals("SocketServer"))
+            .filter(e -> e.getKey().getName().equals("MemoryPoolAvailable"))
+            .filter(e -> e.getKey().getMBeanName().contains("listenerType=BROKER"))
+            .map(e -> ((Number) ((Gauge<?>) e.getValue()).value()).longValue())
+            .findFirst()
+            .orElseThrow(() -> new RuntimeException("MemoryPoolAvailable metric not found for broker SocketServer"));
     }
 
     /**
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 06d1c0f792f63..00d7990a3eabf 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -87,6 +87,7 @@ class SocketServer(
   private val maxQueuedRequests = config.queuedMaxRequests
 
   protected val nodeId: Int = config.brokerId
+  private val metricsTags = MetricsUtils.getTags("nodeId", nodeId.toString, "listenerType", apiVersionManager.listenerType.name)
 
   private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")
 
@@ -127,10 +128,10 @@ class SocketServer(
         Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
       }.sum / dataPlaneProcessors.size
     }
-  })
+  }, metricsTags)
 
-  metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
-  metricsGroup.newGauge("MemoryPoolUsed", 
() => memoryPool.size() - memoryPool.availableMemory) + metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory, metricsTags) + metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory, metricsTags) metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors) val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p => @@ -139,7 +140,7 @@ class SocketServer( expiredConnectionsKilledCountMetricNames.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) }.sum - }) + }, metricsTags) // Create acceptors and processors for the statically configured endpoints when the // SocketServer is constructed. Note that this just opens the ports and creates the data @@ -260,6 +261,10 @@ class SocketServer( dataPlaneRequestChannel.shutdown() connectionQuotas.close() } + metricsGroup.removeMetric("NetworkProcessorAvgIdlePercent", metricsTags) + metricsGroup.removeMetric("MemoryPoolAvailable", metricsTags) + metricsGroup.removeMetric("MemoryPoolUsed", metricsTags) + metricsGroup.removeMetric("ExpiredConnectionsKilledCount", metricsTags) info("Shutdown completed") } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index a90590d04f410..b4dc30f98b90d 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1242,6 +1242,42 @@ class SocketServerTest { } } + @Test + def testSocketServerMetricsTags(): Unit = { + val socketServerMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (k, _) => k.getType == "SocketServer" } + .collect { case (k, _: Gauge[_]) => k } + + val expectedMetricNames = Set( + "NetworkProcessorAvgIdlePercent", + 
"MemoryPoolAvailable", + "MemoryPoolUsed", + "ExpiredConnectionsKilledCount" + ) + + val foundMetricNames = socketServerMetrics.map(_.getName).toSet + assertEquals(expectedMetricNames, foundMetricNames) + + socketServerMetrics.foreach { metricName => + assertTrue(metricName.getMBeanName.contains("nodeId=0"), + s"Metric ${metricName.getName} should have nodeId tag, but MBean name is: ${metricName.getMBeanName}") + assertTrue(metricName.getMBeanName.contains("listenerType=BROKER"), + s"Metric ${metricName.getName} should have listenerType tag, but MBean name is: ${metricName.getMBeanName}") + } + } + + @Test + def testSocketServerMetricsRemovedOnShutdown(): Unit = { + shutdownServerAndMetrics(server) + + val socketServerMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (k, _) => k.getType == "SocketServer" } + .collect { case (k, _: Gauge[_]) => k } + + assertTrue(socketServerMetrics.isEmpty, + s"SocketServer metrics should be removed after shutdown, but found: ${socketServerMetrics.map(_.getName).mkString(", ")}") + } + /** * Tests exception handling in [[Processor.configureNewConnections]]. Exception is * injected into [[Selector.register]] which is used to register each new connection. diff --git a/docs/operations/monitoring.md b/docs/operations/monitoring.md index c7f22f3b1666f..963662cf40464 100644 --- a/docs/operations/monitoring.md +++ b/docs/operations/monitoring.md @@ -773,7 +773,7 @@ The average fraction of time the network processors are idle -kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent +kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent,nodeId=[nodeId],listenerType=[BROKER|CONTROLLER] @@ -799,7 +799,7 @@ The total number of connections disconnected, across all processors, due to a cl -kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount +kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount,nodeId=[nodeId],listenerType=[BROKER|CONTROLLER]