Skip to content

KAFKA-17601: Inter-broker connections do not expose their clientSoftwareName and clientSoftwareVersion tags #17731

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ public class RequestContext implements AuthorizableRequestContext {
public final KafkaPrincipal principal;
public final ListenerName listenerName;
public final SecurityProtocol securityProtocol;
public final ClientInformation clientInformation;
// The client information can be updated if the request is ApiVersionRequest,
// so the client information will not be unknown for ApiVersionRequest.
public ClientInformation clientInformation;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making this mutable and public seems dangerous. A few things to consider:

  1. Thread safety.
  2. Whether we want to allow it to be updated only once (if so, we'd want to have a method that enforces that).

public final boolean fromPrivilegedListener;
public final Optional<KafkaPrincipalSerde> principalSerde;

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,7 @@ private[kafka] class Processor(
channel.channelMetadataRegistry.registerClientInformation(new ClientInformation(
apiVersionsRequest.data.clientSoftwareName,
apiVersionsRequest.data.clientSoftwareVersion))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we update context#clientInformation directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is that context#clientInformation gets updated when the request is related to API_VERSIONS.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712, thanks for the suggestion. I updated code. Could you take a look again? Thanks.

context.clientInformation = channel.channelMetadataRegistry.clientInformation
}
}
requestChannel.sendRequest(req)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ class SocketServerTest {
sendRequest(plainSocket, apiVersionRequestBytes(clientId, version))
var receivedReq = receiveRequest(server.dataPlaneRequestChannel)

assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareName)
assertEquals(ClientInformation.UNKNOWN_NAME_OR_VERSION, receivedReq.context.clientInformation.softwareVersion)
assertEquals(expectedClientSoftwareName, receivedReq.context.clientInformation.softwareName)
assertEquals(expectedClientSoftwareVersion, receivedReq.context.clientInformation.softwareVersion)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment above these lines is wrong with this change. I assume there was a reason for the original behavior though - @dajac @rajinisivaram do you recall?


server.dataPlaneRequestChannel.sendNoOpResponse(receivedReq)

Expand Down