Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions config/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ process.roles=broker,controller
node.id=1

# List of controller endpoints used connect to the controller cluster
controller.quorum.bootstrap.servers=localhost:9093
controller.quorum.bootstrap.servers=192.168.31.242:17684
dynamic.broker.config.failure.policy=warn

############################# Socket Server Settings #############################

Expand All @@ -41,7 +42,7 @@ inter.broker.listener.name=PLAINTEXT

# Listener name, hostname and port the broker or the controller will advertise to clients.
# If not set, it uses the value for "listeners".
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
advertised.listeners=PLAINTEXT://192.168.31.242:9092,CONTROLLER://192.168.31.242:9093

# A comma-separated list of the names of the listeners used by the controller.
# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.{BrokerState, KRaftMetadataCache, ListenerInfo, MetadataCache, MetadataVersionConfigValidator}
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
Expand Down Expand Up @@ -494,8 +494,10 @@ class BrokerServer(
new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
sharedServer.dynamicConfigFatalFaultHandler,
dynamicConfigHandlers.toMap,
"broker"),
"broker",
Some(sharedServer.invalidConfigMetrics)),
new DynamicClientQuotaPublisher(
config.nodeId,
sharedServer.metadataPublishingFaultHandler,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ConfigAdminManager(nodeId: Int,
): Unit = {
val perBrokerConfig = configResource.name().nonEmpty
val persistentProps = configRepository.config(configResource)
val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
val (configProps, _) = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig)
val alterConfigOps = resource.configs().asScala.map {
config =>
val opType = AlterConfigOp.OpType.forId(config.configOperation())
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,20 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager,
*/
class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {

@volatile private var _lastValidationResult: Option[DynamicBrokerConfig.ValidationResult] = None

def lastValidationResult: Option[DynamicBrokerConfig.ValidationResult] = _lastValidationResult

def processConfigChanges(brokerId: String, properties: Properties): Unit = {
if (brokerId.isEmpty)
val validationResult = if (brokerId.isEmpty)
brokerConfig.dynamicConfig.updateDefaultConfig(properties)
else if (brokerConfig.brokerId == brokerId.trim.toInt) {
brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
} else {
None
}
_lastValidationResult = validationResult
val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs
val updatedDynamicDefaultConfigs = brokerConfig.dynamicConfig.currentDynamicDefaultConfigs

Expand Down
5 changes: 4 additions & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ class ControllerServer(
try {
this.logIdent = logContext.logPrefix()
info("Starting controller")

config.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)

maybeChangeStatus(STARTING, STARTED)
Expand Down Expand Up @@ -322,11 +323,13 @@ class ControllerServer(
metadataPublishers.add(new DynamicConfigPublisher(
config,
sharedServer.metadataPublishingFaultHandler,
sharedServer.dynamicConfigFatalFaultHandler,
immutable.Map[ConfigType, ConfigHandler](
// controllers don't host topics, so no need to do anything with dynamic topic config changes here
ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers)
),
"controller"))
"controller",
Some(sharedServer.invalidConfigMetrics)))

// Register this instance for dynamic config changes to the KafkaConfig. This must be called
// after the authorizer and quotaManagers are initialized, since it references those objects.
Expand Down
76 changes: 61 additions & 15 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.config.{DynamicConfigFailurePolicy, DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
Expand Down Expand Up @@ -89,6 +89,14 @@ import scala.jdk.CollectionConverters._
*/
object DynamicBrokerConfig {

/**
* Validation result for dynamic broker configuration
* @param invalidCount Number of invalid configurations
* @param invalidConfigNames Set of invalid configuration names
* @param perBrokerConfig True if these are per-broker configs, false if cluster-wide default configs
*/
case class ValidationResult(invalidCount: Int, invalidConfigNames: Set[String], perBrokerConfig: Boolean)

private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)

Expand Down Expand Up @@ -384,25 +392,39 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
telemetryExporterPluginOpt
}

private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Option[DynamicBrokerConfig.ValidationResult] = CoreUtils.inWriteLock(lock) {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
val (props, validationResult) = fromPersistentProps(persistentProps, perBrokerConfig = true)
dynamicBrokerConfigs.clear()
dynamicBrokerConfigs ++= props.asScala
updateCurrentConfig(doLog)
validationResult
} catch {
case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e)
case e: ConfigException if kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL =>
// Re-throw ConfigException when failure policy is "fail" to halt the broker
error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e)
throw e
case e: Exception =>
error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e)
None
}
}

private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Option[DynamicBrokerConfig.ValidationResult] = CoreUtils.inWriteLock(lock) {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
val (props, validationResult) = fromPersistentProps(persistentProps, perBrokerConfig = false)
dynamicDefaultConfigs.clear()
dynamicDefaultConfigs ++= props.asScala
updateCurrentConfig(doLog)
validationResult
} catch {
case e: Exception => error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e)
case e: ConfigException if kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL =>
// Re-throw ConfigException when failure policy is "fail" to halt the broker
error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e)
throw e
case e: Exception =>
error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e)
None
}
}

Expand Down Expand Up @@ -430,24 +452,46 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
}

private[server] def fromPersistentProps(persistentProps: Properties,
perBrokerConfig: Boolean): Properties = {
perBrokerConfig: Boolean): (Properties, Option[DynamicBrokerConfig.ValidationResult]) = {
val props = persistentProps.clone().asInstanceOf[Properties]

var invalidCount = 0
var invalidConfigNames = Set.empty[String]
// Remove all invalid configs from `props`
removeInvalidConfigs(props, perBrokerConfig)
def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
val (invalidConfigsCount, invalidConfigsFromValidation) = removeInvalidConfigs(props, perBrokerConfig)
invalidCount += invalidConfigsCount
invalidConfigNames ++= invalidConfigsFromValidation
def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Int = {
if (invalidPropNames.nonEmpty) {
invalidPropNames.foreach(props.remove)
invalidConfigNames ++= invalidPropNames
error(s"$errorMessage: $invalidPropNames")
}
invalidPropNames.size
}
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored")
removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
invalidCount += removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored")
invalidCount += removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
if (!perBrokerConfig)
removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")

props
// Create validation result if invalid configs were found
val validationResult = if (invalidCount > 0) {
Some(DynamicBrokerConfig.ValidationResult(invalidCount, invalidConfigNames.toSet, perBrokerConfig))
} else {
None
}

// Throw ConfigException if invalid configs detected with policy=fail
// DynamicConfigPublisher will decide whether to halt based on whether it's the first publish
if (invalidCount > 0 && kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL) {
val configType = if (perBrokerConfig) "per-broker" else "cluster-wide"
val errorMsg = s"Invalid $configType dynamic configuration detected: ${invalidConfigNames.mkString(", ")}. " +
s"Broker is configured with dynamic.config.failure.policy=fail."
throw new ConfigException(errorMsg)
}

(props, validationResult)
}

/**
Expand Down Expand Up @@ -476,10 +520,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
processReconfiguration(newProps, validateOnly = true)
}

private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): (Int, Set[String]) = {
try {
validateConfigTypes(props)
props.asScala
(0, Set.empty)
} catch {
case e: Exception =>
val invalidProps = props.asScala.filter { case (k, v) =>
Expand All @@ -495,6 +540,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
invalidProps.keys.foreach(props.remove)
val configSource = if (perBrokerConfig) "broker" else "default cluster"
error(s"Dynamic $configSource config contains invalid values in: ${invalidProps.keys}, these configs will be ignored", e)
(invalidProps.size, invalidProps.keys.toSet)
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.AbstractKafkaConfig.getMap
import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfigFailurePolicy, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
Expand Down Expand Up @@ -405,6 +405,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG)

val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG)
val dynamicBrokerConfigFailurePolicy = DynamicConfigFailurePolicy.fromString(
getString(ServerConfigs.DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_CONFIG))
def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG)

def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG)
Expand Down
17 changes: 16 additions & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble
import org.apache.kafka.raft.Endpoints
import org.apache.kafka.server.{ProcessRole, ServerSocketFactory}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.DynamicConfigFailurePolicy
import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler}
import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics}
import org.apache.kafka.server.metrics.{BrokerServerMetrics, InvalidConfigMetrics, KafkaYammerMetrics, NodeMetrics}

import java.net.InetSocketAddress
import java.util.Arrays
Expand Down Expand Up @@ -121,6 +122,7 @@ class SharedServer(
@volatile var brokerMetrics: BrokerServerMetrics = _
@volatile var controllerServerMetrics: ControllerMetadataMetrics = _
@volatile var nodeMetrics: NodeMetrics = _
@volatile var invalidConfigMetrics: InvalidConfigMetrics = _
@volatile var loader: MetadataLoader = _
private val snapshotsDisabledReason = new AtomicReference[String](null)
@volatile var snapshotEmitter: SnapshotEmitter = _
Expand Down Expand Up @@ -262,6 +264,16 @@ class SharedServer(
// Note: snapshot generation does not need to be disabled for a publishing fault.
})

/**
* The fatal fault handler to use when invalid dynamic configurations are detected
* with dynamic.config.failure.policy=fail during broker startup.
*/
val dynamicConfigFatalFaultHandler: FaultHandler = faultHandlerFactory.build(
name = "dynamic config loading",
fatal = sharedServerConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL,
action = () => { }
)

private def start(listenerEndpoints: Endpoints): Unit = synchronized {
if (started) {
debug("SharedServer has already been started.")
Expand All @@ -273,6 +285,9 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}

// Create invalid config metrics for DynamicConfigPublisher (shared across all server instances)
invalidConfigMetrics = new InvalidConfigMetrics(metrics)
sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)

if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
Expand Down
Loading
Loading