KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listener #19191

Open: wants to merge 15 commits into trunk
Changes from 11 commits
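In short, the public Endpoint#listenerName() accessor, which returns Optional<String>, is deprecated in favor of a new Endpoint#listener() accessor that returns the listener name as a plain String, and RaftVoterEndpoint#name() is deprecated in favor of RaftVoterEndpoint#listener(). A minimal migration sketch for callers (the endpoint values below are made up for illustration):

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class ListenerAccessorMigration {
    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093);

        // Before (deprecated since 4.1, removal planned for 5.0): Optional-wrapped accessor.
        String before = endpoint.listenerName().orElse("");

        // After: a plain String, which may be null for client-side endpoints without a listener name.
        String after = endpoint.listener() != null ? endpoint.listener() : "";

        System.out.println(before + " " + after);
    }
}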
@@ -4945,7 +4945,7 @@ AddRaftVoterRequest.Builder createRequest(int timeoutMs) {
new AddRaftVoterRequestData.ListenerCollection();
endpoints.forEach(endpoint ->
listeners.add(new AddRaftVoterRequestData.Listener().
setName(endpoint.name()).
setName(endpoint.listener()).
setHost(endpoint.host()).
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
@@ -26,7 +26,7 @@
*/
@InterfaceStability.Stable
public class RaftVoterEndpoint {
private final String name;
private final String listener;
private final String host;
private final int port;

@@ -49,22 +49,33 @@ static String requireNonNullAllCapsNonEmpty(String input) {
/**
* Create an endpoint for a metadata quorum voter.
*
* @param name The human-readable name for this endpoint. For example, CONTROLLER.
* @param listener The human-readable name for this endpoint. For example, CONTROLLER.
* @param host The DNS hostname for this endpoint.
* @param port The network port for this endpoint.
*/
public RaftVoterEndpoint(
String name,
String listener,
String host,
int port
) {
this.name = requireNonNullAllCapsNonEmpty(name);
this.listener = requireNonNullAllCapsNonEmpty(listener);
this.host = Objects.requireNonNull(host);
this.port = port;
}

/**
* The listener name for this endpoint.
*/
public String listener() {
return listener;
}

/**
* @deprecated Since 4.1. Use {@link #listener()} instead. This function will be removed in 5.0.
*/
@Deprecated(since = "4.1", forRemoval = true)
public String name() {
return name;
return listener;
}

public String host() {
@@ -79,20 +90,20 @@ public int port() {
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) return false;
RaftVoterEndpoint other = (RaftVoterEndpoint) o;
return name.equals(other.name) &&
return listener.equals(other.listener) &&
host.equals(other.host) &&
port == other.port;
}

@Override
public int hashCode() {
return Objects.hash(name, host, port);
return Objects.hash(listener, host, port);
}

@Override
public String toString() {
// enclose IPv6 hosts in square brackets for readability
String hostString = host.contains(":") ? "[" + host + "]" : host;
return name + "://" + hostString + ":" + port;
return listener + "://" + hostString + ":" + port;
}
}
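A short usage sketch of the renamed accessor on RaftVoterEndpoint, assuming the class keeps its admin-client package; the host and port are placeholder values:

import org.apache.kafka.clients.admin.RaftVoterEndpoint;

public class RaftVoterEndpointExample {
    public static void main(String[] args) {
        RaftVoterEndpoint endpoint = new RaftVoterEndpoint("CONTROLLER", "controller-1.example.com", 9093);

        System.out.println(endpoint.listener()); // CONTROLLER (preferred accessor)
        System.out.println(endpoint.name());     // CONTROLLER (deprecated, slated for removal in 5.0)
        System.out.println(endpoint);            // CONTROLLER://controller-1.example.com:9093
    }
}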
25 changes: 16 additions & 9 deletions clients/src/main/java/org/apache/kafka/common/Endpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

import java.util.Objects;
@@ -26,27 +25,35 @@
* Represents a broker endpoint.
*/

@InterfaceStability.Evolving
public class Endpoint {

private final String listenerName;
private final String listener;
private final SecurityProtocol securityProtocol;
private final String host;
private final int port;

public Endpoint(String listenerName, SecurityProtocol securityProtocol, String host, int port) {
this.listenerName = listenerName;
public Endpoint(String listener, SecurityProtocol securityProtocol, String host, int port) {
this.listener = listener;
this.securityProtocol = securityProtocol;
this.host = host;
this.port = port;
}

/**
* Returns the listener name of this endpoint.
*/
public String listener() {
return listener;
}

/**
* Returns the listener name of this endpoint. This is non-empty for endpoints provided
* to broker plugins, but may be empty when used in clients.
* @deprecated Since 4.1. Use {@link #listener()} instead. This function will be removed in 5.0.
*/
@Deprecated(since = "4.1", forRemoval = true)
public Optional<String> listenerName() {
return Optional.ofNullable(listenerName);
return Optional.ofNullable(listener);
}

/**
@@ -80,7 +87,7 @@ public boolean equals(Object o) {
}

Endpoint that = (Endpoint) o;
return Objects.equals(this.listenerName, that.listenerName) &&
return Objects.equals(this.listener, that.listener) &&
Objects.equals(this.securityProtocol, that.securityProtocol) &&
Objects.equals(this.host, that.host) &&
this.port == that.port;
@@ -89,13 +96,13 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(listenerName, securityProtocol, host, port);
return Objects.hash(listener, securityProtocol, host, port);
}

@Override
public String toString() {
return "Endpoint(" +
"listenerName='" + listenerName + '\'' +
"listenerName='" + listener + '\'' +
", securityProtocol=" + securityProtocol +
", host='" + host + '\'' +
", port=" + port +
67 changes: 34 additions & 33 deletions core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,7 +29,6 @@ import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, apiVersionManager.newRequestMetrics)

private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
@@ -161,8 +160,8 @@
* Therefore, we do not know that any particular request processor will be running by the end of
* this function -- just that it might be running.
*
* @param authorizerFutures Future per [[EndPoint]] used to wait before starting the
* processor corresponding to the [[EndPoint]]. Any endpoint
* @param authorizerFutures Future per [[Endpoint]] used to wait before starting the
* processor corresponding to the [[Endpoint]]. Any endpoint
* that does not appear in this map will be started once all
* authorizerFutures are complete.
*
@@ -181,7 +180,7 @@
// Because of ephemeral ports, we need to match acceptors to futures by looking at
// the listener name, rather than the endpoint object.
val authorizerFuture = authorizerFutures.find {
case (endpoint, _) => acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
case (endpoint, _) => acceptor.endPoint.listener.equals(endpoint.listener())
} match {
case None => allAuthorizerFuturesComplete
case Some((_, future)) => future
@@ -210,23 +209,24 @@
enableFuture
}

private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
private def createDataPlaneAcceptorAndProcessors(endpoint: Endpoint): Unit = synchronized {
if (stopped) {
throw new RuntimeException("Can't create new data plane acceptor and processors: SocketServer is stopped.")
}
val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
connectionQuotas.addListener(config, endpoint.listenerName)
val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
val listenerName = ListenerName.normalised(endpoint.listener)
val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
connectionQuotas.addListener(config, listenerName)
val isPrivilegedListener = config.interBrokerListenerName == listenerName
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
info(s"Created data-plane acceptor and processors for endpoint : ${listenerName}")
}

private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
private def endpoints = config.listeners.map(l => ListenerName.normalised(l.listener) -> l).toMap

protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
protected def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}

@@ -277,7 +277,7 @@ class SocketServer(
/**
* This method is called to dynamically add listeners.
*/
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
if (stopped) {
throw new RuntimeException("can't add new listeners: SocketServer is stopped.")
}
@@ -297,10 +297,10 @@
}
}

def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
def removeListeners(listenersRemoved: Seq[Endpoint]): Unit = synchronized {
info(s"Removing data-plane listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
connectionQuotas.removeListener(config, endpoint.listenerName)
connectionQuotas.removeListener(config, ListenerName.normalised(endpoint.listener))
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
acceptor.beginShutdown()
acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
// For test usage
def dataPlaneAcceptor(listenerName: String): Option[DataPlaneAcceptor] = {
dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
if (endPoint.listenerName.value() == listenerName)
if (endPoint.listener == listenerName)
return Some(acceptor)
}
None
@@ -374,7 +374,7 @@ object DataPlaneAcceptor {
}

class DataPlaneAcceptor(socketServer: SocketServer,
endPoint: EndPoint,
endPoint: Endpoint,
config: KafkaConfig,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
@@ -404,7 +404,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Returns the listener name associated with this reconfigurable. Listener-specific
* configs corresponding to this listener name are provided for reconfiguration.
*/
override def listenerName(): ListenerName = endPoint.listenerName
override def listenerName(): ListenerName = ListenerName.normalised(endPoint.listener)

/**
* Returns the names of configs that may be reconfigured.
@@ -451,7 +451,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
val newNumNetworkThreads = configs.get(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int]

if (newNumNetworkThreads != processors.length) {
info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads")
info(s"Resizing network thread pool size for ${endPoint.listener} listener from ${processors.length} to $newNumNetworkThreads")
if (newNumNetworkThreads > processors.length) {
addProcessors(newNumNetworkThreads - processors.length)
} else if (newNumNetworkThreads < processors.length) {
@@ -472,7 +472,7 @@
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] abstract class Acceptor(val socketServer: SocketServer,
val endPoint: EndPoint,
val endPoint: Endpoint,
var config: KafkaConfig,
nodeId: Int,
val connectionQuotas: ConnectionQuotas,
@@ -515,15 +515,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
Map(ListenerMetricTag -> endPoint.listener).asJava)
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
private val started = new AtomicBoolean()
private[network] val startedFuture = new CompletableFuture[Void]()

val thread: KafkaThread = KafkaThread.nonDaemon(
s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
s"data-plane-kafka-socket-acceptor-${endPoint.listener}-${endPoint.securityProtocol}-${endPoint.port}",
this)

def start(): Unit = synchronized {
@@ -535,19 +535,19 @@
serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
}
debug(s"Starting processors for listener ${endPoint.listenerName}")
debug(s"Starting processors for listener ${endPoint.listener}")
processors.foreach(_.start())
debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
debug(s"Starting acceptor thread for listener ${endPoint.listener}")
thread.start()
startedFuture.complete(null)
started.set(true)
} catch {
case e: ClosedChannelException =>
debug(s"Refusing to start acceptor for ${endPoint.listenerName} since the acceptor has already been shut down.")
debug(s"Refusing to start acceptor for ${endPoint.listener} since the acceptor has already been shut down.")
startedFuture.completeExceptionally(e)
case t: Throwable =>
error(s"Unable to start acceptor for ${endPoint.listenerName}", t)
startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listenerName}", t))
error(s"Unable to start acceptor for ${endPoint.listener}", t)
startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listener}", t))
}
}

@@ -628,7 +628,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
new InetSocketAddress(host, port)
}
val serverChannel = socketServer.socketFactory.openServerSocket(
endPoint.listenerName.value(),
endPoint.listener,
socketAddress,
listenBacklogSize,
recvBufferSize)
@@ -682,14 +682,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
val listenerName = ListenerName.normalised(endPoint.listener)
try {
connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
configureAcceptedSocketChannel(socketChannel)
Some(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
case e: ConnectionThrottledException =>
val ip = socketChannel.socket.getInetAddress
Expand All @@ -699,7 +700,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
None
case e: IOException =>
error(s"Encountered an error while configuring the connection, closing it.", e)
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
}
}
@@ -741,7 +742,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
def wakeup(): Unit = nioSelector.wakeup()

def addProcessors(toCreate: Int): Unit = synchronized {
val listenerName = endPoint.listenerName
val listenerName = ListenerName.normalised(endPoint.listener)
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()

@@ -761,7 +762,7 @@
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
connectionDisconnectListeners: Seq[ConnectionDisconnectListener]): Processor = {
val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listener}-${endPoint.securityProtocol}-$id"
new Processor(id,
time,
config.socketRequestMaxBytes,
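As a closing note, server-side call sites that previously read a ListenerName straight off the endpoint now derive it from the plain String, which is the ListenerName.normalised pattern used throughout the SocketServer changes above. A small sketch of that pattern in isolation, with an assumed listener value:

import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;

public class NormalisedListenerExample {
    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint("internal", SecurityProtocol.PLAINTEXT, "broker-1.example.com", 9092);

        // Endpoint#listener() now returns the raw String; wrap it where a ListenerName is required.
        ListenerName listenerName = ListenerName.normalised(endpoint.listener());

        System.out.println(listenerName.value());        // INTERNAL (normalised to upper case)
        System.out.println(listenerName.configPrefix()); // listener.name.internal.
    }
}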