diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java index a13bcf103b922..acfc6d05bacde 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java @@ -31,6 +31,7 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS; @@ -42,7 +43,8 @@ public class RequestContext implements AuthorizableRequestContext { public final KafkaPrincipal principal; public final ListenerName listenerName; public final SecurityProtocol securityProtocol; - public final ClientInformation clientInformation; + private ClientInformation clientInformation; + private final AtomicBoolean hasUpdatedClientInformation = new AtomicBoolean(false); public final boolean fromPrivilegedListener; public final Optional principalSerde; @@ -166,6 +168,22 @@ public String connectionId() { return connectionId; } + public void setClientInformation(ClientInformation clientInformation) { + // We only allow updating the client information once, and only if the + // current client information is UNKNOWN. This ensures that a malicious + // client cannot override a well-known client information with an + // arbitrary one. + if (hasUpdatedClientInformation.compareAndSet(false, true)) { + if (this.clientInformation == ClientInformation.EMPTY) { + this.clientInformation = clientInformation; + } + } + } + + public ClientInformation clientInformation() { + return clientInformation; + } + @Override public String listenerName() { return listenerName.value(); diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 306b633f6fa37..7a45b89390132 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1050,6 +1050,8 @@ private[kafka] class Processor( channel.channelMetadataRegistry.registerClientInformation(new ClientInformation( apiVersionsRequest.data.clientSoftwareName, apiVersionsRequest.data.clientSoftwareVersion)) + // Update client information for ApiVersionRequest, so the client information will not be unknown for ApiVersionRequest. + context.setClientInformation(channel.channelMetadataRegistry.clientInformation) } } requestChannel.sendRequest(req) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6a4b8d8ca672e..7709e852206fb 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -271,12 +271,12 @@ class SocketServerTest { val address = plainSocket.getLocalAddress val clientId = "clientId" - // Send ApiVersionsRequest - unknown expected + // Send ApiVersionsRequest - if version < 3, unknown expected. Otherwise, client info expected. sendRequest(plainSocket, apiVersionRequestBytes(clientId, version)) var receivedReq = receiveRequest(server.dataPlaneRequestChannel) - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareName) - assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareVersion) + assertEquals(expectedClientSoftwareName, receivedReq.context.clientInformation.softwareName) + assertEquals(expectedClientSoftwareVersion, receivedReq.context.clientInformation.softwareVersion) server.dataPlaneRequestChannel.sendNoOpResponse(receivedReq) diff --git a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java index 9af49f27d66cb..c48780f664e60 100644 --- a/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java +++ b/server/src/main/java/org/apache/kafka/network/RequestConvertToJson.java @@ -796,7 +796,7 @@ public static JsonNode requestDescMetrics(RequestHeader header, Optional 0) { node.set("temporaryMemoryBytes", new LongNode(temporaryMemoryBytes)); } diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java index 58f7c4f1a5b0d..9c94d150a8cad 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstanceMetadata.java @@ -39,10 +39,10 @@ public ClientMetricsInstanceMetadata(Uuid clientInstanceId, RequestContext reque attributesMap.put(ClientMetricsConfigs.CLIENT_INSTANCE_ID, clientInstanceId.toString()); attributesMap.put(ClientMetricsConfigs.CLIENT_ID, requestContext.clientId()); - attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation != null ? - requestContext.clientInformation.softwareName() : null); - attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation != null ? - requestContext.clientInformation.softwareVersion() : null); + attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_NAME, requestContext.clientInformation() != null ? + requestContext.clientInformation().softwareName() : null); + attributesMap.put(ClientMetricsConfigs.CLIENT_SOFTWARE_VERSION, requestContext.clientInformation() != null ? + requestContext.clientInformation().softwareVersion() : null); attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_ADDRESS, requestContext.clientAddress != null ? requestContext.clientAddress.getHostAddress() : null); attributesMap.put(ClientMetricsConfigs.CLIENT_SOURCE_PORT, requestContext.clientPort.map(String::valueOf).orElse(null));