MINOR: tidy up SocketServerMemoryPoolTest#21873
Conversation
71d21c7 to
7f38206
Compare
|
A label of 'needs-attention' was automatically added to this PR in order to raise the |
| memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) | ||
| private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE | ||
| // accessible for testing | ||
| val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE |
There was a problem hiding this comment.
You said that the SocketServer metrics collide when using ClusterTest.
It seems like it would also be the case if you start a single server in combined mode, right?
There was a problem hiding this comment.
yes, there are two SocketServers instances, one for BrokerServer one fo ControllerServer, and each of them defines
metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
so the singleton Yammerregsitry only contains one gauge for that name,
so it cannot be used to read the broker's pool availableMemory - so i went for accessing the value directly in the test, not via the metric
There was a problem hiding this comment.
My point is that it's a bug. If you run a server in combined mode, you have 2 SocketServer instances each with its own MemoryPool but there's a single metric. Since newGauge() is a "get or create", I'm guessing the metric reports the value for the component that create its metric first. If so we should try to fix it (not in this PR).
There was a problem hiding this comment.
agreed @mimaison - I opened https://issues.apache.org/jira/browse/KAFKA-20453
as for the ClusterTest, even in normal KRaft you get multiple SocketServers instances in a single JVM and a single metric.
Would you consider approving this minor patch and leave KAFKA-20453 to another PR?
Although i am not sure for how long the scala SocketServer and KafkaYammerMetrics singleton will be around!
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the PR. It's a nice cleanup, I left a few small comments
| // Header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id | ||
| int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length; | ||
| int payloadSize = headerSize + body.length; | ||
| private int getBrokerBoundPort(ClusterInstance clusterInstance) { |
There was a problem hiding this comment.
If there's a good reason why we can't use brokerBoundPorts(), we have the boundPort() method on BrokerServer to retrieve the port. That method uses socketServer itself so we don't have to.
| long initialMemoryPoolAvailable = getMemoryPoolAvailable(clusterInstance); | ||
|
|
||
| try (Socket socket = IntegrationTestUtils.connect(clusterInstance.brokerBoundPorts().get(0))) { | ||
| try (Socket socket = IntegrationTestUtils.connect(getBrokerBoundPort(clusterInstance))) { |
There was a problem hiding this comment.
Why can't we use clusterInstance.brokerBoundPorts().get(0) anymore?
| int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length; | ||
| int payloadSize = headerSize + body.length; | ||
| private int getBrokerBoundPort(ClusterInstance clusterInstance) { | ||
| return getSocketServer(clusterInstance).boundPort(ListenerName.normalised(TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME)); |
There was a problem hiding this comment.
Again if we can't use brokerBoundPorts(), we should use clusterInstance.clientListener() to retrieve the listener name.
|
thanks for your comments, I've addressed your suggestions |
9f23d39 to
e6e51b4
Compare
mimaison
left a comment
There was a problem hiding this comment.
Thanks for the updates, I think it's a nice cleanup. I left a small suggestion
| @@ -98,7 +94,8 @@ class SocketServer( | |||
| private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", JSocketServer.METRICS_GROUP) | |||
| private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", JSocketServer.METRICS_GROUP) | |||
| memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) | |||
| private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE | |||
| // accessible for testing | |||
| val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE | |||
There was a problem hiding this comment.
Nit: I think we could make this private[network] instead of fully exposing it.
There was a problem hiding this comment.
Done, thanks
TIL
private[network] in Scala 2 compiles to a public getter method in JVM bytecode. The private[network] restriction is enforced only by the Scala compiler — it will reject Scala code outside kafka.network that tries to call memoryPool(). But the underlying bytecode method has public visibility, so Java code can call
it freely, completely bypassing the Scala-level restriction.
This is a fundamental limitation of how Scala 2 maps its package-scoped visibility to the JVM: the JVM has no concept of "private to a specific named package," so the compiler can only enforce the restriction syntactically during Scala-to-Scala compilation. Any Java caller (or even reflection) sees a fully public method.
There was a problem hiding this comment.
Also TIL, I knew it worked but never considered why. Thanks
|
@mimaison after rebasing to trunk before pushing I encountered a thread leak failure https://issues.apache.org/jira/browse/KAFKA-20619 and PR |
|
@mimaison the CI builds fail with something unrelated to this PR probably i just need to rebase |
* moved to package org.apache.kafka.common * use IntegrationTestUtils and RequestUtils to create messages * made SocketServer MemoryPool accessible for testing
4dcc5e2 to
f2b75ee
Compare
following up comments in #21740
note that the MemoryPoolAvailable metrics is registered in the singleton
default Yammer registry and it is not scoped per listener because there
is one pool per SocketServer, not one per listener. So it is not usable
in this ClusterTest.
Reviewers: Mickael Maison mimaison@users.noreply.github.com, Mickael
Maison mickael.maison@gmail.com