Skip to content

Commit b9882a0

Browse files
authored
KAFKA-20302: Receive buffers allocated from MemoryPool may not be released if request is invalid (#21740)
If an exception is thrown within kafka.network.Processor#processCompletedReceives, close the receive (returning its buffer to the memory pool) if the buffer has not been returned already. The buffer may already have been returned when the RequestChannel.Request was created successfully, if the API did not require DelayedAllocation.
1 parent 94b0f3e commit b9882a0

2 files changed

Lines changed: 177 additions & 2 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.producer;
18+
19+
import kafka.network.SocketServer;
20+
import kafka.server.KafkaBroker;
21+
22+
import org.apache.kafka.common.memory.MemoryPool;
23+
import org.apache.kafka.common.protocol.ApiKeys;
24+
import org.apache.kafka.common.test.ClusterInstance;
25+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
26+
import org.apache.kafka.common.test.api.ClusterTest;
27+
import org.apache.kafka.common.test.api.ClusterTestDefaults;
28+
import org.apache.kafka.common.test.api.Type;
29+
import org.apache.kafka.network.SocketServerConfigs;
30+
import org.apache.kafka.server.IntegrationTestUtils;
31+
32+
import java.io.EOFException;
33+
import java.io.InputStream;
34+
import java.lang.reflect.Field;
35+
import java.net.Socket;
36+
import java.net.SocketTimeoutException;
37+
import java.nio.ByteBuffer;
38+
import java.nio.charset.StandardCharsets;
39+
40+
import static org.junit.jupiter.api.Assertions.assertEquals;
41+
import static org.junit.jupiter.api.Assertions.assertTrue;
42+
43+
/**
44+
* Verifies that the (bounded) memory pool releases the memory also after
45+
* invalid and unsupported protocol requests
46+
*/
47+
@ClusterTestDefaults(types = {Type.CO_KRAFT}, serverProperties = {
48+
@ClusterConfigProperty(key = SocketServerConfigs.SOCKET_REQUEST_MAX_BYTES_CONFIG, value = "50000"),
49+
@ClusterConfigProperty(key = SocketServerConfigs.QUEUED_MAX_BYTES_CONFIG, value = "100000"),
50+
})
51+
public class SocketServerMemoryPoolTest {
52+
@ClusterTest
53+
public void testProduceRequestWithUnsupportedVersion(ClusterInstance clusterInstance) throws Exception {
54+
short unsupportedVersion = Short.MAX_VALUE;
55+
byte[] rawRequestBytes = buildRawRequest(
56+
ApiKeys.PRODUCE.id,
57+
unsupportedVersion,
58+
/* correlationId */ 1,
59+
/* clientId */ "test-unsupported-version",
60+
new byte[10000]
61+
);
62+
63+
sendAndAssert(clusterInstance, rawRequestBytes);
64+
}
65+
66+
@ClusterTest
67+
public void testProduceRequestWithCorruptBody(ClusterInstance clusterInstance) throws Exception {
68+
short validVersion = 3;
69+
byte[] corruptBody = new byte[10000];
70+
for (int i = 0; i < corruptBody.length; i++) {
71+
corruptBody[i] = (byte) 0xFF; // The corrupt body (0xFF ... 0xFF) makes Schema.read() throw SchemaException.
72+
}
73+
74+
byte[] rawRequestBytes = buildRawRequest(
75+
ApiKeys.PRODUCE.id,
76+
validVersion,
77+
/* correlationId */ 2,
78+
/* clientId */ "test-corrupt-body",
79+
corruptBody
80+
);
81+
82+
sendAndAssert(clusterInstance, rawRequestBytes);
83+
}
84+
85+
private void sendAndAssert(ClusterInstance clusterInstance, byte[] rawRequestBytes) throws Exception {
86+
long initialMemoryPoolAvailable = getMemoryPoolAvailable(clusterInstance);
87+
88+
try (Socket socket = IntegrationTestUtils.connect(clusterInstance.brokerBoundPorts().get(0))) {
89+
socket.setSoTimeout(/* readTimeoutMs */ 5_000);
90+
IntegrationTestUtils.sendRequest(socket, rawRequestBytes);
91+
assertTrue(drainUntilClosed(socket.getInputStream()), "expected connection closed");
92+
}
93+
94+
long finalMemoryPoolAvailable = getMemoryPoolAvailable(clusterInstance);
95+
assertEquals(100000, initialMemoryPoolAvailable);
96+
assertEquals(initialMemoryPoolAvailable, finalMemoryPoolAvailable);
97+
}
98+
99+
// This test uses reflection to read the SocketServer memoryPool availableMemory.
100+
// The metric "MemoryPoolAvailable" from Yammer Metrics default registry
101+
// can be overwritten in a @ClusterTest as the registry is a singleton.
102+
long getMemoryPoolAvailable(ClusterInstance clusterInstance) throws Exception {
103+
KafkaBroker broker = clusterInstance.aliveBrokers().values().iterator().next();
104+
SocketServer socketServer = broker.socketServer();
105+
Field memoryPoolField = socketServer.getClass().getDeclaredField("memoryPool");
106+
memoryPoolField.setAccessible(true);
107+
MemoryPool memoryPool = (MemoryPool) memoryPoolField.get(socketServer);
108+
return memoryPool.availableMemory();
109+
}
110+
111+
/**
112+
* Builds a raw Kafka request excluding the frame length
113+
*
114+
* <p>Wire layout:
115+
* <pre>
116+
* 4 bytes – frame length (payload size, not including these 4 bytes)
117+
*
118+
* 2 bytes – api_key
119+
* 2 bytes – api_version
120+
* 4 bytes – correlation_id
121+
* 2 bytes – client_id string length
122+
* N bytes – client_id (UTF-8)
123+
* X bytes - request body
124+
* </pre>
125+
*/
126+
private static byte[] buildRawRequest(short apiKey, short apiVersion, int correlationId, String clientId, byte[] body) {
127+
byte[] clientIdBytes = clientId.getBytes(StandardCharsets.UTF_8);
128+
129+
// Header: api_key(2) + api_version(2) + correlation_id(4) + client_id_len(2) + client_id
130+
int headerSize = 2 + 2 + 4 + 2 + clientIdBytes.length;
131+
int payloadSize = headerSize + body.length;
132+
133+
ByteBuffer buf = ByteBuffer.allocate(payloadSize);
134+
buf.putShort(apiKey); // api_key
135+
buf.putShort(apiVersion); // api_version
136+
buf.putInt(correlationId); // correlation_id
137+
buf.putShort((short) clientIdBytes.length); // client_id string length
138+
buf.put(clientIdBytes); // client_id bytes
139+
buf.put(body); // request body (possibly empty / corrupt)
140+
return buf.array();
141+
}
142+
143+
/*
144+
* Reads and discards bytes until the stream ends or times out.
145+
*
146+
* @return true if the remote end closed the connection (EOF or connection-reset),
147+
* false if the socket timeout expired before closure.
148+
*/
149+
private static boolean drainUntilClosed(InputStream in) {
150+
try {
151+
while (true) {
152+
if (in.read() == -1) {
153+
// Clean EOF – broker closed its side of the connection.
154+
return true;
155+
}
156+
// Some broker versions send a partial error response before closing; keep draining.
157+
}
158+
} catch (EOFException e) {
159+
return true;
160+
} catch (SocketTimeoutException e) {
161+
// SO_TIMEOUT fired before EOF – broker did not close within the allotted time.
162+
return false;
163+
} catch (Exception e) {
164+
// Any other I/O error (e.g., "Connection reset by peer") means the broker
165+
// unilaterally terminated the connection, which is the expected outcome.
166+
return true;
167+
}
168+
}
169+
}

core/src/main/scala/kafka/network/SocketServer.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,10 +1001,12 @@ private[kafka] class Processor(
10011001

10021002
private def processCompletedReceives(): Unit = {
10031003
selector.completedReceives.forEach { receive =>
1004+
var header: RequestHeader = null
1005+
var req: RequestChannel.Request = null
10041006
try {
10051007
openOrClosingChannel(receive.source) match {
10061008
case Some(channel) =>
1007-
val header = parseRequestHeader(apiVersionManager, receive.payload)
1009+
header = parseRequestHeader(apiVersionManager, receive.payload)
10081010
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
10091011
() => time.nanoseconds()))
10101012
trace(s"Begin re-authentication: $channel")
@@ -1014,14 +1016,15 @@ private[kafka] class Processor(
10141016
// be sure to decrease connection count and drop any in-flight responses
10151017
debug(s"Disconnecting expired channel: $channel : $header")
10161018
close(channel.id)
1019+
receive.close() // return buffer to memory pool
10171020
expiredConnectionsKilledCount.record(null, 1, 0)
10181021
} else {
10191022
val connectionId = receive.source
10201023
val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),
10211024
channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation,
10221025
isPrivilegedListener, channel.principalSerde)
10231026

1024-
val req = new RequestChannel.Request(processor = id, context = context,
1027+
req = new RequestChannel.Request(processor = id, context = context,
10251028
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
10261029

10271030
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
@@ -1047,6 +1050,9 @@ private[kafka] class Processor(
10471050
// note that even though we got an exception, we can assume that receive.source is valid.
10481051
// Issues with constructing a valid receive object were handled earlier
10491052
case e: Throwable =>
1053+
if (header == null || req == null) {
1054+
receive.close() // return buffer to memory pool
1055+
}
10501056
processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
10511057
}
10521058
}

0 commit comments

Comments
 (0)