Skip to content

Commit 615f1a0

Browse files
authored
KAFKA-16181: Use incrementalAlterConfigs when updating broker configs by kafka-configs.sh (#15304)
This PR implement KIP-1011, kafka-configs.sh now uses incrementalAlterConfigs API to alter broker configurations instead of the deprecated alterConfigs API, and it will fall directly if the broker doesn't support incrementalAlterConfigs. Reviewers: David Jacot <[email protected]>, OmniaGM <[email protected]>.
1 parent 7ca02fd commit 615f1a0

File tree

5 files changed

+183
-68
lines changed

5 files changed

+183
-68
lines changed

Diff for: core/src/main/scala/kafka/admin/ConfigCommand.scala

+38-39
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,24 @@
1818
package kafka.admin
1919

2020
import java.nio.charset.StandardCharsets
21-
import java.util.concurrent.TimeUnit
21+
import java.util.concurrent.{ExecutionException, TimeUnit}
2222
import java.util.{Collections, Properties}
2323
import joptsimple._
2424
import kafka.server.DynamicConfig
2525
import kafka.utils.Implicits._
2626
import kafka.utils.Logging
27-
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
27+
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
2828
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
29-
import org.apache.kafka.common.errors.InvalidConfigurationException
29+
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
3030
import org.apache.kafka.common.internals.Topic
31+
import org.apache.kafka.common.protocol.ApiKeys
3132
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
3233
import org.apache.kafka.common.security.scram.internals.ScramMechanism
3334
import org.apache.kafka.common.utils.{Exit, Utils}
3435
import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
3536
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
3637
import org.apache.kafka.storage.internals.log.LogConfig
38+
3739
import scala.annotation.nowarn
3840
import scala.jdk.CollectionConverters._
3941
import scala.collection._
@@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
8082
System.err.println(e.getMessage)
8183
Exit.exit(1)
8284

85+
case e: UnsupportedVersionException =>
86+
logger.debug(s"Unsupported API encountered in server when executing config command with args '${args.mkString(" ")}'")
87+
System.err.println(e.getMessage)
88+
Exit.exit(1)
89+
8390
case t: Throwable =>
8491
logger.debug(s"Error while executing config command with args '${args.mkString(" ")}'", t)
8592
System.err.println(s"Error while executing config command with args '${args.mkString(" ")}'")
@@ -161,7 +168,6 @@ object ConfigCommand extends Logging {
161168
}
162169
}
163170

164-
@nowarn("cat=deprecation")
165171
def alterConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
166172
val entityTypes = opts.entityTypes
167173
val entityNames = opts.entityNames
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
172178
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
173179

174180
entityTypeHead match {
175-
case ConfigType.TOPIC =>
176-
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
177-
178-
case ConfigType.BROKER =>
179-
val oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = false, describeAll = false)
180-
.map { entry => (entry.name, entry) }.toMap
181-
182-
// fail the command if any of the configs to be deleted does not exist
183-
val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
184-
if (invalidConfigs.nonEmpty)
185-
throw new InvalidConfigurationException(s"Invalid config(s): ${invalidConfigs.mkString(",")}")
186-
187-
val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
188-
val sensitiveEntries = newEntries.filter(_._2.value == null)
189-
if (sensitiveEntries.nonEmpty)
190-
throw new InvalidConfigurationException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}")
191-
val newConfig = new JConfig(newEntries.asJava.values)
192-
193-
val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityNameHead)
194-
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
195-
adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
181+
case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | ConfigType.GROUP =>
182+
val configResourceType = entityTypeHead match {
183+
case ConfigType.TOPIC => ConfigResource.Type.TOPIC
184+
case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
185+
case ConfigType.BROKER => ConfigResource.Type.BROKER
186+
case ConfigType.GROUP => ConfigResource.Type.GROUP
187+
}
188+
try {
189+
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, configResourceType)
190+
} catch {
191+
case e: ExecutionException =>
192+
e.getCause match {
193+
case _: UnsupportedVersionException =>
194+
throw new UnsupportedVersionException(s"The ${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The API is supported starting from version 2.3.0."
195+
+ " You may want to use an older version of this tool to interact with your cluster, or upgrade your brokers to version 2.3.0 or newer to avoid this error.")
196+
case _ => throw e
197+
}
198+
case e: Throwable => throw e
199+
}
196200

197201
case BrokerLoggerConfigType =>
198202
val validLoggers = getResourceConfig(adminClient, entityTypeHead, entityNameHead, includeSynonyms = true, describeAll = false).map(_.name)
@@ -203,10 +207,10 @@ object ConfigCommand extends Logging {
203207

204208
val configResource = new ConfigResource(ConfigResource.Type.BROKER_LOGGER, entityNameHead)
205209
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
206-
val alterLogLevelEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
207-
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
208-
).asJavaCollection
209-
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
210+
val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
211+
val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
212+
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
213+
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
210214

211215
case ConfigType.USER | ConfigType.CLIENT =>
212216
val hasQuotaConfigsToAdd = configsToBeAdded.keys.exists(QuotaConfig.isClientOrUserQuotaConfig)
@@ -250,13 +254,8 @@ object ConfigCommand extends Logging {
250254
throw new IllegalArgumentException(s"Only connection quota configs can be added for '${ConfigType.IP}' using --bootstrap-server. Unexpected config names: ${unknownConfigs.mkString(",")}")
251255
alterQuotaConfigs(adminClient, entityTypes, entityNames, configsToBeAddedMap, configsToBeDeleted)
252256

253-
case ConfigType.CLIENT_METRICS =>
254-
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.CLIENT_METRICS)
255-
256-
case ConfigType.GROUP =>
257-
alterResourceConfig(adminClient, entityTypeHead, entityNameHead, configsToBeDeleted, configsToBeAdded, ConfigResource.Type.GROUP)
258-
259-
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
257+
case _ =>
258+
throw new IllegalArgumentException(s"Unsupported entity type: $entityTypeHead")
260259
}
261260

262261
if (entityNameHead.nonEmpty)
@@ -380,9 +379,9 @@ object ConfigCommand extends Logging {
380379

381380
val configResource = new ConfigResource(resourceType, entityNameHead)
382381
val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
383-
val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, AlterConfigOp.OpType.SET))
384-
++ configsToBeDeleted.map { k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
385-
).asJavaCollection
382+
val addEntries = configsToBeAdded.values.map(k => new AlterConfigOp(k, AlterConfigOp.OpType.SET))
383+
val deleteEntries = configsToBeDeleted.map(k => new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE))
384+
val alterEntries = (deleteEntries ++ addEntries).asJavaCollection
386385
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
387386
}
388387

0 commit comments

Comments
 (0)