1414 * See the License for the specific language governing permissions and
1515 * limitations under the License.
1616 */
17- package org .apache .kafka .clients . producer ;
17+ package org .apache .kafka .common ;
1818
19- import kafka .network .SocketServer ;
20- import kafka .server .KafkaBroker ;
21-
22- import org .apache .kafka .common .memory .MemoryPool ;
19+ import org .apache .kafka .common .message .ProduceRequestData ;
2320import org .apache .kafka .common .protocol .ApiKeys ;
21+ import org .apache .kafka .common .requests .RequestHeader ;
22+ import org .apache .kafka .common .requests .RequestUtils ;
2423import org .apache .kafka .common .test .ClusterInstance ;
2524import org .apache .kafka .common .test .api .ClusterConfigProperty ;
2625import org .apache .kafka .common .test .api .ClusterTest ;
2726import org .apache .kafka .common .test .api .ClusterTestDefaults ;
27+ import org .apache .kafka .common .test .api .TestKitDefaults ;
2828import org .apache .kafka .common .test .api .Type ;
2929import org .apache .kafka .network .SocketServerConfigs ;
3030import org .apache .kafka .server .IntegrationTestUtils ;
3131
3232import java .io .EOFException ;
3333import java .io .InputStream ;
34- import java .lang .reflect .Field ;
3534import java .net .Socket ;
3635import java .net .SocketTimeoutException ;
3736import java .nio .ByteBuffer ;
38- import java .nio .charset .StandardCharsets ;
3937
4038import static org .junit .jupiter .api .Assertions .assertEquals ;
4139import static org .junit .jupiter .api .Assertions .assertTrue ;
4947 @ ClusterConfigProperty (key = SocketServerConfigs .QUEUED_MAX_BYTES_CONFIG , value = "100000" ),
5048})
5149public class SocketServerMemoryPoolTest {
50+
5251 @ ClusterTest
53- public void testProduceRequestWithUnsupportedVersion (ClusterInstance clusterInstance ) throws Exception {
54- short unsupportedVersion = Short .MAX_VALUE ;
55- byte [] rawRequestBytes = buildRawRequest (
56- ApiKeys .PRODUCE .id ,
57- unsupportedVersion ,
58- /* correlationId */ 1 ,
59- /* clientId */ "test-unsupported-version" ,
60- new byte [10000 ]
61- );
52+ public void testRequestWithUnsupportedVersion (ClusterInstance clusterInstance ) throws Exception {
53+ RequestHeader header = IntegrationTestUtils .nextRequestHeader (ApiKeys .PRODUCE , Short .MAX_VALUE );
54+ ByteBuffer bytes = RequestUtils .serialize (header .data (), header .headerVersion (), new ProduceRequestData (), header .apiVersion ());
55+ byte [] rawRequestBytes = bytes .array ();
6256
6357 sendAndAssert (clusterInstance , rawRequestBytes );
6458 }
6559
6660 @ ClusterTest
67- public void testProduceRequestWithCorruptBody (ClusterInstance clusterInstance ) throws Exception {
68- short validVersion = 3 ;
69- byte [] corruptBody = new byte [10000 ];
70- for (int i = 0 ; i < corruptBody .length ; i ++) {
71- corruptBody [i ] = (byte ) 0xFF ; // The corrupt body (0xFF ... 0xFF) makes Schema.read() throw SchemaException.
61+ public void testRequestWithCorruptBody (ClusterInstance clusterInstance ) throws Exception {
62+ RequestHeader header = IntegrationTestUtils .nextRequestHeader (ApiKeys .PRODUCE , ApiKeys .PRODUCE .latestVersion ());
63+ ByteBuffer bytes = RequestUtils .serialize (header .data (), header .headerVersion (), new ProduceRequestData (), header .apiVersion ());
64+ byte [] rawRequestBytes = bytes .array ();
65+
66+ // corrupt body but leave header valid
67+ for (int i = header .size (); i < rawRequestBytes .length ; i ++) {
68+ rawRequestBytes [i ] = (byte ) 0xFF ;
7269 }
73-
74- byte [] rawRequestBytes = buildRawRequest (
75- ApiKeys .PRODUCE .id ,
76- validVersion ,
77- /* correlationId */ 2 ,
78- /* clientId */ "test-corrupt-body" ,
79- corruptBody
80- );
81-
8270 sendAndAssert (clusterInstance , rawRequestBytes );
8371 }
8472
@@ -96,48 +84,8 @@ private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestByt
9684 assertEquals (initialMemoryPoolAvailable , finalMemoryPoolAvailable );
9785 }
9886
99- // This test uses reflection to read the SocketServer memoryPool availableMemory.
100- // The metric "MemoryPoolAvailable" from Yammer Metrics default registry
101- // can be overwritten in a @ClusterTest as the registry is a singleton.
102- long getMemoryPoolAvailable (ClusterInstance clusterInstance ) throws Exception {
103- KafkaBroker broker = clusterInstance .aliveBrokers ().values ().iterator ().next ();
104- SocketServer socketServer = broker .socketServer ();
105- Field memoryPoolField = socketServer .getClass ().getDeclaredField ("memoryPool" );
106- memoryPoolField .setAccessible (true );
107- MemoryPool memoryPool = (MemoryPool ) memoryPoolField .get (socketServer );
108- return memoryPool .availableMemory ();
109- }
110-
111- /**
112- * Builds a raw Kafka request excluding the frame length
113- *
114- * <p>Wire layout:
115- * <pre>
116- * 4 bytes – frame length (payload size, not including these 4 bytes)
117- *
118- * 2 bytes – api_key
119- * 2 bytes – api_version
120- * 4 bytes – correlation_id
121- * 2 bytes – client_id string length
122- * N bytes – client_id (UTF-8)
123- * X bytes - request body
124- * </pre>
125- */
126- private static byte [] buildRawRequest (short apiKey , short apiVersion , int correlationId , String clientId , byte [] body ) {
127- byte [] clientIdBytes = clientId .getBytes (StandardCharsets .UTF_8 );
128-
129- // Header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id
130- int headerSize = 2 + 2 + 4 + 2 + clientIdBytes .length ;
131- int payloadSize = headerSize + body .length ;
132-
133- ByteBuffer buf = ByteBuffer .allocate (payloadSize );
134- buf .putShort (apiKey ); // api_key
135- buf .putShort (apiVersion ); // api_version
136- buf .putInt (correlationId ); // correlation_id
137- buf .putShort ((short ) clientIdBytes .length ); // client_id string length
138- buf .put (clientIdBytes ); // client_id bytes
139- buf .put (body ); // request body (possibly empty / corrupt)
140- return buf .array ();
87+ private long getMemoryPoolAvailable (ClusterInstance clusterInstance ) throws Exception {
88+ return clusterInstance .brokers ().get (TestKitDefaults .BROKER_ID_OFFSET ).socketServer ().memoryPool ().availableMemory ();
14189 }
14290
14391 /*
0 commit comments