Skip to content

Commit 5a57f0b

Browse files
committed
KAFKA-18760: Deprecate Optional<String> and return String from public EndPoint#listenerName
Signed-off-by: PoAn Yang <[email protected]>
1 parent 759fbbb commit 5a57f0b

File tree

28 files changed

+136
-178
lines changed

28 files changed

+136
-178
lines changed

Diff for: clients/src/main/java/org/apache/kafka/common/Endpoint.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,30 @@
1616
*/
1717
package org.apache.kafka.common;
1818

19-
import org.apache.kafka.common.annotation.InterfaceStability;
2019
import org.apache.kafka.common.security.auth.SecurityProtocol;
2120

21+
import java.util.Locale;
2222
import java.util.Objects;
23-
import java.util.Optional;
2423

2524
/**
2625
* Represents a broker endpoint.
2726
*/
2827

29-
@InterfaceStability.Evolving
3028
public class Endpoint {
3129

3230
private final String listenerName;
3331
private final SecurityProtocol securityProtocol;
3432
private final String host;
3533
private final int port;
3634

35+
public static String parseListenerName(String connectionString) {
36+
int firstColon = connectionString.indexOf(':');
37+
if (firstColon < 0) {
38+
throw new KafkaException("Unable to parse a listener name from " + connectionString);
39+
}
40+
return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT);
41+
}
42+
3743
public Endpoint(String listenerName, SecurityProtocol securityProtocol, String host, int port) {
3844
this.listenerName = listenerName;
3945
this.securityProtocol = securityProtocol;
@@ -45,8 +51,8 @@ public Endpoint(String listenerName, SecurityProtocol securityProtocol, String h
4551
* Returns the listener name of this endpoint. This is non-empty for endpoints provided
4652
* to broker plugins, but may be empty when used in clients.
4753
*/
48-
public Optional<String> listenerName() {
49-
return Optional.ofNullable(listenerName);
54+
public String listenerName() {
55+
return listenerName;
5056
}
5157

5258
/**

Diff for: core/src/main/scala/kafka/network/SocketServer.scala

+26-25
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import kafka.network.Processor._
2929
import kafka.network.RequestChannel.{CloseConnectionResponse, EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
3030
import kafka.network.SocketServer._
3131
import kafka.server.{ApiVersionManager, BrokerReconfigurable, KafkaConfig}
32-
import org.apache.kafka.network.EndPoint
3332
import org.apache.kafka.common.message.ApiMessageType.ListenerType
3433
import kafka.utils._
3534
import org.apache.kafka.common.config.ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
9695
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
9796
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
9897
// data-plane
99-
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
98+
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
10099
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
101100

102101
private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
@@ -161,8 +160,8 @@ class SocketServer(
161160
* Therefore, we do not know that any particular request processor will be running by the end of
162161
* this function -- just that it might be running.
163162
*
164-
* @param authorizerFutures Future per [[EndPoint]] used to wait before starting the
165-
* processor corresponding to the [[EndPoint]]. Any endpoint
163+
* @param authorizerFutures Future per [[Endpoint]] used to wait before starting the
164+
* processor corresponding to the [[Endpoint]]. Any endpoint
166165
* that does not appear in this map will be started once all
167166
* authorizerFutures are complete.
168167
*
@@ -181,7 +180,7 @@ class SocketServer(
181180
// Because of ephemeral ports, we need to match acceptors to futures by looking at
182181
// the listener name, rather than the endpoint object.
183182
val authorizerFuture = authorizerFutures.find {
184-
case (endpoint, _) => acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
183+
case (endpoint, _) => acceptor.endPoint.listenerName.equals(endpoint.listenerName())
185184
} match {
186185
case None => allAuthorizerFuturesComplete
187186
case Some((_, future)) => future
@@ -210,23 +209,24 @@ class SocketServer(
210209
enableFuture
211210
}
212211

213-
private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
212+
private def createDataPlaneAcceptorAndProcessors(endpoint: Endpoint): Unit = synchronized {
214213
if (stopped) {
215214
throw new RuntimeException("Can't create new data plane acceptor and processors: SocketServer is stopped.")
216215
}
217-
val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
218-
connectionQuotas.addListener(config, endpoint.listenerName)
219-
val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
216+
val listenerName = ListenerName.normalised(endpoint.listenerName)
217+
val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
218+
connectionQuotas.addListener(config, listenerName)
219+
val isPrivilegedListener = config.interBrokerListenerName == listenerName
220220
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
221221
config.addReconfigurable(dataPlaneAcceptor)
222222
dataPlaneAcceptor.configure(parsedConfigs)
223223
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
224-
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
224+
info(s"Created data-plane acceptor and processors for endpoint : ${listenerName}")
225225
}
226226

227-
private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
227+
private def endpoints = config.listeners.map(l => ListenerName.normalised(l.listenerName) -> l).toMap
228228

229-
protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
229+
protected def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
230230
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
231231
}
232232

@@ -277,7 +277,7 @@ class SocketServer(
277277
/**
278278
* This method is called to dynamically add listeners.
279279
*/
280-
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
280+
def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
281281
if (stopped) {
282282
throw new RuntimeException("can't add new listeners: SocketServer is stopped.")
283283
}
@@ -297,10 +297,10 @@ class SocketServer(
297297
}
298298
}
299299

300-
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
300+
def removeListeners(listenersRemoved: Seq[Endpoint]): Unit = synchronized {
301301
info(s"Removing data-plane listeners for endpoints $listenersRemoved")
302302
listenersRemoved.foreach { endpoint =>
303-
connectionQuotas.removeListener(config, endpoint.listenerName)
303+
connectionQuotas.removeListener(config, ListenerName.normalised(endpoint.listenerName))
304304
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
305305
acceptor.beginShutdown()
306306
acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
345345
// For test usage
346346
def dataPlaneAcceptor(listenerName: String): Option[DataPlaneAcceptor] = {
347347
dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
348-
if (endPoint.listenerName.value() == listenerName)
348+
if (endPoint.listenerName == listenerName)
349349
return Some(acceptor)
350350
}
351351
None
@@ -376,7 +376,7 @@ object DataPlaneAcceptor {
376376
}
377377

378378
class DataPlaneAcceptor(socketServer: SocketServer,
379-
endPoint: EndPoint,
379+
endPoint: Endpoint,
380380
config: KafkaConfig,
381381
nodeId: Int,
382382
connectionQuotas: ConnectionQuotas,
@@ -409,7 +409,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
409409
* Returns the listener name associated with this reconfigurable. Listener-specific
410410
* configs corresponding to this listener name are provided for reconfiguration.
411411
*/
412-
override def listenerName(): ListenerName = endPoint.listenerName
412+
override def listenerName(): ListenerName = ListenerName.normalised(endPoint.listenerName)
413413

414414
/**
415415
* Returns the names of configs that may be reconfigured.
@@ -477,7 +477,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
477477
* Thread that accepts and configures new connections. There is one of these per endpoint.
478478
*/
479479
private[kafka] abstract class Acceptor(val socketServer: SocketServer,
480-
val endPoint: EndPoint,
480+
val endPoint: Endpoint,
481481
var config: KafkaConfig,
482482
nodeId: Int,
483483
val connectionQuotas: ConnectionQuotas,
@@ -523,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
523523
private val backwardCompatibilityMetricGroup = new KafkaMetricsGroup("kafka.network", "Acceptor")
524524
private val blockedPercentMeterMetricName = backwardCompatibilityMetricGroup.metricName(
525525
s"${metricPrefix()}AcceptorBlockedPercent",
526-
Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
526+
Map(ListenerMetricTag -> endPoint.listenerName).asJava)
527527
private val blockedPercentMeter = metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", TimeUnit.NANOSECONDS)
528528
private var currentProcessorIndex = 0
529529
private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
@@ -636,7 +636,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
636636
new InetSocketAddress(host, port)
637637
}
638638
val serverChannel = socketServer.socketFactory.openServerSocket(
639-
endPoint.listenerName.value(),
639+
endPoint.listenerName,
640640
socketAddress,
641641
listenBacklogSize,
642642
recvBufferSize)
@@ -690,14 +690,15 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
690690
private def accept(key: SelectionKey): Option[SocketChannel] = {
691691
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
692692
val socketChannel = serverSocketChannel.accept()
693+
val listenerName = ListenerName.normalised(endPoint.listenerName)
693694
try {
694-
connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
695+
connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
695696
configureAcceptedSocketChannel(socketChannel)
696697
Some(socketChannel)
697698
} catch {
698699
case e: TooManyConnectionsException =>
699700
info(s"Rejected connection from ${e.ip}, address already has the configured maximum of ${e.count} connections.")
700-
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
701+
connectionQuotas.closeChannel(this, listenerName, socketChannel)
701702
None
702703
case e: ConnectionThrottledException =>
703704
val ip = socketChannel.socket.getInetAddress
@@ -707,7 +708,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
707708
None
708709
case e: IOException =>
709710
error(s"Encountered an error while configuring the connection, closing it.", e)
710-
connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
711+
connectionQuotas.closeChannel(this, listenerName, socketChannel)
711712
None
712713
}
713714
}
@@ -749,7 +750,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
749750
def wakeup(): Unit = nioSelector.wakeup()
750751

751752
def addProcessors(toCreate: Int): Unit = synchronized {
752-
val listenerName = endPoint.listenerName
753+
val listenerName = ListenerName.normalised(endPoint.listenerName)
753754
val securityProtocol = endPoint.securityProtocol
754755
val listenerProcessors = new ArrayBuffer[Processor]()
755756

Diff for: core/src/main/scala/kafka/server/BrokerServer.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class BrokerServer(
275275
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
276276

277277
val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
278-
config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
278+
config.effectiveAdvertisedBrokerListeners.asJava).
279279
withWildcardHostnamesResolved().
280280
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
281281

@@ -478,8 +478,7 @@ class BrokerServer(
478478
if (listenerName != null) {
479479
val endpoint = listenerInfo.listeners().values().stream
480480
.filter(e =>
481-
e.listenerName().isPresent &&
482-
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
481+
ListenerName.normalised(e.listenerName()).equals(ListenerName.normalised(listenerName))
483482
)
484483
.findFirst()
485484
.orElseThrow(() => new ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,

Diff for: core/src/main/scala/kafka/server/ControllerServer.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class ControllerServer(
173173
sharedServer.socketFactory)
174174

175175
val listenerInfo = ListenerInfo
176-
.create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
176+
.create(config.effectiveAdvertisedControllerListeners.asJava)
177177
.withWildcardHostnamesResolved()
178178
.withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
179179
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())

Diff for: core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

+8-8
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
2626
import kafka.server.DynamicBrokerConfig._
2727
import kafka.utils.{CoreUtils, Logging}
2828
import org.apache.kafka.common.Reconfigurable
29-
import org.apache.kafka.network.EndPoint
29+
import org.apache.kafka.common.Endpoint
3030
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
3131
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs}
3232
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -885,9 +885,9 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
885885

886886
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
887887
val oldConfig = server.config
888-
val newListeners = newConfig.listeners.map(_.listenerName).toSet
889-
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
890-
val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
888+
val newListeners = newConfig.listeners.map(l => ListenerName.normalised(l.listenerName)).toSet
889+
val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listenerName)).toSet
890+
val oldListeners = oldConfig.listeners.map(l => ListenerName.normalised(l.listenerName)).toSet
891891
if (!oldAdvertisedListeners.subsetOf(newListeners))
892892
throw new ConfigException(s"Advertised listeners '$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
893893
if (!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
@@ -912,17 +912,17 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
912912
val newListenerMap = listenersToMap(newListeners)
913913
val oldListeners = oldConfig.listeners
914914
val oldListenerMap = listenersToMap(oldListeners)
915-
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
916-
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
915+
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(ListenerName.normalised(e.listenerName)))
916+
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(ListenerName.normalised(e.listenerName)))
917917
if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
918918
LoginManager.closeAll() // Clear SASL login cache to force re-login
919919
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
920920
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
921921
}
922922
}
923923

924-
private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
925-
listeners.map(e => (e.listenerName, e)).toMap
924+
private def listenersToMap(listeners: Seq[Endpoint]): Map[ListenerName, Endpoint] =
925+
listeners.map(e => (ListenerName.normalised(e.listenerName), e)).toMap
926926

927927
}
928928

0 commit comments

Comments
 (0)