Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
*/
package org.apache.kafka.clients.producer;

import kafka.network.SocketServer;
import kafka.server.KafkaBroker;

import org.apache.kafka.common.memory.MemoryPool;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
Expand All @@ -28,10 +24,12 @@
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.IntegrationTestUtils;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;

import com.yammer.metrics.core.Gauge;

import java.io.EOFException;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -96,16 +94,15 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
}

// This test uses reflection to read the SocketServer memoryPool availableMemory.
// The metric "MemoryPoolAvailable" from Yammer Metrics default registry
// can be overwritten in a @ClusterTest as the registry is a singleton.
long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws Exception {
KafkaBroker broker = clusterInstance.aliveBrokers().values().iterator().next();
SocketServer socketServer = broker.socketServer();
Field memoryPoolField = socketServer.getClass().getDeclaredField("memoryPool");
memoryPoolField.setAccessible(true);
MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
return memoryPool.availableMemory();
@SuppressWarnings("unchecked")
long getMemoryPoolAvailable(ClusterInstance clusterInstance) {
return KafkaYammerMetrics.defaultRegistry().allMetrics().entrySet().stream()
.filter(e -> e.getKey().getType().equals("SocketServer"))
.filter(e -> e.getKey().getName().equals("MemoryPoolAvailable"))
.filter(e -> e.getKey().getMBeanName().contains("listenerType=BROKER"))
.map(e -> ((Gauge<Long>) e.getValue()).value())
.findFirst()
.orElseThrow(() -> new RuntimeException("MemoryPoolAvailable metric not found for broker SocketServer"));
}

/**
Expand Down
13 changes: 9 additions & 4 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class SocketServer(
private val maxQueuedRequests = config.queuedMaxRequests

protected val nodeId: Int = config.brokerId
private val metricsTags = MetricsUtils.getTags("nodeId", nodeId.toString, "listenerType", apiVersionManager.listenerType.name)

private val logContext = new LogContext(s"[SocketServer listenerType=${apiVersionManager.listenerType}, nodeId=$nodeId] ")

Expand Down Expand Up @@ -127,10 +128,10 @@ class SocketServer(
Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.sum / dataPlaneProcessors.size
}
})
}, metricsTags)

metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory, metricsTags)
metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory, metricsTags)
metricsGroup.newGauge(s"ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
val dataPlaneProcessors = dataPlaneAcceptors.asScala.values.flatMap(a => a.processors)
val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.map { p =>
Expand All @@ -139,7 +140,7 @@ class SocketServer(
expiredConnectionsKilledCountMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
}.sum
})
}, metricsTags)

// Create acceptors and processors for the statically configured endpoints when the
// SocketServer is constructed. Note that this just opens the ports and creates the data
Expand Down Expand Up @@ -260,6 +261,10 @@ class SocketServer(
dataPlaneRequestChannel.shutdown()
connectionQuotas.close()
}
metricsGroup.removeMetric("NetworkProcessorAvgIdlePercent", metricsTags)
metricsGroup.removeMetric("MemoryPoolAvailable", metricsTags)
metricsGroup.removeMetric("MemoryPoolUsed", metricsTags)
metricsGroup.removeMetric("ExpiredConnectionsKilledCount", metricsTags)
info("Shutdown completed")
}

Expand Down
36 changes: 36 additions & 0 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1242,6 +1242,42 @@ class SocketServerTest {
}
}

@Test
def testSocketServerMetricsTags(): Unit = {
val socketServerMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (k, _) => k.getType == "SocketServer" }
.collect { case (k, _: Gauge[_]) => k }

val expectedMetricNames = Set(
"NetworkProcessorAvgIdlePercent",
"MemoryPoolAvailable",
"MemoryPoolUsed",
"ExpiredConnectionsKilledCount"
)

val foundMetricNames = socketServerMetrics.map(_.getName).toSet
assertEquals(expectedMetricNames, foundMetricNames)

socketServerMetrics.foreach { metricName =>
assertTrue(metricName.getMBeanName.contains("nodeId=0"),
s"Metric ${metricName.getName} should have nodeId tag, but MBean name is: ${metricName.getMBeanName}")
assertTrue(metricName.getMBeanName.contains("listenerType=BROKER"),
s"Metric ${metricName.getName} should have listenerType tag, but MBean name is: ${metricName.getMBeanName}")
}
}

@Test
def testSocketServerMetricsRemovedOnShutdown(): Unit = {
shutdownServerAndMetrics(server)

val socketServerMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala
.filter { case (k, _) => k.getType == "SocketServer" }
.collect { case (k, _: Gauge[_]) => k }

assertTrue(socketServerMetrics.isEmpty,
s"SocketServer metrics should be removed after shutdown, but found: ${socketServerMetrics.map(_.getName).mkString(", ")}")
}

/**
* Tests exception handling in [[Processor.configureNewConnections]]. Exception is
* injected into [[Selector.register]] which is used to register each new connection.
Expand Down
4 changes: 2 additions & 2 deletions docs/operations/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ The average fraction of time the network processors are idle
</td>
<td>

kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent,nodeId=[nodeId],listenerType=[BROKER|CONTROLLER]
</td>
<td>

Expand All @@ -799,7 +799,7 @@ The total number of connections disconnected, across all processors, due to a cl
</td>
<td>

kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount
kafka.network:type=SocketServer,name=ExpiredConnectionsKilledCount,nodeId=[nodeId],listenerType=[BROKER|CONTROLLER]
</td>
<td>

Expand Down