From 9242304c93ea070e78aa6b9ac62b9c72e9f22b43 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Tue, 23 Dec 2025 21:19:22 +0800 Subject: [PATCH 01/12] version 1 --- .../scala/kafka/server/BrokerServer.scala | 11 ++-- .../scala/kafka/server/ControllerServer.scala | 6 ++- .../kafka/server/DynamicBrokerConfig.scala | 54 ++++++++++++++++--- .../scala/kafka/server/SharedServer.scala | 6 ++- 4 files changed, 64 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 2d21ee59effa0..c1e694d0dfa04 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService} import org.apache.kafka.coordinator.transaction.ProducerIdManager import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher} -import org.apache.kafka.metadata.{BrokerState, ListenerInfo, KRaftMetadataCache, MetadataCache, MetadataVersionConfigValidator} +import org.apache.kafka.metadata.{BrokerState, KRaftMetadataCache, ListenerInfo, MetadataCache, MetadataVersionConfigValidator} import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher} import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager} import org.apache.kafka.server.authorizer.Authorizer @@ -49,7 +49,7 @@ import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandl import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig} -import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics} +import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaMetricsGroup, KafkaYammerMetrics} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager} import org.apache.kafka.server.share.session.ShareSessionCache @@ -193,7 +193,12 @@ class BrokerServer( val clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin() - config.dynamicConfig.initialize(Some(clientTelemetryExporterPlugin)) + // Create metrics group for DynamicBrokerConfig (shared across all KafkaConfig instances) + val dynamicConfigMetricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, "DynamicBrokerConfig") + config.dynamicConfig.initialize( + Some(clientTelemetryExporterPlugin), + Some(dynamicConfigMetricsGroup) + ) quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString) DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext) diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 81af28dc4958d..77bd77c1cb9d8 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -130,7 +130,11 @@ class ControllerServer( try { this.logIdent = logContext.logPrefix() info("Starting controller") - config.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) + + config.dynamicConfig.initialize( + clientTelemetryExporterPluginOpt = None, + metricsGroupOpt = None + ) maybeChangeStatus(STARTING, STARTED) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index f0b077b17f1c8..6dde1ee7d82d5 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -46,7 +46,7 @@ import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig -import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} +import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaMetricsGroup, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} import org.apache.kafka.snapshot.RecordsSnapshotReader import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig} @@ -254,11 +254,18 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { + warn(s"[DEBUG] ★★★ DynamicBrokerConfig instance created! Thread: ${Thread.currentThread().getName}") + private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() + // Metrics group and counters - will be set via initialize() + private var metricsGroupOpt: Option[KafkaMetricsGroup] = None + @volatile private var invalidBrokerConfigCount = 0 + @volatile private var invalidDefaultConfigCount = 0 + // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() @@ -267,9 +274,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private var telemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin] = _ private var currentConfig: KafkaConfig = _ - private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin]): Unit = { + private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin], + metricsGroupOpt: Option[KafkaMetricsGroup] = None): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) telemetryExporterPluginOpt = clientTelemetryExporterPluginOpt + + // Initialize metrics group if provided (only register gauge once) + if (this.metricsGroupOpt.isEmpty) { + this.metricsGroupOpt = metricsGroupOpt + this.metricsGroupOpt.foreach { metricsGroup => + metricsGroup.newGauge("InvalidDynamicBrokerConfigCount", () => { + invalidBrokerConfigCount + invalidDefaultConfigCount + }) + warn(s"[DEBUG] ★★★ Metrics registered for DynamicBrokerConfig") + } + } } /** @@ -433,19 +452,26 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging perBrokerConfig: Boolean): Properties = { val props = persistentProps.clone().asInstanceOf[Properties] + var invalidCount = 0 // Remove all invalid configs from `props` - removeInvalidConfigs(props, perBrokerConfig) - def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = { + val invalidConfigsFromValidation = removeInvalidConfigs(props, perBrokerConfig) + warn(s"[DEBUG] removeInvalidConfigs returned: $invalidConfigsFromValidation, perBrokerConfig=$perBrokerConfig") + invalidCount += invalidConfigsFromValidation + def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Int = { if (invalidPropNames.nonEmpty) { invalidPropNames.foreach(props.remove) error(s"$errorMessage: $invalidPropNames") } + invalidPropNames.size } - removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored") - removeInvalidProps(securityConfigsWithoutListenerPrefix(props), + invalidCount += removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs will be ignored") + invalidCount += removeInvalidProps(securityConfigsWithoutListenerPrefix(props), "Security configs can be dynamically updated only using listener prefix, base configs will be ignored") if (!perBrokerConfig) - removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") + invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") + warn(s"[DEBUG] About to call updateInvalidConfigCount with perBrokerConfig=$perBrokerConfig, invalidCount=$invalidCount") + updateInvalidConfigCount(perBrokerConfig, invalidCount) + warn(s"[DEBUG] After updateInvalidConfigCount, invalidBrokerConfigCount=$invalidBrokerConfigCount, invalidDefaultConfigCount=$invalidDefaultConfigCount") props } @@ -476,10 +502,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging processReconfiguration(newProps, validateOnly = true) } - private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = { + private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Int = { try { validateConfigTypes(props) props.asScala + 0 } catch { case e: Exception => val invalidProps = props.asScala.filter { case (k, v) => @@ -495,6 +522,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging invalidProps.keys.foreach(props.remove) val configSource = if (perBrokerConfig) "broker" else "default cluster" error(s"Dynamic $configSource config contains invalid values in: ${invalidProps.keys}, these configs will be ignored", e) + invalidProps.size + } + } + + private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int): Unit = { + if (perBrokerConfig) { + warn("update invalidBrokerConfigCount to " + invalidCount) + invalidBrokerConfigCount = invalidCount + } else { + warn("update invalidDefaultConfigCount to " + invalidCount) + invalidDefaultConfigCount = invalidCount } } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 29a5a833b843c..c5332042bea83 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -273,7 +273,11 @@ class SharedServer( // This is only done in tests. metrics = new Metrics() } - sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) + + sharedServerConfig.dynamicConfig.initialize( + clientTelemetryExporterPluginOpt = None, + metricsGroupOpt = None + ) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { brokerMetrics = new BrokerServerMetrics(metrics) From dde5bae0ecd931352dd8046021eca47b5e602879 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 24 Dec 2025 11:30:46 +0800 Subject: [PATCH 02/12] Use static variable to record --- .../scala/kafka/server/BrokerServer.scala | 6 ++--- .../kafka/server/DynamicBrokerConfig.scala | 22 ++++++++++--------- .../scala/kafka/server/SharedServer.scala | 5 +++-- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index c1e694d0dfa04..a46ded311a49b 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -49,7 +49,7 @@ import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandl import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs} import org.apache.kafka.server.log.remote.metadata.storage.BrokerReadyCallback import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig} -import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaMetricsGroup, KafkaYammerMetrics} +import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics} import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo} import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager} import org.apache.kafka.server.share.session.ShareSessionCache @@ -193,11 +193,9 @@ class BrokerServer( val clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin() - // Create metrics group for DynamicBrokerConfig (shared across all KafkaConfig instances) - val dynamicConfigMetricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, "DynamicBrokerConfig") config.dynamicConfig.initialize( Some(clientTelemetryExporterPlugin), - Some(dynamicConfigMetricsGroup) + None ) quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString) DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 6dde1ee7d82d5..d97c589050501 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -89,6 +89,10 @@ import scala.jdk.CollectionConverters._ */ object DynamicBrokerConfig { + // Shared counters across all DynamicBrokerConfig instances (to survive KafkaConfig recreation) + @volatile private var sharedInvalidBrokerConfigCount = 0 + @volatile private var sharedInvalidDefaultConfigCount = 0 + private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) @@ -261,10 +265,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - // Metrics group and counters - will be set via initialize() private var metricsGroupOpt: Option[KafkaMetricsGroup] = None - @volatile private var invalidBrokerConfigCount = 0 - @volatile private var invalidDefaultConfigCount = 0 // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. @@ -283,10 +284,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (this.metricsGroupOpt.isEmpty) { this.metricsGroupOpt = metricsGroupOpt this.metricsGroupOpt.foreach { metricsGroup => + val stackTrace = Thread.currentThread().getStackTrace.take(15).mkString("\n ") + warn(s"[DEBUG] ★★★ Metrics registered for DynamicBrokerConfig\nStack trace:\n $stackTrace") metricsGroup.newGauge("InvalidDynamicBrokerConfigCount", () => { - invalidBrokerConfigCount + invalidDefaultConfigCount + DynamicBrokerConfig.sharedInvalidBrokerConfigCount + DynamicBrokerConfig.sharedInvalidDefaultConfigCount }) - warn(s"[DEBUG] ★★★ Metrics registered for DynamicBrokerConfig") } } } @@ -471,7 +473,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") warn(s"[DEBUG] About to call updateInvalidConfigCount with perBrokerConfig=$perBrokerConfig, invalidCount=$invalidCount") updateInvalidConfigCount(perBrokerConfig, invalidCount) - warn(s"[DEBUG] After updateInvalidConfigCount, invalidBrokerConfigCount=$invalidBrokerConfigCount, invalidDefaultConfigCount=$invalidDefaultConfigCount") + warn(s"[DEBUG] After updateInvalidConfigCount, sharedInvalidBrokerConfigCount=${DynamicBrokerConfig.sharedInvalidBrokerConfigCount}, sharedInvalidDefaultConfigCount=${DynamicBrokerConfig.sharedInvalidDefaultConfigCount}") props } @@ -528,11 +530,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int): Unit = { if (perBrokerConfig) { - warn("update invalidBrokerConfigCount to " + invalidCount) - invalidBrokerConfigCount = invalidCount + warn("update sharedInvalidBrokerConfigCount to " + invalidCount) + DynamicBrokerConfig.sharedInvalidBrokerConfigCount = invalidCount } else { - warn("update invalidDefaultConfigCount to " + invalidCount) - invalidDefaultConfigCount = invalidCount + warn("update sharedInvalidDefaultConfigCount to " + invalidCount) + DynamicBrokerConfig.sharedInvalidDefaultConfigCount = invalidCount } } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index c5332042bea83..b2fd60cd1e14d 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -37,7 +37,7 @@ import org.apache.kafka.raft.Endpoints import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler} -import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics} +import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaMetricsGroup, KafkaYammerMetrics, NodeMetrics} import java.net.InetSocketAddress import java.util.Arrays @@ -274,9 +274,10 @@ class SharedServer( metrics = new Metrics() } + val dynamicConfigMetricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, "DynamicBrokerConfig") sharedServerConfig.dynamicConfig.initialize( clientTelemetryExporterPluginOpt = None, - metricsGroupOpt = None + metricsGroupOpt = Some(dynamicConfigMetricsGroup) ) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { From d23d46e62c28fbdba38db71d685548e8dced5005 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 24 Dec 2025 19:20:45 +0800 Subject: [PATCH 03/12] use additional metrics --- .../scala/kafka/server/ControllerServer.scala | 2 +- .../kafka/server/DynamicBrokerConfig.scala | 29 +++-- .../scala/kafka/server/SharedServer.scala | 15 ++- .../server/metrics/InvalidConfigMetrics.java | 122 ++++++++++++++++++ 4 files changed, 150 insertions(+), 18 deletions(-) create mode 100644 server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 77bd77c1cb9d8..ad836803f663e 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -133,7 +133,7 @@ class ControllerServer( config.dynamicConfig.initialize( clientTelemetryExporterPluginOpt = None, - metricsGroupOpt = None + invalidConfigMetricsOpt = None ) maybeChangeStatus(STARTING, STARTED) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index d97c589050501..2e459fd89b4ee 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -46,7 +46,7 @@ import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig -import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaMetricsGroup, MetricConfigs} +import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} import org.apache.kafka.snapshot.RecordsSnapshotReader import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig} @@ -258,14 +258,16 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { - warn(s"[DEBUG] ★★★ DynamicBrokerConfig instance created! Thread: ${Thread.currentThread().getName}") + val stackTrace = Thread.currentThread().getStackTrace.take(15).mkString("\n ") + warn(s"[DEBUG] ★★★ DynamicBrokerConfig instance created! Thread: ${Thread.currentThread().getName}\nCall stack:\n $stackTrace") private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private var metricsGroupOpt: Option[KafkaMetricsGroup] = None + // Invalid config metrics - will be set via initialize() + private var invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. @@ -276,19 +278,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private var currentConfig: KafkaConfig = _ private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin], - metricsGroupOpt: Option[KafkaMetricsGroup] = None): Unit = { + invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) telemetryExporterPluginOpt = clientTelemetryExporterPluginOpt - // Initialize metrics group if provided (only register gauge once) - if (this.metricsGroupOpt.isEmpty) { - this.metricsGroupOpt = metricsGroupOpt - this.metricsGroupOpt.foreach { metricsGroup => + // Initialize invalid config metrics if provided (only once) + if (this.invalidConfigMetricsOpt.isEmpty) { + this.invalidConfigMetricsOpt = invalidConfigMetricsOpt + invalidConfigMetricsOpt.foreach { metrics => val stackTrace = Thread.currentThread().getStackTrace.take(15).mkString("\n ") - warn(s"[DEBUG] ★★★ Metrics registered for DynamicBrokerConfig\nStack trace:\n $stackTrace") - metricsGroup.newGauge("InvalidDynamicBrokerConfigCount", () => { - DynamicBrokerConfig.sharedInvalidBrokerConfigCount + DynamicBrokerConfig.sharedInvalidDefaultConfigCount - }) + warn(s"[DEBUG] ★★★ Invalid config metrics registered for DynamicBrokerConfig\nStack trace:\n $stackTrace") } } } @@ -530,11 +529,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int): Unit = { if (perBrokerConfig) { - warn("update sharedInvalidBrokerConfigCount to " + invalidCount) + warn("update invalidBrokerConfigCount to " + invalidCount) DynamicBrokerConfig.sharedInvalidBrokerConfigCount = invalidCount + invalidConfigMetricsOpt.foreach(_.setInvalidBrokerConfigCount(invalidCount)) } else { - warn("update sharedInvalidDefaultConfigCount to " + invalidCount) + warn("update invalidDefaultConfigCount to " + invalidCount) DynamicBrokerConfig.sharedInvalidDefaultConfigCount = invalidCount + invalidConfigMetricsOpt.foreach(_.setInvalidDefaultConfigCount(invalidCount)) } } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index b2fd60cd1e14d..ccdecb471f7af 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -37,7 +37,7 @@ import org.apache.kafka.raft.Endpoints import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler} -import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaMetricsGroup, KafkaYammerMetrics, NodeMetrics} +import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics} import java.net.InetSocketAddress import java.util.Arrays @@ -274,10 +274,19 @@ class SharedServer( metrics = new Metrics() } - val dynamicConfigMetricsGroup = new KafkaMetricsGroup(Server.MetricsPrefix, "DynamicBrokerConfig") + // Create invalid config metrics for DynamicBrokerConfig (shared across all KafkaConfig instances) + val invalidConfigMetrics = new org.apache.kafka.server.metrics.InvalidConfigMetrics(metrics) sharedServerConfig.dynamicConfig.initialize( clientTelemetryExporterPluginOpt = None, - metricsGroupOpt = Some(dynamicConfigMetricsGroup) + invalidConfigMetricsOpt = Some(invalidConfigMetrics) + ) + brokerConfig.dynamicConfig.initialize( + clientTelemetryExporterPluginOpt = None, + invalidConfigMetricsOpt = Some(invalidConfigMetrics) + ) + controllerConfig.dynamicConfig.initialize( + clientTelemetryExporterPluginOpt = None, + invalidConfigMetricsOpt = Some(invalidConfigMetrics) ) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { diff --git a/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java b/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java new file mode 100644 index 0000000000000..a6ab43f6bf749 --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.Metrics; + +import java.util.Collections; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tracks invalid dynamic broker configuration counts. + * This class provides metrics for monitoring invalid configurations that are + * rejected during dynamic broker configuration updates. + */ +public final class InvalidConfigMetrics implements AutoCloseable { + private static final String METRIC_GROUP_NAME = "dynamic-broker-config"; + private static final String INVALID_BROKER_CONFIG_COUNT = "invalid-broker-config-count"; + private static final String INVALID_DEFAULT_CONFIG_COUNT = "invalid-default-config-count"; + private static final String TOTAL_INVALID_CONFIG_COUNT = "total-invalid-config-count"; + + private final Metrics metrics; + private final AtomicInteger invalidBrokerConfigCount = new AtomicInteger(0); + private final AtomicInteger invalidDefaultConfigCount = new AtomicInteger(0); + + public InvalidConfigMetrics(Metrics metrics) { + this.metrics = metrics; + registerMetrics(); + } + + private void registerMetrics() { + // Register invalid broker config count metric + metrics.addMetric( + metricName(INVALID_BROKER_CONFIG_COUNT), + (Gauge) (config, now) -> invalidBrokerConfigCount.get() + ); + + // Register invalid default config count metric + metrics.addMetric( + metricName(INVALID_DEFAULT_CONFIG_COUNT), + (Gauge) (config, now) -> invalidDefaultConfigCount.get() + ); + + // Register total invalid config count metric + metrics.addMetric( + metricName(TOTAL_INVALID_CONFIG_COUNT), + (Gauge) (config, now) -> invalidBrokerConfigCount.get() + invalidDefaultConfigCount.get() + ); + } + + private MetricName metricName(String name) { + return metrics.metricName(name, METRIC_GROUP_NAME, Collections.emptyMap()); + } + + /** + * Update the invalid broker config count. + * + * @param count the new count of invalid broker configurations + */ + public void setInvalidBrokerConfigCount(int count) { + invalidBrokerConfigCount.set(count); + } + + /** + * Update the invalid default config count. + * + * @param count the new count of invalid default configurations + */ + public void setInvalidDefaultConfigCount(int count) { + invalidDefaultConfigCount.set(count); + } + + /** + * Get the current invalid broker config count. + * + * @return the count of invalid broker configurations + */ + public int getInvalidBrokerConfigCount() { + return invalidBrokerConfigCount.get(); + } + + /** + * Get the current invalid default config count. + * + * @return the count of invalid default configurations + */ + public int getInvalidDefaultConfigCount() { + return invalidDefaultConfigCount.get(); + } + + /** + * Get the total invalid config count. + * + * @return the total count of all invalid configurations + */ + public int getTotalInvalidConfigCount() { + return invalidBrokerConfigCount.get() + invalidDefaultConfigCount.get(); + } + + @Override + public void close() { + metrics.removeMetric(metricName(INVALID_BROKER_CONFIG_COUNT)); + metrics.removeMetric(metricName(INVALID_DEFAULT_CONFIG_COUNT)); + metrics.removeMetric(metricName(TOTAL_INVALID_CONFIG_COUNT)); + } +} From 9e47aa0340d04f3a5b424d2da7b74979c0c5fb28 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Wed, 24 Dec 2025 19:27:55 +0800 Subject: [PATCH 04/12] appear invalid config name --- .../kafka/server/DynamicBrokerConfig.scala | 30 ++++++----- .../server/metrics/InvalidConfigMetrics.java | 51 +++++++++++++++++-- 2 files changed, 63 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 2e459fd89b4ee..e0569ffb0c6d2 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -454,13 +454,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val props = persistentProps.clone().asInstanceOf[Properties] var invalidCount = 0 + var invalidConfigNames = Set.empty[String] // Remove all invalid configs from `props` - val invalidConfigsFromValidation = removeInvalidConfigs(props, perBrokerConfig) - warn(s"[DEBUG] removeInvalidConfigs returned: $invalidConfigsFromValidation, perBrokerConfig=$perBrokerConfig") - invalidCount += invalidConfigsFromValidation + val (invalidConfigsCount, invalidConfigsFromValidation) = removeInvalidConfigs(props, perBrokerConfig) + warn(s"[DEBUG] removeInvalidConfigs returned: count=$invalidConfigsCount, names=$invalidConfigsFromValidation, perBrokerConfig=$perBrokerConfig") + invalidCount += invalidConfigsCount + invalidConfigNames ++= invalidConfigsFromValidation def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Int = { if (invalidPropNames.nonEmpty) { invalidPropNames.foreach(props.remove) + invalidConfigNames ++= invalidPropNames error(s"$errorMessage: $invalidPropNames") } invalidPropNames.size @@ -470,8 +473,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging "Security configs can be dynamically updated only using listener prefix, base configs will be ignored") if (!perBrokerConfig) invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") - warn(s"[DEBUG] About to call updateInvalidConfigCount with perBrokerConfig=$perBrokerConfig, invalidCount=$invalidCount") - updateInvalidConfigCount(perBrokerConfig, invalidCount) + warn(s"[DEBUG] About to call updateInvalidConfigCount with perBrokerConfig=$perBrokerConfig, invalidCount=$invalidCount, invalidConfigNames=$invalidConfigNames") + updateInvalidConfigCount(perBrokerConfig, invalidCount, invalidConfigNames) warn(s"[DEBUG] After updateInvalidConfigCount, sharedInvalidBrokerConfigCount=${DynamicBrokerConfig.sharedInvalidBrokerConfigCount}, sharedInvalidDefaultConfigCount=${DynamicBrokerConfig.sharedInvalidDefaultConfigCount}") props @@ -503,11 +506,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging processReconfiguration(newProps, validateOnly = true) } - private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Int = { + private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): (Int, Set[String]) = { try { validateConfigTypes(props) props.asScala - 0 + (0, Set.empty) } catch { case e: Exception => val invalidProps = props.asScala.filter { case (k, v) => @@ -523,19 +526,20 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging invalidProps.keys.foreach(props.remove) val configSource = if (perBrokerConfig) "broker" else "default cluster" error(s"Dynamic $configSource config contains invalid values in: ${invalidProps.keys}, these configs will be ignored", e) - invalidProps.size + (invalidProps.size, invalidProps.keys.toSet) } } - private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int): Unit = { + private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int, invalidConfigNames: Set[String]): Unit = { + val configNamesStr = invalidConfigNames.mkString(", ") if (perBrokerConfig) { - warn("update invalidBrokerConfigCount to " + invalidCount) + warn(s"update invalidBrokerConfigCount to $invalidCount, invalid configs: [$configNamesStr]") DynamicBrokerConfig.sharedInvalidBrokerConfigCount = invalidCount - invalidConfigMetricsOpt.foreach(_.setInvalidBrokerConfigCount(invalidCount)) + invalidConfigMetricsOpt.foreach(_.setInvalidBrokerConfigCount(invalidCount, configNamesStr)) } else { - warn("update invalidDefaultConfigCount to " + invalidCount) + warn(s"update invalidDefaultConfigCount to $invalidCount, invalid configs: [$configNamesStr]") DynamicBrokerConfig.sharedInvalidDefaultConfigCount = invalidCount - invalidConfigMetricsOpt.foreach(_.setInvalidDefaultConfigCount(invalidCount)) + invalidConfigMetricsOpt.foreach(_.setInvalidDefaultConfigCount(invalidCount, configNamesStr)) } } diff --git a/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java b/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java index a6ab43f6bf749..5ef95ce738f2a 100644 --- a/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java +++ b/server/src/main/java/org/apache/kafka/server/metrics/InvalidConfigMetrics.java @@ -23,9 +23,10 @@ import java.util.Collections; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** - * Tracks invalid dynamic broker configuration counts. + * Tracks invalid dynamic broker configuration counts and names. * This class provides metrics for monitoring invalid configurations that are * rejected during dynamic broker configuration updates. */ @@ -34,10 +35,14 @@ public final class InvalidConfigMetrics implements AutoCloseable { private static final String INVALID_BROKER_CONFIG_COUNT = "invalid-broker-config-count"; private static final String INVALID_DEFAULT_CONFIG_COUNT = "invalid-default-config-count"; private static final String TOTAL_INVALID_CONFIG_COUNT = "total-invalid-config-count"; + private static final String INVALID_BROKER_CONFIG_NAMES = "invalid-broker-config-names"; + private static final String INVALID_DEFAULT_CONFIG_NAMES = "invalid-default-config-names"; private final Metrics metrics; private final AtomicInteger invalidBrokerConfigCount = new AtomicInteger(0); private final AtomicInteger invalidDefaultConfigCount = new AtomicInteger(0); + private final AtomicReference invalidBrokerConfigNames = new AtomicReference<>(""); + private final AtomicReference invalidDefaultConfigNames = new AtomicReference<>(""); public InvalidConfigMetrics(Metrics metrics) { this.metrics = metrics; @@ -62,6 +67,18 @@ private void registerMetrics() { metricName(TOTAL_INVALID_CONFIG_COUNT), (Gauge) (config, now) -> invalidBrokerConfigCount.get() + invalidDefaultConfigCount.get() ); + + // Register invalid broker config names metric + metrics.addMetric( + metricName(INVALID_BROKER_CONFIG_NAMES), + (Gauge) (config, now) -> invalidBrokerConfigNames.get() + ); + + // Register invalid default config names metric + metrics.addMetric( + metricName(INVALID_DEFAULT_CONFIG_NAMES), + (Gauge) (config, now) -> invalidDefaultConfigNames.get() + ); } private MetricName metricName(String name) { @@ -69,21 +86,43 @@ private MetricName metricName(String name) { } /** - * Update the invalid broker config count. + * Update the invalid broker config count and names. * * @param count the new count of invalid broker configurations + * @param configNames comma-separated list of invalid config names */ - public void setInvalidBrokerConfigCount(int count) { + public void setInvalidBrokerConfigCount(int count, String configNames) { invalidBrokerConfigCount.set(count); + invalidBrokerConfigNames.set(configNames != null ? configNames : ""); + } + + /** + * Update the invalid broker config count (backward compatibility). + * + * @param count the new count of invalid broker configurations + */ + public void setInvalidBrokerConfigCount(int count) { + setInvalidBrokerConfigCount(count, ""); } /** - * Update the invalid default config count. + * Update the invalid default config count and names. * * @param count the new count of invalid default configurations + * @param configNames comma-separated list of invalid config names */ - public void setInvalidDefaultConfigCount(int count) { + public void setInvalidDefaultConfigCount(int count, String configNames) { invalidDefaultConfigCount.set(count); + invalidDefaultConfigNames.set(configNames != null ? configNames : ""); + } + + /** + * Update the invalid default config count (backward compatibility). + * + * @param count the new count of invalid default configurations + */ + public void setInvalidDefaultConfigCount(int count) { + setInvalidDefaultConfigCount(count, ""); } /** @@ -118,5 +157,7 @@ public void close() { metrics.removeMetric(metricName(INVALID_BROKER_CONFIG_COUNT)); metrics.removeMetric(metricName(INVALID_DEFAULT_CONFIG_COUNT)); metrics.removeMetric(metricName(TOTAL_INVALID_CONFIG_COUNT)); + metrics.removeMetric(metricName(INVALID_BROKER_CONFIG_NAMES)); + metrics.removeMetric(metricName(INVALID_DEFAULT_CONFIG_NAMES)); } } From 209cf650d34f2fdafc02858e3466b16bf7b5d183 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 25 Dec 2025 00:54:42 +0800 Subject: [PATCH 05/12] add fast fail policy; --- .../kafka/server/DynamicBrokerConfig.scala | 32 +++++++++++++------ .../main/scala/kafka/server/KafkaConfig.scala | 1 + .../kafka/server/config/ServerConfigs.java | 8 +++++ 3 files changed, 32 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index e0569ffb0c6d2..a81f77edaae79 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -258,9 +258,6 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { - val stackTrace = Thread.currentThread().getStackTrace.take(15).mkString("\n ") - warn(s"[DEBUG] ★★★ DynamicBrokerConfig instance created! Thread: ${Thread.currentThread().getName}\nCall stack:\n $stackTrace") - private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() @@ -285,10 +282,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Initialize invalid config metrics if provided (only once) if (this.invalidConfigMetricsOpt.isEmpty) { this.invalidConfigMetricsOpt = invalidConfigMetricsOpt - invalidConfigMetricsOpt.foreach { metrics => - val stackTrace = Thread.currentThread().getStackTrace.take(15).mkString("\n ") - warn(s"[DEBUG] ★★★ Invalid config metrics registered for DynamicBrokerConfig\nStack trace:\n $stackTrace") - } } } @@ -411,7 +404,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicBrokerConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + // Re-throw ConfigException when failure policy is "fail" to halt the broker + error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) + throw e + case e: Exception => + error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) } } @@ -422,7 +420,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicDefaultConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: Exception => error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + // Re-throw ConfigException when failure policy is "fail" to halt the broker + error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) + throw e + case e: Exception => + error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) } } @@ -477,6 +480,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateInvalidConfigCount(perBrokerConfig, invalidCount, invalidConfigNames) warn(s"[DEBUG] After updateInvalidConfigCount, sharedInvalidBrokerConfigCount=${DynamicBrokerConfig.sharedInvalidBrokerConfigCount}, sharedInvalidDefaultConfigCount=${DynamicBrokerConfig.sharedInvalidDefaultConfigCount}") + // Check failure policy and fail fast if configured + if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail")) { + val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" + val errorMsg = s"Invalid $configType dynamic configuration detected: $invalidConfigNames. " + + s"Broker is configured with dynamic.config.failure.policy=fail, halting the broker. " + + s"To allow the broker to start with warnings instead, set dynamic.config.failure.policy=warn in server.properties." + fatal(errorMsg) + // Throw exception first, then exit if it gets caught + throw new ConfigException(errorMsg) + } + props } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c469f53bdda23..0c4eecd4e572c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -405,6 +405,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG) val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG) + val dynamicConfigFailurePolicy = getString(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG) def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG) def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index e40c5c8e6d36f..1a6247e241071 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -76,6 +76,13 @@ public class ServerConfigs { public static final String DELETE_TOPIC_ENABLE_DOC = "When set to true, topics can be deleted by the admin client. " + "When set to false, deletion requests will be explicitly rejected by the broker."; + /** ********* Dynamic Configuration Failure Policy ***********/ + public static final String DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.config.failure.policy"; + public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; + public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic configuration validation fails. " + + "Valid values are 'warn' (log warning and ignore invalid configs) and 'fail' (halt the broker). " + + "This setting helps ensure configuration consistency and prevents silent configuration errors."; + public static final String COMPRESSION_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.COMPRESSION_TYPE_CONFIG); public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + "('gzip', 'snappy', 'lz4', 'zstd'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + @@ -143,6 +150,7 @@ public class ServerConfigs { /** ********* Controlled shutdown configuration ***********/ .define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC) .define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC) + .define(DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, STRING, DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT, ConfigDef.ValidString.in("warn", "fail"), MEDIUM, DYNAMIC_CONFIG_FAILURE_POLICY_DOC) .define(COMPRESSION_TYPE_CONFIG, STRING, ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC) .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC) From 0bcfe097cb28e8ca5e20e8f20b6fcd7d8df8c9fb Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 25 Dec 2025 15:04:29 +0800 Subject: [PATCH 06/12] add a node to cluster aspect --- .../scala/kafka/server/BrokerServer.scala | 1 + .../scala/kafka/server/ControllerServer.scala | 1 + .../scala/kafka/server/SharedServer.scala | 10 ++ .../metadata/DynamicConfigPublisher.scala | 17 +++ .../DynamicBrokerReconfigurationTest.scala | 119 ++++++++++++++++++ .../BrokerMetadataPublisherTest.scala | 1 + .../kafka/server/config/ServerConfigs.java | 2 +- 7 files changed, 150 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index a46ded311a49b..ebb494390c20e 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -497,6 +497,7 @@ class BrokerServer( new DynamicConfigPublisher( config, sharedServer.metadataPublishingFaultHandler, + sharedServer.dynamicConfigFatalFaultHandler, dynamicConfigHandlers.toMap, "broker"), new DynamicClientQuotaPublisher( diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index ad836803f663e..00b7f725fc33c 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -326,6 +326,7 @@ class ControllerServer( metadataPublishers.add(new DynamicConfigPublisher( config, sharedServer.metadataPublishingFaultHandler, + sharedServer.dynamicConfigFatalFaultHandler, immutable.Map[ConfigType, ConfigHandler]( // controllers don't host topics, so no need to do anything with dynamic topic config changes here ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index ccdecb471f7af..308a3fe587029 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -262,6 +262,16 @@ class SharedServer( // Note: snapshot generation does not need to be disabled for a publishing fault. }) + /** + * The fatal fault handler to use when invalid dynamic configurations are detected + * with dynamic.config.failure.policy=fail. This handler triggers a graceful shutdown. + */ + val dynamicConfigFatalFaultHandler: FaultHandler = faultHandlerFactory.build( + name = "dynamic config", + fatal = sharedServerConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail"), + action = () => { } + ) + private def start(listenerEndpoints: Endpoints): Unit = synchronized { if (started) { debug("SharedServer has already been started.") diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index d30d5dd246745..b51099b898e7e 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -31,6 +31,7 @@ import org.apache.kafka.server.fault.FaultHandler class DynamicConfigPublisher( conf: KafkaConfig, faultHandler: FaultHandler, + fatalFaultHandler: FaultHandler, dynamicConfigHandlers: Map[ConfigType, ConfigHandler], nodeType: String, ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { @@ -80,6 +81,14 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) nodeConfigHandler.processConfigChanges(resource.name(), props) } catch { + case e: org.apache.kafka.common.config.ConfigException + if conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + // When failure policy is "fail", use fatal fault handler for graceful shutdown + throw fatalFaultHandler.handleFault( + s"Error updating cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail, initiating graceful shutdown.", + e + ) case t: Throwable => faultHandler.handleFault("Error updating " + s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -96,6 +105,14 @@ class DynamicConfigPublisher( // have changed. This doesn't apply to topic configs or cluster configs. reloadUpdatedFilesWithoutConfigChange(props) } catch { + case e: org.apache.kafka.common.config.ConfigException + if conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + // When failure policy is "fail", use fatal fault handler for graceful shutdown + throw fatalFaultHandler.handleFault( + s"Error updating node ${conf.nodeId} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail, initiating graceful shutdown.", + e + ) case t: Throwable => faultHandler.handleFault("Error updating " + s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2b738cfde6313..a2ec198fdb1aa 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -57,6 +57,7 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig +import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.ReplicaState @@ -1150,6 +1151,124 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2) } + @ParameterizedTest + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testDynamicConfigFailurePolicyWarn(groupProtocol: String): Unit = { + // Test that with policy=warn, invalid configs are logged but broker continues + val props = defaultStaticConfig(numServers) + props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "warn") + + val kafkaConfig = KafkaConfig.fromProps(props) + val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] + servers += newBroker + + // Try to set an invalid cluster-wide config (log.segment.bytes is not allowed as dynamic cluster config) + val invalidProps = new Properties() + invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") + + // This should succeed (config is ignored with warning) when policy=warn + val alterResult = alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false) + alterResult.all().get() + + // Verify broker is still running + assertTrue(newBroker.brokerState == BrokerState.RUNNING) + + // Verify invalid config count metric + val invalidDefaultConfigCount = newBroker.metrics.metrics().asScala + .find(_._1.name() == "invalid-default-config-count") + .map(_._2.metricValue().asInstanceOf[Int]) + assertTrue(invalidDefaultConfigCount.isDefined) + assertTrue(invalidDefaultConfigCount.get > 0, s"Expected invalid config count > 0, got ${invalidDefaultConfigCount.get}") + } + + @ParameterizedTest + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testDynamicConfigFailurePolicyFail(groupProtocol: String): Unit = { + // Test that with policy=fail, invalid configs cause broker to halt + val props = defaultStaticConfig(numServers) + props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "fail") + + val kafkaConfig = KafkaConfig.fromProps(props) + val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] + servers += newBroker + + // Try to set an invalid cluster-wide config + val invalidProps = new Properties() + invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") + + // This should fail when policy=fail + // Note: In the actual implementation, this would cause System.exit(1) + // For testing purposes, we verify the exception is thrown + val alterResult = alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false) + + // The alter operation itself succeeds, but the broker will shut down when applying the config + alterResult.all().get() + + // Wait for broker to shut down due to invalid config + TestUtils.waitUntilTrue( + () => newBroker.brokerState == BrokerState.SHUTTING_DOWN, + "Broker did not shut down after invalid config with policy=fail", + 30000L + ) + } + + @ParameterizedTest + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testInvalidConfigMetrics(groupProtocol: String): Unit = { + // Test that invalid config metrics are correctly updated + val props = defaultStaticConfig(numServers) + props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "warn") + + val kafkaConfig = KafkaConfig.fromProps(props) + val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] + servers += newBroker + + // Initially, no invalid configs + val initialMetrics = newBroker.metrics.metrics().asScala + val initialInvalidCount = initialMetrics + .find(_._1.name() == "invalid-default-config-count") + .map(_._2.metricValue().asInstanceOf[Int]) + .getOrElse(0) + assertEquals(0, initialInvalidCount, "Expected no invalid configs initially") + + // Set multiple invalid cluster-wide configs + val invalidProps = new Properties() + invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") + invalidProps.put("broker.id", "999") // broker.id is not dynamic + invalidProps.put("node.id", "888") // node.id is not dynamic + + alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false).all().get() + + // Wait for metrics to update + TestUtils.waitUntilTrue( + () => { + val metrics = newBroker.metrics.metrics().asScala + val count = metrics + .find(_._1.name() == "invalid-default-config-count") + .map(_._2.metricValue().asInstanceOf[Int]) + .getOrElse(0) + count > 0 + }, + "Invalid config count metric did not update", + 10000L + ) + + // Verify invalid config names are tracked + val metricsAfter = newBroker.metrics.metrics().asScala + val invalidConfigNames = metricsAfter + .find(_._1.name() == "invalid-default-config-names") + .map(_._2.metricValue().asInstanceOf[String]) + assertTrue(invalidConfigNames.isDefined) + assertFalse(invalidConfigNames.get.isEmpty, "Expected invalid config names to be recorded") + + // Verify total count + val totalInvalidCount = metricsAfter + .find(_._1.name() == "total-invalid-config-count") + .map(_._2.metricValue().asInstanceOf[Int]) + .getOrElse(0) + assertTrue(totalInvalidCount > 0, s"Expected total invalid count > 0, got $totalInvalidCount") + } + private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = { TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment") consumer.assignment.forEach(tp => consumer.position(tp)) diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index dc3b987488fcc..3c9731ccb177c 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -101,6 +101,7 @@ class BrokerMetadataPublisherTest { Mockito.spy(new DynamicConfigPublisher( conf = broker.config, faultHandler = errorHandler, + fatalFaultHandler = errorHandler, // Use same handler for tests dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap, nodeType = "broker")) } diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 1a6247e241071..0acfae7982ba3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -80,7 +80,7 @@ public class ServerConfigs { public static final String DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.config.failure.policy"; public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic configuration validation fails. " + - "Valid values are 'warn' (log warning and ignore invalid configs) and 'fail' (halt the broker). " + + "Valid values are 'warn' (log warning and ignore invalid configs, default for backward compatibility) and 'fail' (halt the broker). " + "This setting helps ensure configuration consistency and prevents silent configuration errors."; public static final String COMPRESSION_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.COMPRESSION_TYPE_CONFIG); From 197d247381d2de72e921c2ba693483e8742e67ae Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 25 Dec 2025 16:34:38 +0800 Subject: [PATCH 07/12] apply change to all dynamic config publisher --- .../kafka/server/DynamicBrokerConfig.scala | 26 +++++----- .../scala/kafka/server/SharedServer.scala | 2 +- .../metadata/DynamicConfigPublisher.scala | 49 ++++++++++++++++--- 3 files changed, 56 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a81f77edaae79..9438d8e3926cb 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -90,8 +90,8 @@ import scala.jdk.CollectionConverters._ object DynamicBrokerConfig { // Shared counters across all DynamicBrokerConfig instances (to survive KafkaConfig recreation) - @volatile private var sharedInvalidBrokerConfigCount = 0 - @volatile private var sharedInvalidDefaultConfigCount = 0 + @volatile private[server] var sharedInvalidBrokerConfigCount = 0 + @volatile private[server] var sharedInvalidDefaultConfigCount = 0 private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) @@ -263,6 +263,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() + /** + * True if the dynamic config failure policy is set to "fail" + */ + private val isPolicyFail: Boolean = kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") + // Invalid config metrics - will be set via initialize() private var invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None @@ -404,7 +409,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicBrokerConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + case e: ConfigException if isPolicyFail => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) throw e @@ -420,7 +425,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicDefaultConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + case e: ConfigException if isPolicyFail => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) throw e @@ -460,7 +465,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging var invalidConfigNames = Set.empty[String] // Remove all invalid configs from `props` val (invalidConfigsCount, invalidConfigsFromValidation) = removeInvalidConfigs(props, perBrokerConfig) - warn(s"[DEBUG] removeInvalidConfigs returned: count=$invalidConfigsCount, names=$invalidConfigsFromValidation, perBrokerConfig=$perBrokerConfig") invalidCount += invalidConfigsCount invalidConfigNames ++= invalidConfigsFromValidation def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Int = { @@ -476,18 +480,14 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging "Security configs can be dynamically updated only using listener prefix, base configs will be ignored") if (!perBrokerConfig) invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") - warn(s"[DEBUG] About to call updateInvalidConfigCount with perBrokerConfig=$perBrokerConfig, invalidCount=$invalidCount, invalidConfigNames=$invalidConfigNames") updateInvalidConfigCount(perBrokerConfig, invalidCount, invalidConfigNames) - warn(s"[DEBUG] After updateInvalidConfigCount, sharedInvalidBrokerConfigCount=${DynamicBrokerConfig.sharedInvalidBrokerConfigCount}, sharedInvalidDefaultConfigCount=${DynamicBrokerConfig.sharedInvalidDefaultConfigCount}") - // Check failure policy and fail fast if configured - if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail")) { + // Throw ConfigException if invalid configs detected with policy=fail + // DynamicConfigPublisher will decide whether to halt based on whether it's the first publish + if (invalidCount > 0 && isPolicyFail) { val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" val errorMsg = s"Invalid $configType dynamic configuration detected: $invalidConfigNames. " + - s"Broker is configured with dynamic.config.failure.policy=fail, halting the broker. " + - s"To allow the broker to start with warnings instead, set dynamic.config.failure.policy=warn in server.properties." - fatal(errorMsg) - // Throw exception first, then exit if it gets caught + s"Broker is configured with dynamic.config.failure.policy=fail." throw new ConfigException(errorMsg) } diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 308a3fe587029..7d2d04a07530b 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -264,7 +264,7 @@ class SharedServer( /** * The fatal fault handler to use when invalid dynamic configurations are detected - * with dynamic.config.failure.policy=fail. This handler triggers a graceful shutdown. + * with dynamic.config.failure.policy=fail during broker startup. */ val dynamicConfigFatalFaultHandler: FaultHandler = faultHandlerFactory.build( name = "dynamic config", diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index b51099b898e7e..9ff508f482564 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -21,6 +21,7 @@ import java.util.Properties import kafka.server.ConfigAdminManager.toLoggableProps import kafka.server.{ConfigHandler, KafkaConfig} import kafka.utils.Logging +import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.{MetadataDelta, MetadataImage} @@ -37,6 +38,17 @@ class DynamicConfigPublisher( ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " + /** + * True if this is the first time we are publishing dynamic configuration. + * We only enforce dynamic.config.failure.policy=fail during the first publish (startup). + */ + @volatile private var _firstPublish = true + + /** + * True if the dynamic config failure policy is set to "fail" + */ + private val isPolicyFail: Boolean = conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") + override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( @@ -66,6 +78,13 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) topicConfigHandler.processConfigChanges(resource.name(), props) } catch { + case e: ConfigException if _firstPublish && isPolicyFail => + // During first publish (startup), if failure policy is "fail", use fatal fault handler + throw fatalFaultHandler.handleFault( + s"Error updating topic ${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", + e + ) case t: Throwable => faultHandler.handleFault("Error updating topic " + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -82,11 +101,11 @@ class DynamicConfigPublisher( nodeConfigHandler.processConfigChanges(resource.name(), props) } catch { case e: org.apache.kafka.common.config.ConfigException - if conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => - // When failure policy is "fail", use fatal fault handler for graceful shutdown + if _firstPublish && isPolicyFail => + // During first publish (startup), if failure policy is "fail", throw fatal fault handler throw fatalFaultHandler.handleFault( s"Error updating cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail, initiating graceful shutdown.", + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", e ) case t: Throwable => faultHandler.handleFault("Error updating " + @@ -105,12 +124,12 @@ class DynamicConfigPublisher( // have changed. This doesn't apply to topic configs or cluster configs. reloadUpdatedFilesWithoutConfigChange(props) } catch { - case e: org.apache.kafka.common.config.ConfigException - if conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => - // When failure policy is "fail", use fatal fault handler for graceful shutdown + case e: ConfigException + if _firstPublish && isPolicyFail => + // During first publish (startup), if failure policy is "fail", use fatal fault handler throw fatalFaultHandler.handleFault( s"Error updating node ${conf.nodeId} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail, initiating graceful shutdown.", + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", e ) case t: Throwable => faultHandler.handleFault("Error updating " + @@ -127,6 +146,13 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) metricsConfigHandler.processConfigChanges(resource.name(), props) } catch { + case e: ConfigException if _firstPublish && isPolicyFail => + // During first publish (startup), if failure policy is "fail", use fatal fault handler + throw fatalFaultHandler.handleFault( + s"Error updating client metrics ${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", + e + ) case t: Throwable => faultHandler.handleFault("Error updating client metrics" + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -139,6 +165,13 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) groupConfigHandler.processConfigChanges(resource.name(), props) } catch { + case e: ConfigException if _firstPublish && isPolicyFail => + // During first publish (startup), if failure policy is "fail", use fatal fault handler + throw fatalFaultHandler.handleFault( + s"Error updating group ${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + + s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", + e + ) case t: Throwable => faultHandler.handleFault("Error updating group " + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -150,6 +183,8 @@ class DynamicConfigPublisher( } catch { case t: Throwable => faultHandler.handleFault("Uncaught exception while " + s"publishing dynamic configuration changes from $deltaName", t) + } finally { + _firstPublish = false } } From d9676be9accd85dd3839430b4fd2f856db3824ef Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Thu, 25 Dec 2025 17:09:11 +0800 Subject: [PATCH 08/12] fix init error --- .../scala/kafka/server/DynamicBrokerConfig.scala | 11 +++-------- .../server/DynamicBrokerReconfigurationTest.scala | 12 ++++++------ .../apache/kafka/server/config/ServerConfigs.java | 2 +- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 9438d8e3926cb..a60bb7bdd391c 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -263,11 +263,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - /** - * True if the dynamic config failure policy is set to "fail" - */ - private val isPolicyFail: Boolean = kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") - // Invalid config metrics - will be set via initialize() private var invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None @@ -409,7 +404,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicBrokerConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: ConfigException if isPolicyFail => + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) throw e @@ -425,7 +420,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging dynamicDefaultConfigs ++= props.asScala updateCurrentConfig(doLog) } catch { - case e: ConfigException if isPolicyFail => + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) throw e @@ -484,7 +479,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Throw ConfigException if invalid configs detected with policy=fail // DynamicConfigPublisher will decide whether to halt based on whether it's the first publish - if (invalidCount > 0 && isPolicyFail) { + if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail")) { val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" val errorMsg = s"Invalid $configType dynamic configuration detected: $invalidConfigNames. " + s"Broker is configured with dynamic.config.failure.policy=fail." diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index a2ec198fdb1aa..061a138c54ef4 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -1151,8 +1151,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2) } - @ParameterizedTest - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) def testDynamicConfigFailurePolicyWarn(groupProtocol: String): Unit = { // Test that with policy=warn, invalid configs are logged but broker continues val props = defaultStaticConfig(numServers) @@ -1181,8 +1181,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup assertTrue(invalidDefaultConfigCount.get > 0, s"Expected invalid config count > 0, got ${invalidDefaultConfigCount.get}") } - @ParameterizedTest - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) def testDynamicConfigFailurePolicyFail(groupProtocol: String): Unit = { // Test that with policy=fail, invalid configs cause broker to halt val props = defaultStaticConfig(numServers) @@ -1212,8 +1212,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup ) } - @ParameterizedTest - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) def testInvalidConfigMetrics(groupProtocol: String): Unit = { // Test that invalid config metrics are correctly updated val props = defaultStaticConfig(numServers) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 0acfae7982ba3..f2860cce47934 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -78,7 +78,7 @@ public class ServerConfigs { /** ********* Dynamic Configuration Failure Policy ***********/ public static final String DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.config.failure.policy"; - public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; + public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "warn"; public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic configuration validation fails. " + "Valid values are 'warn' (log warning and ignore invalid configs, default for backward compatibility) and 'fail' (halt the broker). " + "This setting helps ensure configuration consistency and prevents silent configuration errors."; From c862283f1878cea6ff0956bca8a411b5f11f411f Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sun, 28 Dec 2025 13:05:35 +0800 Subject: [PATCH 09/12] test fail --- .../main/java/org/apache/kafka/server/config/ServerConfigs.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index f2860cce47934..0acfae7982ba3 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -78,7 +78,7 @@ public class ServerConfigs { /** ********* Dynamic Configuration Failure Policy ***********/ public static final String DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.config.failure.policy"; - public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "warn"; + public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic configuration validation fails. " + "Valid values are 'warn' (log warning and ignore invalid configs, default for backward compatibility) and 'fail' (halt the broker). " + "This setting helps ensure configuration consistency and prevents silent configuration errors."; From 4743493ed2aad0f90fa45784b16c564e018d836b Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sun, 28 Dec 2025 14:03:19 +0800 Subject: [PATCH 10/12] move metrics to dynamicPublisher --- .../scala/kafka/server/BrokerServer.scala | 8 +- .../kafka/server/ConfigAdminManager.scala | 2 +- .../scala/kafka/server/ConfigHandler.scala | 10 +- .../scala/kafka/server/ControllerServer.scala | 8 +- .../kafka/server/DynamicBrokerConfig.scala | 60 ++++----- .../scala/kafka/server/SharedServer.scala | 20 +-- .../metadata/DynamicConfigPublisher.scala | 39 +++++- .../DynamicBrokerReconfigurationTest.scala | 119 ------------------ 8 files changed, 82 insertions(+), 184 deletions(-) diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index ebb494390c20e..d81affb507c4a 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -193,10 +193,7 @@ class BrokerServer( val clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin() - config.dynamicConfig.initialize( - Some(clientTelemetryExporterPlugin), - None - ) + config.dynamicConfig.initialize(Some(clientTelemetryExporterPlugin)) quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString) DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext) @@ -499,7 +496,8 @@ class BrokerServer( sharedServer.metadataPublishingFaultHandler, sharedServer.dynamicConfigFatalFaultHandler, dynamicConfigHandlers.toMap, - "broker"), + "broker", + Some(sharedServer.invalidConfigMetrics)), new DynamicClientQuotaPublisher( config.nodeId, sharedServer.metadataPublishingFaultHandler, diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index 7394d2cfc43c6..25e764795658a 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -165,7 +165,7 @@ class ConfigAdminManager(nodeId: Int, ): Unit = { val perBrokerConfig = configResource.name().nonEmpty val persistentProps = configRepository.config(configResource) - val configProps = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) + val (configProps, _) = conf.dynamicConfig.fromPersistentProps(persistentProps, perBrokerConfig) val alterConfigOps = resource.configs().asScala.map { config => val opType = AlterConfigOp.OpType.forId(config.configOperation()) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 3e82db290925e..9024491678639 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -145,12 +145,20 @@ class TopicConfigHandler(private val replicaManager: ReplicaManager, */ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging { + + @volatile private var _lastValidationResult: Option[DynamicBrokerConfig.ValidationResult] = None + + def lastValidationResult: Option[DynamicBrokerConfig.ValidationResult] = _lastValidationResult + def processConfigChanges(brokerId: String, properties: Properties): Unit = { - if (brokerId.isEmpty) + val validationResult = if (brokerId.isEmpty) brokerConfig.dynamicConfig.updateDefaultConfig(properties) else if (brokerConfig.brokerId == brokerId.trim.toInt) { brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties) + } else { + None } + _lastValidationResult = validationResult val updatedDynamicBrokerConfigs = brokerConfig.dynamicConfig.currentDynamicBrokerConfigs val updatedDynamicDefaultConfigs = brokerConfig.dynamicConfig.currentDynamicDefaultConfigs diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala index 00b7f725fc33c..e5870123896aa 100644 --- a/core/src/main/scala/kafka/server/ControllerServer.scala +++ b/core/src/main/scala/kafka/server/ControllerServer.scala @@ -131,10 +131,7 @@ class ControllerServer( this.logIdent = logContext.logPrefix() info("Starting controller") - config.dynamicConfig.initialize( - clientTelemetryExporterPluginOpt = None, - invalidConfigMetricsOpt = None - ) + config.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) maybeChangeStatus(STARTING, STARTED) @@ -331,7 +328,8 @@ class ControllerServer( // controllers don't host topics, so no need to do anything with dynamic topic config changes here ConfigType.BROKER -> new BrokerConfigHandler(config, quotaManagers) ), - "controller")) + "controller", + Some(sharedServer.invalidConfigMetrics))) // Register this instance for dynamic config changes to the KafkaConfig. This must be called // after the authorizer and quotaManagers are initialized, since it references those objects. diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index a60bb7bdd391c..18a3b68d66936 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -89,9 +89,13 @@ import scala.jdk.CollectionConverters._ */ object DynamicBrokerConfig { - // Shared counters across all DynamicBrokerConfig instances (to survive KafkaConfig recreation) - @volatile private[server] var sharedInvalidBrokerConfigCount = 0 - @volatile private[server] var sharedInvalidDefaultConfigCount = 0 + /** + * Validation result for dynamic broker configuration + * @param invalidCount Number of invalid configurations + * @param invalidConfigNames Set of invalid configuration names + * @param perBrokerConfig True if these are per-broker configs, false if cluster-wide default configs + */ + case class ValidationResult(invalidCount: Int, invalidConfigNames: Set[String], perBrokerConfig: Boolean) private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG) @@ -263,9 +267,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - // Invalid config metrics - will be set via initialize() - private var invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None - // Use COWArrayList to prevent concurrent modification exception when an item is added by one thread to these // collections, while another thread is iterating over them. private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]() @@ -274,15 +275,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private var telemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin] = _ private var currentConfig: KafkaConfig = _ - private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin], - invalidConfigMetricsOpt: Option[org.apache.kafka.server.metrics.InvalidConfigMetrics] = None): Unit = { + private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin]): Unit = { currentConfig = new KafkaConfig(kafkaConfig.props, false) telemetryExporterPluginOpt = clientTelemetryExporterPluginOpt - - // Initialize invalid config metrics if provided (only once) - if (this.invalidConfigMetricsOpt.isEmpty) { - this.invalidConfigMetricsOpt = invalidConfigMetricsOpt - } } /** @@ -397,12 +392,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging telemetryExporterPluginOpt } - private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) { + private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Option[DynamicBrokerConfig.ValidationResult] = CoreUtils.inWriteLock(lock) { try { - val props = fromPersistentProps(persistentProps, perBrokerConfig = true) + val (props, validationResult) = fromPersistentProps(persistentProps, perBrokerConfig = true) dynamicBrokerConfigs.clear() dynamicBrokerConfigs ++= props.asScala updateCurrentConfig(doLog) + validationResult } catch { case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => // Re-throw ConfigException when failure policy is "fail" to halt the broker @@ -410,15 +406,17 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging throw e case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) + None } } - private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) { + private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Option[DynamicBrokerConfig.ValidationResult] = CoreUtils.inWriteLock(lock) { try { - val props = fromPersistentProps(persistentProps, perBrokerConfig = false) + val (props, validationResult) = fromPersistentProps(persistentProps, perBrokerConfig = false) dynamicDefaultConfigs.clear() dynamicDefaultConfigs ++= props.asScala updateCurrentConfig(doLog) + validationResult } catch { case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => // Re-throw ConfigException when failure policy is "fail" to halt the broker @@ -426,6 +424,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging throw e case e: Exception => error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) + None } } @@ -453,7 +452,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } private[server] def fromPersistentProps(persistentProps: Properties, - perBrokerConfig: Boolean): Properties = { + perBrokerConfig: Boolean): (Properties, Option[DynamicBrokerConfig.ValidationResult]) = { val props = persistentProps.clone().asInstanceOf[Properties] var invalidCount = 0 @@ -475,18 +474,24 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging "Security configs can be dynamically updated only using listener prefix, base configs will be ignored") if (!perBrokerConfig) invalidCount += removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored") - updateInvalidConfigCount(perBrokerConfig, invalidCount, invalidConfigNames) + + // Create validation result if invalid configs were found + val validationResult = if (invalidCount > 0) { + Some(DynamicBrokerConfig.ValidationResult(invalidCount, invalidConfigNames.toSet, perBrokerConfig)) + } else { + None + } // Throw ConfigException if invalid configs detected with policy=fail // DynamicConfigPublisher will decide whether to halt based on whether it's the first publish if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail")) { val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" - val errorMsg = s"Invalid $configType dynamic configuration detected: $invalidConfigNames. " + + val errorMsg = s"Invalid $configType dynamic configuration detected: ${invalidConfigNames.mkString(", ")}. " + s"Broker is configured with dynamic.config.failure.policy=fail." throw new ConfigException(errorMsg) } - props + (props, validationResult) } /** @@ -539,19 +544,6 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } } - private def updateInvalidConfigCount(perBrokerConfig: Boolean, invalidCount: Int, invalidConfigNames: Set[String]): Unit = { - val configNamesStr = invalidConfigNames.mkString(", ") - if (perBrokerConfig) { - warn(s"update invalidBrokerConfigCount to $invalidCount, invalid configs: [$configNamesStr]") - DynamicBrokerConfig.sharedInvalidBrokerConfigCount = invalidCount - invalidConfigMetricsOpt.foreach(_.setInvalidBrokerConfigCount(invalidCount, configNamesStr)) - } else { - warn(s"update invalidDefaultConfigCount to $invalidCount, invalid configs: [$configNamesStr]") - DynamicBrokerConfig.sharedInvalidDefaultConfigCount = invalidCount - invalidConfigMetricsOpt.foreach(_.setInvalidDefaultConfigCount(invalidCount, configNamesStr)) - } - } - private[server] def maybeReconfigure(reconfigurable: Reconfigurable, oldConfig: KafkaConfig, newConfig: util.Map[String, _]): Unit = { if (reconfigurable.reconfigurableConfigs.asScala.exists(key => oldConfig.originals.get(key) != newConfig.get(key))) reconfigurable.reconfigure(newConfig) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 7d2d04a07530b..3c3c6256859e4 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -121,6 +121,7 @@ class SharedServer( @volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var controllerServerMetrics: ControllerMetadataMetrics = _ @volatile var nodeMetrics: NodeMetrics = _ + @volatile var invalidConfigMetrics: org.apache.kafka.server.metrics.InvalidConfigMetrics = _ @volatile var loader: MetadataLoader = _ private val snapshotsDisabledReason = new AtomicReference[String](null) @volatile var snapshotEmitter: SnapshotEmitter = _ @@ -284,20 +285,11 @@ class SharedServer( metrics = new Metrics() } - // Create invalid config metrics for DynamicBrokerConfig (shared across all KafkaConfig instances) - val invalidConfigMetrics = new org.apache.kafka.server.metrics.InvalidConfigMetrics(metrics) - sharedServerConfig.dynamicConfig.initialize( - clientTelemetryExporterPluginOpt = None, - invalidConfigMetricsOpt = Some(invalidConfigMetrics) - ) - brokerConfig.dynamicConfig.initialize( - clientTelemetryExporterPluginOpt = None, - invalidConfigMetricsOpt = Some(invalidConfigMetrics) - ) - controllerConfig.dynamicConfig.initialize( - clientTelemetryExporterPluginOpt = None, - invalidConfigMetricsOpt = Some(invalidConfigMetrics) - ) + // Create invalid config metrics for DynamicConfigPublisher (shared across all server instances) + invalidConfigMetrics = new org.apache.kafka.server.metrics.InvalidConfigMetrics(metrics) + sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) + brokerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) + controllerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { brokerMetrics = new BrokerServerMetrics(metrics) diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index 9ff508f482564..94aa767ce0e9b 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -19,7 +19,7 @@ package kafka.server.metadata import java.util.Properties import kafka.server.ConfigAdminManager.toLoggableProps -import kafka.server.{ConfigHandler, KafkaConfig} +import kafka.server.{BrokerConfigHandler, ConfigHandler, DynamicBrokerConfig, KafkaConfig} import kafka.utils.Logging import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} @@ -27,6 +27,7 @@ import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.{MetadataDelta, MetadataImage} import org.apache.kafka.server.config.ConfigType import org.apache.kafka.server.fault.FaultHandler +import org.apache.kafka.server.metrics.InvalidConfigMetrics class DynamicConfigPublisher( @@ -35,6 +36,7 @@ class DynamicConfigPublisher( fatalFaultHandler: FaultHandler, dynamicConfigHandlers: Map[ConfigType, ConfigHandler], nodeType: String, + invalidConfigMetrics: Option[InvalidConfigMetrics] = None, ) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher { logIdent = s"[${name()}] " @@ -49,6 +51,21 @@ class DynamicConfigPublisher( */ private val isPolicyFail: Boolean = conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") + /** + * Update metrics from validation result + */ + private def updateMetrics(validationResult: DynamicBrokerConfig.ValidationResult): Unit = { + invalidConfigMetrics.foreach { metrics => + if (validationResult.perBrokerConfig) { + warn(s"Updating invalidBrokerConfigCount to ${validationResult.invalidCount}, invalid configs: [${validationResult.invalidConfigNames.mkString(", ")}]") + metrics.setInvalidBrokerConfigCount(validationResult.invalidCount, validationResult.invalidConfigNames.mkString(", ")) + } else { + warn(s"Updating invalidDefaultConfigCount to ${validationResult.invalidCount}, invalid configs: [${validationResult.invalidConfigNames.mkString(", ")}]") + metrics.setInvalidDefaultConfigCount(validationResult.invalidCount, validationResult.invalidConfigNames.mkString(", ")) + } + } + } + override def name(): String = s"DynamicConfigPublisher $nodeType id=${conf.nodeId}" override def onMetadataUpdate( @@ -99,9 +116,15 @@ class DynamicConfigPublisher( info("Updating cluster configuration : " + toLoggableProps(resource, props).mkString(",")) nodeConfigHandler.processConfigChanges(resource.name(), props) + + // Update metrics if there were validation issues (only for BrokerConfigHandler) + nodeConfigHandler match { + case brokerHandler: BrokerConfigHandler => + brokerHandler.lastValidationResult.foreach(updateMetrics) + case _ => // other config handlers don't have validation results + } } catch { - case e: org.apache.kafka.common.config.ConfigException - if _firstPublish && isPolicyFail => + case e: ConfigException if _firstPublish && isPolicyFail => // During first publish (startup), if failure policy is "fail", throw fatal fault handler throw fatalFaultHandler.handleFault( s"Error updating cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + @@ -123,9 +146,15 @@ class DynamicConfigPublisher( // set to /tmp/foo, we still want to reload /tmp/foo in case its contents // have changed. This doesn't apply to topic configs or cluster configs. reloadUpdatedFilesWithoutConfigChange(props) + + // Update metrics if there were validation issues (only for BrokerConfigHandler) + nodeConfigHandler match { + case brokerHandler: BrokerConfigHandler => + brokerHandler.lastValidationResult.foreach(updateMetrics) + case _ => // other config handlers don't have validation results + } } catch { - case e: ConfigException - if _firstPublish && isPolicyFail => + case e: ConfigException if _firstPublish && isPolicyFail => // During first publish (startup), if failure policy is "fail", use fatal fault handler throw fatalFaultHandler.handleFault( s"Error updating node ${conf.nodeId} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 061a138c54ef4..2b738cfde6313 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -57,7 +57,6 @@ import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializ import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.MetadataLogConfig -import org.apache.kafka.metadata.BrokerState import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.ReplicaState @@ -1151,124 +1150,6 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2) } - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testDynamicConfigFailurePolicyWarn(groupProtocol: String): Unit = { - // Test that with policy=warn, invalid configs are logged but broker continues - val props = defaultStaticConfig(numServers) - props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "warn") - - val kafkaConfig = KafkaConfig.fromProps(props) - val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] - servers += newBroker - - // Try to set an invalid cluster-wide config (log.segment.bytes is not allowed as dynamic cluster config) - val invalidProps = new Properties() - invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") - - // This should succeed (config is ignored with warning) when policy=warn - val alterResult = alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false) - alterResult.all().get() - - // Verify broker is still running - assertTrue(newBroker.brokerState == BrokerState.RUNNING) - - // Verify invalid config count metric - val invalidDefaultConfigCount = newBroker.metrics.metrics().asScala - .find(_._1.name() == "invalid-default-config-count") - .map(_._2.metricValue().asInstanceOf[Int]) - assertTrue(invalidDefaultConfigCount.isDefined) - assertTrue(invalidDefaultConfigCount.get > 0, s"Expected invalid config count > 0, got ${invalidDefaultConfigCount.get}") - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testDynamicConfigFailurePolicyFail(groupProtocol: String): Unit = { - // Test that with policy=fail, invalid configs cause broker to halt - val props = defaultStaticConfig(numServers) - props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "fail") - - val kafkaConfig = KafkaConfig.fromProps(props) - val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] - servers += newBroker - - // Try to set an invalid cluster-wide config - val invalidProps = new Properties() - invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") - - // This should fail when policy=fail - // Note: In the actual implementation, this would cause System.exit(1) - // For testing purposes, we verify the exception is thrown - val alterResult = alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false) - - // The alter operation itself succeeds, but the broker will shut down when applying the config - alterResult.all().get() - - // Wait for broker to shut down due to invalid config - TestUtils.waitUntilTrue( - () => newBroker.brokerState == BrokerState.SHUTTING_DOWN, - "Broker did not shut down after invalid config with policy=fail", - 30000L - ) - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) - @MethodSource(Array("getTestGroupProtocolParametersAll")) - def testInvalidConfigMetrics(groupProtocol: String): Unit = { - // Test that invalid config metrics are correctly updated - val props = defaultStaticConfig(numServers) - props.put(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, "warn") - - val kafkaConfig = KafkaConfig.fromProps(props) - val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer] - servers += newBroker - - // Initially, no invalid configs - val initialMetrics = newBroker.metrics.metrics().asScala - val initialInvalidCount = initialMetrics - .find(_._1.name() == "invalid-default-config-count") - .map(_._2.metricValue().asInstanceOf[Int]) - .getOrElse(0) - assertEquals(0, initialInvalidCount, "Expected no invalid configs initially") - - // Set multiple invalid cluster-wide configs - val invalidProps = new Properties() - invalidProps.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1024") - invalidProps.put("broker.id", "999") // broker.id is not dynamic - invalidProps.put("node.id", "888") // node.id is not dynamic - - alterConfigs(Seq(newBroker), adminClients.head, invalidProps, perBrokerConfig = false).all().get() - - // Wait for metrics to update - TestUtils.waitUntilTrue( - () => { - val metrics = newBroker.metrics.metrics().asScala - val count = metrics - .find(_._1.name() == "invalid-default-config-count") - .map(_._2.metricValue().asInstanceOf[Int]) - .getOrElse(0) - count > 0 - }, - "Invalid config count metric did not update", - 10000L - ) - - // Verify invalid config names are tracked - val metricsAfter = newBroker.metrics.metrics().asScala - val invalidConfigNames = metricsAfter - .find(_._1.name() == "invalid-default-config-names") - .map(_._2.metricValue().asInstanceOf[String]) - assertTrue(invalidConfigNames.isDefined) - assertFalse(invalidConfigNames.get.isEmpty, "Expected invalid config names to be recorded") - - // Verify total count - val totalInvalidCount = metricsAfter - .find(_._1.name() == "total-invalid-config-count") - .map(_._2.metricValue().asInstanceOf[Int]) - .getOrElse(0) - assertTrue(totalInvalidCount > 0, s"Expected total invalid count > 0, got $totalInvalidCount") - } - private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = { TestUtils.pollUntilTrue(consumer, () => !consumer.assignment.isEmpty, "Timed out while waiting for assignment") consumer.assignment.forEach(tp => consumer.position(tp)) From 3cedf70cd80a99fa6a763ea9c05f022ab97ac34b Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Sun, 28 Dec 2025 21:19:55 +0800 Subject: [PATCH 11/12] use emnu --- config/server.properties | 5 +- .../kafka/server/DynamicBrokerConfig.scala | 6 +- .../main/scala/kafka/server/KafkaConfig.scala | 3 +- .../scala/kafka/server/SharedServer.scala | 2 +- .../metadata/DynamicConfigPublisher.scala | 2 +- .../config/DynamicConfigFailurePolicy.java | 74 +++++++++++++++++++ 6 files changed, 84 insertions(+), 8 deletions(-) create mode 100644 server-common/src/main/java/org/apache/kafka/server/config/DynamicConfigFailurePolicy.java diff --git a/config/server.properties b/config/server.properties index 7f1773d354ea1..ff19c74b40d93 100644 --- a/config/server.properties +++ b/config/server.properties @@ -22,7 +22,8 @@ process.roles=broker,controller node.id=1 # List of controller endpoints used connect to the controller cluster -controller.quorum.bootstrap.servers=localhost:9093 +controller.quorum.bootstrap.servers=192.168.31.242:17684 +dynamic.config.failure.policy=fail ############################# Socket Server Settings ############################# @@ -41,7 +42,7 @@ inter.broker.listener.name=PLAINTEXT # Listener name, hostname and port the broker or the controller will advertise to clients. # If not set, it uses the value for "listeners". -advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 +advertised.listeners=PLAINTEXT://192.168.31.242:9092,CONTROLLER://192.168.31.242:9093 # A comma-separated list of the names of the listeners used by the controller. # If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 18a3b68d66936..f938b3b721711 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -400,7 +400,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateCurrentConfig(doLog) validationResult } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) throw e @@ -418,7 +418,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateCurrentConfig(doLog) validationResult } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") => + case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) throw e @@ -484,7 +484,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Throw ConfigException if invalid configs detected with policy=fail // DynamicConfigPublisher will decide whether to halt based on whether it's the first publish - if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail")) { + if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL) { val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" val errorMsg = s"Invalid $configType dynamic configuration detected: ${invalidConfigNames.mkString(", ")}. " + s"Broker is configured with dynamic.config.failure.policy=fail." diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 0c4eecd4e572c..3f396439430d4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -405,7 +405,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG) val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG) - val dynamicConfigFailurePolicy = getString(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG) + val dynamicConfigFailurePolicy = org.apache.kafka.server.config.DynamicConfigFailurePolicy.fromString( + getString(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG)) def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG) def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 3c3c6256859e4..40ccd5025d2c1 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -269,7 +269,7 @@ class SharedServer( */ val dynamicConfigFatalFaultHandler: FaultHandler = faultHandlerFactory.build( name = "dynamic config", - fatal = sharedServerConfig.dynamicConfigFailurePolicy.equalsIgnoreCase("fail"), + fatal = sharedServerConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL, action = () => { } ) diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index 94aa767ce0e9b..7e1798f89e421 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -49,7 +49,7 @@ class DynamicConfigPublisher( /** * True if the dynamic config failure policy is set to "fail" */ - private val isPolicyFail: Boolean = conf.dynamicConfigFailurePolicy.equalsIgnoreCase("fail") + private val isPolicyFail: Boolean = conf.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL /** * Update metrics from validation result diff --git a/server-common/src/main/java/org/apache/kafka/server/config/DynamicConfigFailurePolicy.java b/server-common/src/main/java/org/apache/kafka/server/config/DynamicConfigFailurePolicy.java new file mode 100644 index 0000000000000..e00c54bdf4167 --- /dev/null +++ b/server-common/src/main/java/org/apache/kafka/server/config/DynamicConfigFailurePolicy.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import java.util.Locale; + +/** + * Policy to apply when dynamic configuration validation fails. + */ +public enum DynamicConfigFailurePolicy { + /** + * Log a warning and ignore invalid configurations. + * This is lenient behavior that allows the broker to continue running even with invalid dynamic configs. + */ + WARN("warn"), + + /** + * Halt the broker when invalid configurations are detected during startup (first metadata publish). + * After startup, reject configuration updates that fail validation. + * This is strict behavior that ensures configuration correctness. + */ + FAIL("fail"); + + private final String name; + + DynamicConfigFailurePolicy(String name) { + this.name = name; + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return name; + } + + /** + * Parse the policy from a string value. + * + * @param value the string value (case-insensitive) + * @return the corresponding DynamicConfigFailurePolicy + * @throws IllegalArgumentException if the value is not a valid policy + */ + public static DynamicConfigFailurePolicy fromString(String value) { + if (value == null) { + throw new IllegalArgumentException("Dynamic config failure policy cannot be null"); + } + String normalized = value.toLowerCase(Locale.ROOT); + for (DynamicConfigFailurePolicy policy : values()) { + if (policy.name.equals(normalized)) { + return policy; + } + } + throw new IllegalArgumentException("Invalid dynamic config failure policy: " + value + + ". Valid values are: warn, fail"); + } +} From 926e087acbd2f9995feb5ab087202f32a560b302 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Mon, 29 Dec 2025 04:43:39 +0800 Subject: [PATCH 12/12] refactor and rename --- config/server.properties | 2 +- .../kafka/server/DynamicBrokerConfig.scala | 8 ++--- .../main/scala/kafka/server/KafkaConfig.scala | 6 ++-- .../scala/kafka/server/SharedServer.scala | 13 ++++---- .../metadata/DynamicConfigPublisher.scala | 32 ++----------------- .../kafka/server/config/ServerConfigs.java | 13 ++++---- 6 files changed, 23 insertions(+), 51 deletions(-) diff --git a/config/server.properties b/config/server.properties index ff19c74b40d93..331c5a3351f18 100644 --- a/config/server.properties +++ b/config/server.properties @@ -23,7 +23,7 @@ node.id=1 # List of controller endpoints used connect to the controller cluster controller.quorum.bootstrap.servers=192.168.31.242:17684 -dynamic.config.failure.policy=fail +dynamic.broker.config.failure.policy=warn ############################# Socket Server Settings ############################# diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index f938b3b721711..594c1f3587660 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.KafkaRaftClient import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} import org.apache.kafka.server.common.ApiMessageAndVersion -import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} +import org.apache.kafka.server.config.{DynamicConfigFailurePolicy, DynamicProducerStateManagerConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} @@ -400,7 +400,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateCurrentConfig(doLog) validationResult } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL => + case e: ConfigException if kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keySet()}", e) throw e @@ -418,7 +418,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateCurrentConfig(doLog) validationResult } catch { - case e: ConfigException if kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL => + case e: ConfigException if kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL => // Re-throw ConfigException when failure policy is "fail" to halt the broker error(s"Cluster default configs could not be applied: ${persistentProps.keySet()}", e) throw e @@ -484,7 +484,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // Throw ConfigException if invalid configs detected with policy=fail // DynamicConfigPublisher will decide whether to halt based on whether it's the first publish - if (invalidCount > 0 && kafkaConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL) { + if (invalidCount > 0 && kafkaConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL) { val configType = if (perBrokerConfig) "per-broker" else "cluster-wide" val errorMsg = s"Invalid $configType dynamic configuration detected: ${invalidConfigNames.mkString(", ")}. " + s"Broker is configured with dynamic.config.failure.policy=fail." diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3f396439430d4..4a4147786b03f 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.config.AbstractKafkaConfig.getMap -import org.apache.kafka.server.config.{AbstractKafkaConfig, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DynamicConfigFailurePolicy, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig} @@ -405,8 +405,8 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val maxRequestPartitionSizeLimit = getInt(ServerConfigs.MAX_REQUEST_PARTITION_SIZE_LIMIT_CONFIG) val deleteTopicEnable = getBoolean(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG) - val dynamicConfigFailurePolicy = org.apache.kafka.server.config.DynamicConfigFailurePolicy.fromString( - getString(ServerConfigs.DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG)) + val dynamicBrokerConfigFailurePolicy = DynamicConfigFailurePolicy.fromString( + getString(ServerConfigs.DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_CONFIG)) def compressionType = getString(ServerConfigs.COMPRESSION_TYPE_CONFIG) def gzipCompressionLevel = getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG) diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala index 40ccd5025d2c1..a4cbf09e8c6c0 100644 --- a/core/src/main/scala/kafka/server/SharedServer.scala +++ b/core/src/main/scala/kafka/server/SharedServer.scala @@ -36,8 +36,9 @@ import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble import org.apache.kafka.raft.Endpoints import org.apache.kafka.server.{ProcessRole, ServerSocketFactory} import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.config.DynamicConfigFailurePolicy import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, ProcessTerminatingFaultHandler} -import org.apache.kafka.server.metrics.{BrokerServerMetrics, KafkaYammerMetrics, NodeMetrics} +import org.apache.kafka.server.metrics.{BrokerServerMetrics, InvalidConfigMetrics, KafkaYammerMetrics, NodeMetrics} import java.net.InetSocketAddress import java.util.Arrays @@ -121,7 +122,7 @@ class SharedServer( @volatile var brokerMetrics: BrokerServerMetrics = _ @volatile var controllerServerMetrics: ControllerMetadataMetrics = _ @volatile var nodeMetrics: NodeMetrics = _ - @volatile var invalidConfigMetrics: org.apache.kafka.server.metrics.InvalidConfigMetrics = _ + @volatile var invalidConfigMetrics: InvalidConfigMetrics = _ @volatile var loader: MetadataLoader = _ private val snapshotsDisabledReason = new AtomicReference[String](null) @volatile var snapshotEmitter: SnapshotEmitter = _ @@ -268,8 +269,8 @@ class SharedServer( * with dynamic.config.failure.policy=fail during broker startup. */ val dynamicConfigFatalFaultHandler: FaultHandler = faultHandlerFactory.build( - name = "dynamic config", - fatal = sharedServerConfig.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL, + name = "dynamic config loading", + fatal = sharedServerConfig.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL, action = () => { } ) @@ -286,10 +287,8 @@ class SharedServer( } // Create invalid config metrics for DynamicConfigPublisher (shared across all server instances) - invalidConfigMetrics = new org.apache.kafka.server.metrics.InvalidConfigMetrics(metrics) + invalidConfigMetrics = new InvalidConfigMetrics(metrics) sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) - brokerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) - controllerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None) if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) { brokerMetrics = new BrokerServerMetrics(metrics) diff --git a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala index 7e1798f89e421..462b1a8d8ee43 100644 --- a/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala @@ -25,7 +25,7 @@ import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, CLIENT_METRICS, GROUP, TOPIC} import org.apache.kafka.image.loader.LoaderManifest import org.apache.kafka.image.{MetadataDelta, MetadataImage} -import org.apache.kafka.server.config.ConfigType +import org.apache.kafka.server.config.{ConfigType, DynamicConfigFailurePolicy} import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.server.metrics.InvalidConfigMetrics @@ -49,7 +49,7 @@ class DynamicConfigPublisher( /** * True if the dynamic config failure policy is set to "fail" */ - private val isPolicyFail: Boolean = conf.dynamicConfigFailurePolicy == org.apache.kafka.server.config.DynamicConfigFailurePolicy.FAIL + private val isPolicyFail: Boolean = conf.dynamicBrokerConfigFailurePolicy == DynamicConfigFailurePolicy.FAIL /** * Update metrics from validation result @@ -146,21 +146,7 @@ class DynamicConfigPublisher( // set to /tmp/foo, we still want to reload /tmp/foo in case its contents // have changed. This doesn't apply to topic configs or cluster configs. reloadUpdatedFilesWithoutConfigChange(props) - - // Update metrics if there were validation issues (only for BrokerConfigHandler) - nodeConfigHandler match { - case brokerHandler: BrokerConfigHandler => - brokerHandler.lastValidationResult.foreach(updateMetrics) - case _ => // other config handlers don't have validation results - } } catch { - case e: ConfigException if _firstPublish && isPolicyFail => - // During first publish (startup), if failure policy is "fail", use fatal fault handler - throw fatalFaultHandler.handleFault( - s"Error updating node ${conf.nodeId} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", - e - ) case t: Throwable => faultHandler.handleFault("Error updating " + s"node with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -175,13 +161,6 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) metricsConfigHandler.processConfigChanges(resource.name(), props) } catch { - case e: ConfigException if _firstPublish && isPolicyFail => - // During first publish (startup), if failure policy is "fail", use fatal fault handler - throw fatalFaultHandler.handleFault( - s"Error updating client metrics ${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", - e - ) case t: Throwable => faultHandler.handleFault("Error updating client metrics" + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) @@ -194,13 +173,6 @@ class DynamicConfigPublisher( toLoggableProps(resource, props).mkString(",")) groupConfigHandler.processConfigChanges(resource.name(), props) } catch { - case e: ConfigException if _firstPublish && isPolicyFail => - // During first publish (startup), if failure policy is "fail", use fatal fault handler - throw fatalFaultHandler.handleFault( - s"Error updating group ${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + - s"in $deltaName. Broker is configured with dynamic.config.failure.policy=fail.", - e - ) case t: Throwable => faultHandler.handleFault("Error updating group " + s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + s"in $deltaName", t) diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java index 0acfae7982ba3..695d0207c32cf 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java @@ -77,11 +77,12 @@ public class ServerConfigs { "When set to false, deletion requests will be explicitly rejected by the broker."; /** ********* Dynamic Configuration Failure Policy ***********/ - public static final String DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.config.failure.policy"; - public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; - public static final String DYNAMIC_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic configuration validation fails. " + - "Valid values are 'warn' (log warning and ignore invalid configs, default for backward compatibility) and 'fail' (halt the broker). " + - "This setting helps ensure configuration consistency and prevents silent configuration errors."; + public static final String DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_CONFIG = "dynamic.broker.config.failure.policy"; + public static final String DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_DEFAULT = "fail"; + public static final String DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_DOC = "The policy to apply when dynamic broker configuration validation fails. " + + "Valid values are 'warn' (log warning and ignore invalid configs) and 'fail' (halt the broker during startup if invalid configs are detected). " + + "This setting helps ensure broker configuration consistency and prevents silent configuration errors. " + + "This policy applies only to dynamic broker configurations, not to topic, group, or client metrics configurations."; public static final String COMPRESSION_TYPE_CONFIG = ServerTopicConfigSynonyms.serverSynonym(TopicConfig.COMPRESSION_TYPE_CONFIG); public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + @@ -150,7 +151,7 @@ public class ServerConfigs { /** ********* Controlled shutdown configuration ***********/ .define(CONTROLLED_SHUTDOWN_ENABLE_CONFIG, BOOLEAN, CONTROLLED_SHUTDOWN_ENABLE_DEFAULT, MEDIUM, CONTROLLED_SHUTDOWN_ENABLE_DOC) .define(DELETE_TOPIC_ENABLE_CONFIG, BOOLEAN, DELETE_TOPIC_ENABLE_DEFAULT, HIGH, DELETE_TOPIC_ENABLE_DOC) - .define(DYNAMIC_CONFIG_FAILURE_POLICY_CONFIG, STRING, DYNAMIC_CONFIG_FAILURE_POLICY_DEFAULT, ConfigDef.ValidString.in("warn", "fail"), MEDIUM, DYNAMIC_CONFIG_FAILURE_POLICY_DOC) + .define(DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_CONFIG, STRING, DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_DEFAULT, ConfigDef.ValidString.in("warn", "fail"), MEDIUM, DYNAMIC_BROKER_CONFIG_FAILURE_POLICY_DOC) .define(COMPRESSION_TYPE_CONFIG, STRING, ServerLogConfigs.COMPRESSION_TYPE_DEFAULT, ConfigDef.ValidString.in(BrokerCompressionType.names().toArray(new String[0])), HIGH, COMPRESSION_TYPE_DOC) .define(COMPRESSION_GZIP_LEVEL_CONFIG, INT, CompressionType.GZIP.defaultLevel(), CompressionType.GZIP.levelValidator(), MEDIUM, COMPRESSION_GZIP_LEVEL_DOC) .define(COMPRESSION_LZ4_LEVEL_CONFIG, INT, CompressionType.LZ4.defaultLevel(), CompressionType.LZ4.levelValidator(), MEDIUM, COMPRESSION_LZ4_LEVEL_DOC)