Skip to content

KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listenerName #19191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions clients/src/main/java/org/apache/kafka/common/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,30 @@
*/
package org.apache.kafka.common;

import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;

import java.util.Locale;
import java.util.Objects;
import java.util.Optional;

/**
* Represents a broker endpoint.
*/

@InterfaceStability.Evolving
public class Endpoint {

private final String listenerName;
private final SecurityProtocol securityProtocol;
private final String host;
private final int port;

public static String parseListenerName(String connectionString) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems only KafkaConfig uses this helper. Could you please move it to the KafkaConfig instead of leaving it in this public APIs?

int firstColon = connectionString.indexOf(':');
if (firstColon < 0) {
throw new KafkaException("Unable to parse a listener name from " + connectionString);
}
return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT);
}

public Endpoint(String listenerName, SecurityProtocol securityProtocol, String host, int port) {
this.listenerName = listenerName;
this.securityProtocol = securityProtocol;
Expand All @@ -45,8 +51,8 @@ public Endpoint(String listenerName, SecurityProtocol securityProtocol, String h
* Returns the listener name of this endpoint. This is non-empty for endpoints provided
* to broker plugins, but may be empty when used in clients.
*/
public Optional<String> listenerName() {
return Optional.ofNullable(listenerName);
public String listenerName() {
return listenerName;
}

/**
Expand Down
51 changes: 26 additions & 25 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
Expand Down Expand Up @@ -96,7 +95,7 @@ class SocketServer(
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)

private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
Expand Down Expand Up @@ -161,8 +160,8 @@ class SocketServer(
* Therefore, we do not know that any particular request processor will be running by the end of
* this function -- just that it might be running.
*
* @param authorizerFutures Future per [[EndPoint]] used to wait before starting the
* processor corresponding to the [[EndPoint]]. Any endpoint
* @param authorizerFutures Future per [[Endpoint]] used to wait before starting the
* processor corresponding to the [[Endpoint]]. Any endpoint
* that does not appear in this map will be started once all
* authorizerFutures are complete.
*
Expand All @@ -181,7 +180,7 @@ class SocketServer(
// Because of ephemeral ports, we need to match acceptors to futures by looking at
// the listener name, rather than the endpoint object.
val authorizerFuture = authorizerFutures.find {
case (endpoint, _) => acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
case (endpoint, _) => acceptor.endPoint.listenerName.equals(endpoint.listenerName())
} match {
case None => allAuthorizerFuturesComplete
case Some((_, future)) => future
Expand Down Expand Up @@ -210,23 +209,24 @@ class SocketServer(
enableFuture
}

/**
 * Creates the data-plane acceptor (and its processors) for the given endpoint,
 * registers it with the connection quotas and for dynamic reconfiguration,
 * and records it in [[dataPlaneAcceptors]].
 *
 * @param endpoint the endpoint to start accepting connections on
 * @throws RuntimeException if the SocketServer has already been stopped
 */
private def createDataPlaneAcceptorAndProcessors(endpoint: Endpoint): Unit = synchronized {
  if (stopped) {
    throw new RuntimeException("Can't create new data plane acceptor and processors: SocketServer is stopped.")
  }
  // Endpoint#listenerName is now a raw String; normalise it once and reuse it
  // for config-prefix lookup, quota registration and the privileged check.
  val listenerName = ListenerName.normalised(endpoint.listenerName)
  val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
  connectionQuotas.addListener(config, listenerName)
  // A listener is privileged iff it is the configured inter-broker listener.
  val isPrivilegedListener = config.interBrokerListenerName == listenerName
  val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
  config.addReconfigurable(dataPlaneAcceptor)
  dataPlaneAcceptor.configure(parsedConfigs)
  dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
  info(s"Created data-plane acceptor and processors for endpoint : $listenerName")
}

// Maps each configured listener's normalised name to its endpoint definition.
private def endpoints = config.listeners.map(l => ListenerName.normalised(l.listenerName) -> l).toMap

/**
 * Builds a [[DataPlaneAcceptor]] for the given endpoint.
 * NOTE(review): `protected` suggests subclasses/tests override this to inject
 * a custom acceptor — confirm against callers outside this view.
 */
protected def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
  new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}

Expand Down Expand Up @@ -277,7 +277,7 @@ class SocketServer(
/**
* This method is called to dynamically add listeners.
*/
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
if (stopped) {
throw new RuntimeException("can't add new listeners: SocketServer is stopped.")
}
Expand All @@ -297,10 +297,10 @@ class SocketServer(
}
}

def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
def removeListeners(listenersRemoved: Seq[Endpoint]): Unit = synchronized {
info(s"Removing data-plane listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
connectionQuotas.removeListener(config, endpoint.listenerName)
connectionQuotas.removeListener(config, ListenerName.normalised(endpoint.listenerName))
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
acceptor.beginShutdown()
acceptor.close()
Expand Down Expand Up @@ -345,7 +345,7 @@ class SocketServer(
// For test usage
def dataPlaneAcceptor(listenerName: String): Option[DataPlaneAcceptor] = {
dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
if (endPoint.listenerName.value() == listenerName)
if (endPoint.listenerName == listenerName)
return Some(acceptor)
}
None
Expand Down Expand Up @@ -376,7 +376,7 @@ object DataPlaneAcceptor {
}

class DataPlaneAcceptor(socketServer: SocketServer,
endPoint: EndPoint,
endPoint: Endpoint,
config: KafkaConfig,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
Expand Down Expand Up @@ -409,7 +409,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Returns the listener name associated with this reconfigurable. Listener-specific
* configs corresponding to this listener name are provided for reconfiguration.
*/
// Endpoint#listenerName is a raw String, so normalise it into a ListenerName
// before handing it to the reconfiguration machinery.
override def listenerName(): ListenerName = ListenerName.normalised(endPoint.listenerName)

/**
* Returns the names of configs that may be reconfigured.
Expand Down Expand Up @@ -477,7 +477,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
private[kafka] abstract class Acceptor(val socketServer: SocketServer,
val endPoint: EndPoint,
val endPoint: Endpoint,
var config: KafkaConfig,
nodeId: Int,
val connectionQuotas: ConnectionQuotas,
Expand Down Expand Up @@ -523,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
s"${metricPrefix()}AcceptorBlockedPercent",
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
Map(ListenerMetricTag -> endPoint.listenerName).asJava)
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
Expand Down Expand Up @@ -636,7 +636,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
new InetSocketAddress(host, port)
}
val serverChannel = socketServer.socketFactory.openServerSocket(
endPoint.listenerName.value(),
endPoint.listenerName,
socketAddress,
listenBacklogSize,
recvBufferSize)
Expand Down Expand Up @@ -690,14 +690,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
val listenerName = ListenerName.normalised(endPoint.listenerName)
try {
connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
configureAcceptedSocketChannel(socketChannel)
Some(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
case e: ConnectionThrottledException =>
val ip = socketChannel.socket.getInetAddress
Expand All @@ -707,7 +708,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
None
case e: IOException =>
error(s"Encountered an error while configuring the connection, closing it.", e)
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
}
}
Expand Down Expand Up @@ -749,7 +750,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
def wakeup(): Unit = nioSelector.wakeup()

def addProcessors(toCreate: Int): Unit = synchronized {
val listenerName = endPoint.listenerName
val listenerName = ListenerName.normalised(endPoint.listenerName)
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
config.effectiveAdvertisedBrokerListeners.asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

Expand Down Expand Up @@ -478,8 +478,7 @@ class BrokerServer(
if (listenerName != null) {
val endpoint = listenerInfo.listeners().values().stream
.filter(e =>
e.listenerName().isPresent &&
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
ListenerName.normalised(e.listenerName()).equals(ListenerName.normalised(listenerName))
)
.findFirst()
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class ControllerServer(
sharedServer.socketFactory)

val listenerInfo = ListenerInfo
.create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
.create(config.effectiveAdvertisedControllerListeners.asJava)
.withWildcardHostnamesResolved()
.withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
Expand Down Expand Up @@ -885,9 +885,9 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi

def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config
val newListeners = newConfig.listeners.map(_.listenerName).toSet
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
val newListeners = newConfig.listeners.map(l => ListenerName.normalised(l.listenerName)).toSet
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listenerName)).toSet
val oldListeners = oldConfig.listeners.map(l => ListenerName.normalised(l.listenerName)).toSet
if (!oldAdvertisedListeners.subsetOf(newListeners))
throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
Expand All @@ -912,17 +912,17 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
val newListenerMap = listenersToMap(newListeners)
val oldListeners = oldConfig.listeners
val oldListenerMap = listenersToMap(oldListeners)
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(ListenerName.normalised(e.listenerName)))
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(ListenerName.normalised(e.listenerName)))
if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
LoginManager.closeAll() // Clear SASL login cache to force re-login
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
}
}

// Indexes endpoints by their normalised listener name so added/removed
// listeners can be detected by name rather than by raw String comparison.
private def listenersToMap(listeners: Seq[Endpoint]): Map[ListenerName, Endpoint] =
  listeners.map(e => (ListenerName.normalised(e.listenerName), e)).toMap

}

Expand Down
Loading