1616 */
1717package org .apache .kafka .common ;
1818
19- import kafka .network .SocketServer ;
20-
2119import org .apache .kafka .common .message .ProduceRequestData ;
22- import org .apache .kafka .common .network .ListenerName ;
2320import org .apache .kafka .common .protocol .ApiKeys ;
2421import org .apache .kafka .common .requests .RequestHeader ;
2522import org .apache .kafka .common .requests .RequestUtils ;
@@ -66,6 +63,7 @@ public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws E
6663 byte [] rawRequestBytes = buffer .array ();
6764
6865 // corrupt body but leave header valid
66+ assertTrue (rawRequestBytes .length > header .size (), "must have body bytes to corrupt" );
6967 for (int i = header .size (); i < rawRequestBytes .length ; i ++) {
7068 rawRequestBytes [i ] = (byte ) 0xFF ;
7169 }
@@ -75,7 +73,7 @@ public void testRequestWithCorruptBody(ClusterInstance clusterInstance) throws E
7573 private void sendAndAssert (ClusterInstance clusterInstance , byte [] rawRequestBytes ) throws Exception {
7674 long initialMemoryPoolAvailable = getMemoryPoolAvailable (clusterInstance );
7775
78- try (Socket socket = IntegrationTestUtils .connect (getBrokerBoundPort ( clusterInstance ))) {
76+ try (Socket socket = IntegrationTestUtils .connect (clusterInstance . brokerBoundPorts (). get ( 0 ))) {
7977 socket .setSoTimeout (/* readTimeoutMs */ 5_000 );
8078 IntegrationTestUtils .sendRequest (socket , rawRequestBytes );
8179 assertTrue (drainUntilClosed (socket .getInputStream ()), "expected connection closed" );
@@ -86,16 +84,8 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
8684 assertEquals (initialMemoryPoolAvailable , finalMemoryPoolAvailable );
8785 }
8886
89- private SocketServer getSocketServer (ClusterInstance clusterInstance ) {
90- return clusterInstance .brokers ().get (TestKitDefaults .BROKER_ID_OFFSET ).socketServer ();
91- }
92-
93- private int getBrokerBoundPort (ClusterInstance clusterInstance ) {
94- return getSocketServer (clusterInstance ).boundPort (ListenerName .normalised (TestKitDefaults .DEFAULT_BROKER_LISTENER_NAME ));
95- }
96-
9787 private long getMemoryPoolAvailable (ClusterInstance clusterInstance ) {
98- return getSocketServer ( clusterInstance ).memoryPool ().availableMemory ();
88+ return clusterInstance . brokers (). get ( TestKitDefaults . BROKER_ID_OFFSET ). socketServer ( ).memoryPool ().availableMemory ();
9989 }
10090
10191 /*
0 commit comments