Skip to content

Commit 5c93ec9

Browse files
authored
MINOR: tidy up SocketServerMemoryPoolTest (#21873)
* moved to package org.apache.kafka.common * use IntegrationTestUtils and RequestUtils to create messages * made SocketServer MemoryPool accessible to Java unit test Reviewers: Mickael Maison <mickael.maison@gmail.com>
1 parent 2ba03b2 commit 5c93ec9

2 files changed

Lines changed: 21 additions & 73 deletions

File tree

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/SocketServerMemoryPoolTest.java renamed to clients/clients-integration-tests/src/test/java/org/apache/kafka/common/SocketServerMemoryPoolTest.java

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,26 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package org.apache.kafka.clients.producer;
17+
package org.apache.kafka.common;
1818

19-
import kafka.network.SocketServer;
20-
import kafka.server.KafkaBroker;
21-
22-
import org.apache.kafka.common.memory.MemoryPool;
19+
import org.apache.kafka.common.message.ProduceRequestData;
2320
import org.apache.kafka.common.protocol.ApiKeys;
21+
import org.apache.kafka.common.requests.RequestHeader;
22+
import org.apache.kafka.common.requests.RequestUtils;
2423
import org.apache.kafka.common.test.ClusterInstance;
2524
import org.apache.kafka.common.test.api.ClusterConfigProperty;
2625
import org.apache.kafka.common.test.api.ClusterTest;
2726
import org.apache.kafka.common.test.api.ClusterTestDefaults;
27+
import org.apache.kafka.common.test.api.TestKitDefaults;
2828
import org.apache.kafka.common.test.api.Type;
2929
import org.apache.kafka.network.SocketServerConfigs;
3030
import org.apache.kafka.server.IntegrationTestUtils;
3131

3232
import java.io.EOFException;
3333
import java.io.InputStream;
34-
import java.lang.reflect.Field;
3534
import java.net.Socket;
3635
import java.net.SocketTimeoutException;
3736
import java.nio.ByteBuffer;
38-
import java.nio.charset.StandardCharsets;
3937

4038
import static org.junit.jupiter.api.Assertions.assertEquals;
4139
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -50,35 +48,25 @@
5048
})
5149
public class SocketServerMemoryPoolTest {
5250
@ClusterTest
53-
public void testProduceRequestWithUnsupportedVersion(ClusterInstance clusterInstance) throws Exception {
54-
short unsupportedVersion = Short.MAX_VALUE;
55-
byte[] rawRequestBytes = buildRawRequest(
56-
ApiKeys.PRODUCE.id,
57-
unsupportedVersion,
58-
/* correlationId */ 1,
59-
/* clientId */ "test-unsupported-version",
60-
new byte[10000]
61-
);
51+
public void testRequestWithUnsupportedVersion(ClusterInstance clusterInstance) throws Exception {
52+
RequestHeader header = IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, Short.MAX_VALUE);
53+
ByteBuffer buffer = RequestUtils.serialize(header.data(), header.headerVersion(), new ProduceRequestData(), header.apiVersion());
54+
byte[] rawRequestBytes = buffer.array();
6255

6356
sendAndAssert(clusterInstance, rawRequestBytes);
6457
}
6558

6659
@ClusterTest
67-
public void testProduceRequestWithCorruptBody(ClusterInstance clusterInstance) throws Exception {
68-
short validVersion = 3;
69-
byte[] corruptBody = new byte[10000];
70-
for (int i = 0; i < corruptBody.length; i++) {
71-
corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF) makes Schema.read() throw SchemaException.
60+
public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws Exception {
61+
RequestHeader header = IntegrationTestUtils.nextRequestHeader(ApiKeys.PRODUCE, ApiKeys.PRODUCE.latestVersion());
62+
ByteBuffer buffer = RequestUtils.serialize(header.data(), header.headerVersion(), new ProduceRequestData(), header.apiVersion());
63+
byte[] rawRequestBytes = buffer.array();
64+
65+
// corrupt body but leave header valid
66+
assertTrue(rawRequestBytes.length > header.size(), "must have body bytes to corrupt");
67+
for (int i = header.size(); i < rawRequestBytes.length; i++) {
68+
rawRequestBytes[i] = (byte) 0xFF;
7269
}
73-
74-
byte[] rawRequestBytes = buildRawRequest(
75-
ApiKeys.PRODUCE.id,
76-
validVersion,
77-
/* correlationId */ 2,
78-
/* clientId */ "test-corrupt-body",
79-
corruptBody
80-
);
81-
8270
sendAndAssert(clusterInstance, rawRequestBytes);
8371
}
8472

@@ -96,48 +84,8 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
9684
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
9785
}
9886

99-
// This test uses reflection to read the SocketServer memoryPool availableMemory.
100-
// The metric "MemoryPoolAvailable" from Yammer Metrics default registry
101-
// can be overwritten in a @ClusterTest as the registry is a singleton.
102-
long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws Exception {
103-
KafkaBroker broker = clusterInstance.aliveBrokers().values().iterator().next();
104-
SocketServer socketServer = broker.socketServer();
105-
Field memoryPoolField = socketServer.getClass().getDeclaredField("memoryPool");
106-
memoryPoolField.setAccessible(true);
107-
MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
108-
return memoryPool.availableMemory();
109-
}
110-
111-
/**
112-
* Builds a raw Kafka request excluding the frame length
113-
*
114-
* <p>Wire layout:
115-
* <pre>
116-
* 4 bytes – frame length (payload size, not including these 4 bytes)
117-
*
118-
* 2 bytes – api_key
119-
* 2 bytes – api_version
120-
* 4 bytes – correlation_id
121-
* 2 bytes – client_id string length
122-
* N bytes – client_id (UTF-8)
123-
* X bytes - request body
124-
* </pre>
125-
*/
126-
private static byte[] buildRawRequest(short apiKey, short apiVersion, int correlationId, String clientId, byte[] body) {
127-
byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
128-
129-
// Header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id
130-
int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
131-
int payloadSize = headerSize + body.length;
132-
133-
ByteBuffer buf = ByteBuffer.allocate(payloadSize);
134-
buf.putShort(apiKey); // api_key
135-
buf.putShort(apiVersion); // api_version
136-
buf.putInt(correlationId); // correlation_id
137-
buf.putShort((short) clientIdBytes.length); // client_id string length
138-
buf.put(clientIdBytes); // client_id bytes
139-
buf.put(body); // request body (possibly empty / corrupt)
140-
return buf.array();
87+
private long getMemoryPoolAvailable(ClusterInstance clusterInstance) {
88+
return clusterInstance.brokers().get(TestKitDefaults.BROKER_ID_OFFSET).socketServer().memoryPool().availableMemory();
14189
}
14290

14391
/*

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class SocketServer(
9898
private val memoryPoolDepletedPercentMetricName = metrics.metricName("MemoryPoolAvgDepletedPercent", JSocketServer.METRICS_GROUP)
9999
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", JSocketServer.METRICS_GROUP)
100100
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
101-
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
101+
private[network] val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
102102
// data-plane
103103
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
104104
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, apiVersionManager.newRequestMetrics)

0 commit comments

Comments
 (0)