Skip to content

Commit df01205

Browse files
committed
helper methods
1 parent 7d1f6bd commit df01205

1 file changed

Lines changed: 12 additions & 2 deletions

File tree

clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
*/
1717
package org.apache.kafka.common;
1818

19+
import kafka.network.SocketServer;
1920
import org.apache.kafka.common.message.ProduceRequestData;
21+
import org.apache.kafka.common.network.ListenerName;
2022
import org.apache.kafka.common.protocol.ApiKeys;
2123
import org.apache.kafka.common.requests.RequestHeader;
2224
import org.apache.kafka.common.requests.RequestUtils;
@@ -72,7 +74,7 @@ public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws E
7274
private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestBytes) throws Exception {
7375
long initialMemoryPoolAvailable = getMemoryPoolAvailable(clusterInstance);
7476

75-
try (Socket socket = IntegrationTestUtils.connect(clusterInstance.brokerBoundPorts().get(0))) {
77+
try (Socket socket = IntegrationTestUtils.connect(getBrokerBoundPort(clusterInstance))) {
7678
socket.setSoTimeout(/* readTimeoutMs */ 5_000);
7779
IntegrationTestUtils.sendRequest(socket, rawRequestBytes);
7880
assertTrue(drainUntilClosed(socket.getInputStream()), "expected connection closed");
@@ -83,8 +85,16 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
8385
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
8486
}
8587

88+
private SocketServer getSocketServer(ClusterInstance clusterInstance) {
89+
return clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer();
90+
}
91+
92+
private int getBrokerBoundPort(ClusterInstance clusterInstance) {
93+
return getSocketServer(clusterInstance).boundPort(ListenerName.normalised(TestKitDefaults.DEFAULT_BROKER_LISTENER_NAME));
94+
}
95+
8696
private long getMemoryPoolAvailable(ClusterInstance clusterInstance) {
87-
return clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer().memoryPool().availableMemory();
97+
return getSocketServer(clusterInstance).memoryPool().availableMemory();
8898
}
8999

90100
/*

0 commit comments

Comments
 (0)