1616 */
1717package org .apache .kafka .common ;
1818
19- import kafka .network .SocketServer ;
20-
2119import org .apache .kafka .common .message .ProduceRequestData ;
22- import org .apache .kafka .common .network .ListenerName ;
2320import org .apache .kafka .common .protocol .ApiKeys ;
2421import org .apache .kafka .common .requests .RequestHeader ;
2522import org .apache .kafka .common .requests .RequestUtils ;
2623import org .apache .kafka .common .test .ClusterInstance ;
2724import org .apache .kafka .common .test .api .ClusterConfigProperty ;
2825import org .apache .kafka .common .test .api .ClusterTest ;
2926import org .apache .kafka .common .test .api .ClusterTestDefaults ;
30- import org .apache .kafka .common .test .api .TestKitDefaults ;
3127import org .apache .kafka .common .test .api .Type ;
3228import org .apache .kafka .network .SocketServerConfigs ;
3329import org .apache .kafka .server .IntegrationTestUtils ;
3834import java .net .SocketTimeoutException ;
3935import java .nio .ByteBuffer ;
4036
37+ import static org .apache .kafka .common .test .api .TestKitDefaults .BROKER_ID_OFFSET ;
4138import static org .junit .jupiter .api .Assertions .assertEquals ;
4239import static org .junit .jupiter .api .Assertions .assertTrue ;
4340
@@ -66,6 +63,7 @@ public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws E
6663 byte [] rawRequestBytes = buffer .array ();
6764
6865 // corrupt body but leave header valid
66+ assertTrue (rawRequestBytes .length > header .size ());
6967 for (int i = header .size (); i < rawRequestBytes .length ; i ++) {
7068 rawRequestBytes [i ] = (byte ) 0xFF ;
7169 }
@@ -75,7 +73,7 @@ public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws E
7573 private void sendAndAssert (ClusterInstance clusterInstance , byte [] rawRequestBytes ) throws Exception {
7674 long initialMemoryPoolAvailable = getMemoryPoolAvailable (clusterInstance );
7775
78- try (Socket socket = IntegrationTestUtils .connect (getBrokerBoundPort ( clusterInstance ))) {
76+ try (Socket socket = IntegrationTestUtils .connect (clusterInstance . brokerBoundPorts (). get ( BROKER_ID_OFFSET ))) {
7977 socket .setSoTimeout (/* readTimeoutMs */ 5_000 );
8078 IntegrationTestUtils .sendRequest (socket , rawRequestBytes );
8179 assertTrue (drainUntilClosed (socket .getInputStream ()), "expected connection closed" );
@@ -86,16 +84,8 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
8684 assertEquals (initialMemoryPoolAvailable , finalMemoryPoolAvailable );
8785 }
8886
89- private SocketServer getSocketServer (ClusterInstance clusterInstance ) {
90- return clusterInstance .brokers ().get (TestKitDefaults .BROKER_ID_OFFSET ).socketServer ();
91- }
92-
93- private int getBrokerBoundPort (ClusterInstance clusterInstance ) {
94- return getSocketServer (clusterInstance ).boundPort (ListenerName .normalised (TestKitDefaults .DEFAULT_BROKER_LISTENER_NAME ));
95- }
96-
9787 private long getMemoryPoolAvailable (ClusterInstance clusterInstance ) {
98- return getSocketServer ( clusterInstance ).memoryPool ().availableMemory ();
88+ return clusterInstance . brokers (). get ( BROKER_ID_OFFSET ). socketServer ( ).memoryPool ().availableMemory ();
9989 }
10090
10191 /*
0 commit comments