Skip to content

Commit a719ec2

Browse files
committed
KAFKA-17601: Inter-broker connections do not expose their clientSoftwareName and clientSoftwareVersion tags
Signed-off-by: PoAn Yang <[email protected]>
1 parent 4e3a3d3 commit a719ec2

File tree

5 files changed

+25
-16
lines changed

5 files changed

+25
-16
lines changed

Diff for: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java

+7-3
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,11 @@ public RequestContext(RequestHeader header,
109109
}
110110

111111
public RequestAndSize parseRequest(ByteBuffer buffer) {
112-
if (isUnsupportedApiVersionsRequest()) {
112+
return staticParseRequest(header, buffer, connectionId, listenerName, principal);
113+
}
114+
115+
public static RequestAndSize staticParseRequest(RequestHeader header, ByteBuffer buffer, String connectionId, ListenerName listenerName, KafkaPrincipal principal) {
116+
if (isUnsupportedApiVersionsRequest(header)) {
113117
// Unsupported ApiVersion requests are treated as v0 requests and are not parsed
114118
ApiVersionsRequest apiVersionsRequest = new ApiVersionsRequest(new ApiVersionsRequestData(), (short) 0, header.apiVersion());
115119
return new RequestAndSize(apiVersionsRequest, 0);
@@ -150,13 +154,13 @@ public ByteBuffer buildResponseEnvelopePayload(AbstractResponse body) {
150154
return body.serializeWithHeader(header.toResponseHeader(), apiVersion());
151155
}
152156

153-
private boolean isUnsupportedApiVersionsRequest() {
157+
private static boolean isUnsupportedApiVersionsRequest(RequestHeader header) {
154158
return header.apiKey() == API_VERSIONS && !API_VERSIONS.isVersionSupported(header.apiVersion());
155159
}
156160

157161
public short apiVersion() {
158162
// Use v0 when serializing an unhandled ApiVersion response
159-
if (isUnsupportedApiVersionsRequest())
163+
if (isUnsupportedApiVersionsRequest(header))
160164
return 0;
161165
return header.apiVersion();
162166
}

Diff for: core/src/main/scala/kafka/network/RequestChannel.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ object RequestChannel extends Logging {
6363
val memoryPool: MemoryPool,
6464
@volatile var buffer: ByteBuffer,
6565
metrics: RequestChannelMetrics,
66-
val envelope: Option[RequestChannel.Request] = None) extends BaseRequest {
66+
val envelope: Option[RequestChannel.Request] = None,
67+
val requestAndSize: Option[RequestAndSize] = None) extends BaseRequest {
6768
// These need to be volatile because the readers are in the network thread and the writers are in the request
6869
// handler threads or the purgatory threads
6970
@volatile var requestDequeueTimeNanos: Long = -1L
@@ -79,7 +80,7 @@ object RequestChannel extends Logging {
7980

8081
val session: Session = new Session(context.principal, context.clientAddress)
8182

82-
private val bodyAndSize: RequestAndSize = context.parseRequest(buffer)
83+
private val bodyAndSize: RequestAndSize = requestAndSize.getOrElse(context.parseRequest(buffer))
8384

8485
// This is constructed on creation of a Request so that the JSON representation is computed before the request is
8586
// processed by the api layer. Otherwise, a ProduceRequest can occur without its data (ie. it goes into purgatory).

Diff for: core/src/main/scala/kafka/network/SocketServer.scala

+13-9
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, CumulativeSum, Meter, Rate}
4040
import org.apache.kafka.common.network.KafkaChannel.ChannelMuteEvent
4141
import org.apache.kafka.common.network.{ChannelBuilder, ChannelBuilders, ClientInformation, KafkaChannel, ListenerName, ListenerReconfigurable, NetworkSend, Selectable, Send, ServerConnectionId, Selector => KSelector}
4242
import org.apache.kafka.common.protocol.ApiKeys
43-
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestContext, RequestHeader}
43+
import org.apache.kafka.common.requests.{ApiVersionsRequest, RequestAndSize, RequestContext, RequestHeader}
4444
import org.apache.kafka.common.security.auth.SecurityProtocol
4545
import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
4646
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
@@ -1133,23 +1133,27 @@ private[kafka] class Processor(
11331133
expiredConnectionsKilledCount.record(null, 1, 0)
11341134
} else {
11351135
val connectionId = receive.source
1136-
val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),
1137-
channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation,
1138-
isPrivilegedListener, channel.principalSerde)
1139-
1140-
val req = new RequestChannel.Request(processor = id, context = context,
1141-
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None)
1142-
1136+
var requestAndSize: Option[RequestAndSize] = None
11431137
// KIP-511: ApiVersionsRequest is intercepted here to catch the client software name
11441138
// and version. It is done here to avoid wiring things up to the api layer.
11451139
if (header.apiKey == ApiKeys.API_VERSIONS) {
1146-
val apiVersionsRequest = req.body[ApiVersionsRequest]
1140+
val result = RequestContext.staticParseRequest(header, receive.payload, connectionId, listenerName, channel.principal)
1141+
val apiVersionsRequest = result.request.asInstanceOf[ApiVersionsRequest]
11471142
if (apiVersionsRequest.isValid) {
11481143
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
11491144
apiVersionsRequest.data.clientSoftwareName,
11501145
apiVersionsRequest.data.clientSoftwareVersion))
11511146
}
1147+
requestAndSize = Some(result)
11521148
}
1149+
1150+
val context = new RequestContext(header, connectionId, channel.socketAddress, Optional.of(channel.socketPort()),
1151+
channel.principal, listenerName, securityProtocol, channel.channelMetadataRegistry.clientInformation,
1152+
isPrivilegedListener, channel.principalSerde)
1153+
1154+
val req = new RequestChannel.Request(processor = id, context = context,
1155+
startTimeNanos = nowNanos, memoryPool, receive.payload, requestChannel.metrics, None, requestAndSize)
1156+
11531157
requestChannel.sendRequest(req)
11541158
selector.mute(connectionId)
11551159
handleChannelMuteEvent(connectionId, ChannelMuteEvent.REQUEST_RECEIVED)

Diff for: core/src/test/java/kafka/server/handlers/DescribeTopicPartitionsRequestHandlerTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ private RequestChannel.Request buildRequest(AbstractRequest request,
527527
listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, false,
528528
Optional.of(kafkaPrincipalSerde));
529529
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, buffer,
530-
requestChannelMetrics, scala.Option.apply(null));
530+
requestChannelMetrics, scala.Option.apply(null), scala.Option.apply(null));
531531
}
532532

533533
KafkaConfig createKafkaDefaultConfig() {

Diff for: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/KRaftMetadataRequestBenchmark.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private RequestChannel.Request buildAllTopicMetadataRequest() {
226226
RequestContext context = new RequestContext(header, "1", null, principal,
227227
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
228228
SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false);
229-
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty());
229+
return new RequestChannel.Request(1, context, 0, MemoryPool.NONE, bodyBuffer, requestChannelMetrics, Option.empty(), Option.empty());
230230
}
231231

232232
@Benchmark

0 commit comments

Comments
 (0)