Skip to content

Commit 5e96e5c

Browse files
authored
KAFKA-15853 Refactor KafkaConfig to use PasswordEncoderConfigs (#15770)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 98548c5 commit 5e96e5c

File tree

10 files changed

+80
-87
lines changed

10 files changed

+80
-87
lines changed

core/src/main/scala/kafka/admin/ConfigCommand.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -212,14 +212,14 @@ object ConfigCommand extends Logging {
212212
}
213213

214214
private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
215-
encoderConfigs.get(PasswordEncoderConfigs.SECRET)
216-
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
215+
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
216+
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG,
217217
throw new IllegalArgumentException("Password encoder secret not specified"))
218218
PasswordEncoder.encrypting(new Password(encoderSecret),
219219
null,
220-
encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
221-
encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
222-
encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
220+
encoderConfigs.getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT),
221+
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT),
222+
encoderConfigs.get(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG).map(_.toInt).getOrElse(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT))
223223
}
224224

225225
/**
@@ -239,8 +239,8 @@ object ConfigCommand extends Logging {
239239
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
240240
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
241241
if (passwordConfigs.nonEmpty) {
242-
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET),
243-
s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." +
242+
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
243+
s"${PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG} must be specified to update $passwordConfigs." +
244244
" Other password encoder configs like cipher algorithm and iterations may also be specified" +
245245
" to override the default encoding parameters. Password encoder configs will not be persisted" +
246246
" in ZooKeeper."

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 12 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -203,14 +203,6 @@ object KafkaConfig {
203203
val DelegationTokenExpiryTimeMsProp = "delegation.token.expiry.time.ms"
204204
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"
205205

206-
/** ********* Password encryption configuration for dynamic configs *********/
207-
val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
208-
val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
209-
val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
210-
val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
211-
val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
212-
val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS
213-
214206
/** Internal Configurations **/
215207
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
216208
val UnstableMetadataVersionsEnableProp = "unstable.metadata.versions.enable"
@@ -419,17 +411,6 @@ object KafkaConfig {
419411
val DelegationTokenExpiryTimeMsDoc = "The token validity time in milliseconds before the token needs to be renewed. Default value 1 day."
420412
val DelegationTokenExpiryCheckIntervalDoc = "Scan interval to remove expired delegation tokens."
421413

422-
/** ********* Password encryption configuration for dynamic configs *********/
423-
val PasswordEncoderSecretDoc = "The secret used for encoding dynamically configured passwords for this broker."
424-
val PasswordEncoderOldSecretDoc = "The old secret that was used for encoding dynamically configured passwords. " +
425-
"This is required only when the secret is updated. If specified, all dynamically encoded passwords are " +
426-
s"decoded using this old secret and re-encoded using $PasswordEncoderSecretProp when broker starts up."
427-
val PasswordEncoderKeyFactoryAlgorithmDoc = "The SecretKeyFactory algorithm used for encoding dynamically configured passwords. " +
428-
"Default is PBKDF2WithHmacSHA512 if available and PBKDF2WithHmacSHA1 otherwise."
429-
val PasswordEncoderCipherAlgorithmDoc = "The Cipher algorithm used for encoding dynamically configured passwords."
430-
val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
431-
val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
432-
433414
@nowarn("cat=deprecation")
434415
val configDef = {
435416
import ConfigDef.Importance._
@@ -765,12 +746,12 @@ object KafkaConfig {
765746
.define(DelegationTokenExpiryCheckIntervalMsProp, LONG, Defaults.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS, atLeast(1), LOW, DelegationTokenExpiryCheckIntervalDoc)
766747

767748
/** ********* Password encryption configuration for dynamic configs *********/
768-
.define(PasswordEncoderSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderSecretDoc)
769-
.define(PasswordEncoderOldSecretProp, PASSWORD, null, MEDIUM, PasswordEncoderOldSecretDoc)
770-
.define(PasswordEncoderKeyFactoryAlgorithmProp, STRING, null, LOW, PasswordEncoderKeyFactoryAlgorithmDoc)
771-
.define(PasswordEncoderCipherAlgorithmProp, STRING, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM, LOW, PasswordEncoderCipherAlgorithmDoc)
772-
.define(PasswordEncoderKeyLengthProp, INT, Defaults.PASSWORD_ENCODER_KEY_LENGTH, atLeast(8), LOW, PasswordEncoderKeyLengthDoc)
773-
.define(PasswordEncoderIterationsProp, INT, Defaults.PASSWORD_ENCODER_ITERATIONS, atLeast(1024), LOW, PasswordEncoderIterationsDoc)
749+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_DOC)
750+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, PASSWORD, null, MEDIUM, PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_DOC)
751+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, STRING, null, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_DOC)
752+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, STRING, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DEFAULT, LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_DOC)
753+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DEFAULT, atLeast(8), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_DOC)
754+
.define(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, INT, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DEFAULT, atLeast(1024), LOW, PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_DOC)
774755

775756
/** ********* Raft Quorum Configuration *********/
776757
.define(RaftConfig.QUORUM_VOTERS_CONFIG, LIST, Defaults.QUORUM_VOTERS, new RaftConfig.ControllerQuorumVotersValidator(), HIGH, RaftConfig.QUORUM_VOTERS_DOC)
@@ -1349,12 +1330,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
13491330
val delegationTokenExpiryCheckIntervalMs = getLong(KafkaConfig.DelegationTokenExpiryCheckIntervalMsProp)
13501331

13511332
/** ********* Password encryption configuration for dynamic configs *********/
1352-
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
1353-
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
1354-
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
1355-
def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)
1356-
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
1357-
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)
1333+
def passwordEncoderSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG))
1334+
def passwordEncoderOldSecret = Option(getPassword(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG))
1335+
def passwordEncoderCipherAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG)
1336+
def passwordEncoderKeyFactoryAlgorithm = getString(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG)
1337+
def passwordEncoderKeyLength = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG)
1338+
def passwordEncoderIterations = getInt(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG)
13581339

13591340
/** ********* Quota Configuration **************/
13601341
val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp)

core/src/test/scala/integration/kafka/admin/ConfigCommandIntegrationTest.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ package kafka.admin
1818

1919
import kafka.admin.ConfigCommand.ConfigCommandOptions
2020
import kafka.cluster.{Broker, EndPoint}
21-
import kafka.server.{KafkaConfig, QuorumTestHarness}
21+
import kafka.server.QuorumTestHarness
2222
import kafka.utils.{Exit, Logging}
2323
import kafka.zk.{AdminZkClient, BrokerInfo}
2424
import org.apache.kafka.common.config.ConfigException
2525
import org.apache.kafka.common.network.ListenerName
2626
import org.apache.kafka.common.security.auth.SecurityProtocol
27+
import org.apache.kafka.security.PasswordEncoderConfigs
2728
import org.apache.kafka.server.common.MetadataVersion
2829
import org.apache.kafka.server.config.ZooKeeperInternals
2930
import org.junit.jupiter.api.Assertions._
@@ -134,10 +135,10 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
134135

135136
// Password config update with encoder secret should succeed and encoded password must be stored in ZK
136137
val configs = Map("listener.name.external.ssl.keystore.password" -> "secret", "log.cleaner.threads" -> "2")
137-
val encoderConfigs = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret")
138+
val encoderConfigs = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret")
138139
alterConfigWithZk(configs, Some(brokerId), encoderConfigs)
139140
val brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId)
140-
assertFalse(brokerConfigs.contains(KafkaConfig.PasswordEncoderSecretProp), "Encoder secret stored in ZooKeeper")
141+
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper")
141142
assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")) // not encoded
142143
val encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password")
143144
val passwordEncoder = ConfigCommand.createPasswordEncoder(encoderConfigs)
@@ -146,11 +147,11 @@ class ConfigCommandIntegrationTest extends QuorumTestHarness with Logging {
146147

147148
// Password config update with overrides for encoder parameters
148149
val configs2 = Map("listener.name.internal.ssl.keystore.password" -> "secret2")
149-
val encoderConfigs2 = Map(KafkaConfig.PasswordEncoderSecretProp -> "encoder-secret",
150-
KafkaConfig.PasswordEncoderCipherAlgorithmProp -> "DES/CBC/PKCS5Padding",
151-
KafkaConfig.PasswordEncoderIterationsProp -> "1024",
152-
KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp -> "PBKDF2WithHmacSHA1",
153-
KafkaConfig.PasswordEncoderKeyLengthProp -> "64")
150+
val encoderConfigs2 = Map(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG -> "encoder-secret",
151+
PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG -> "DES/CBC/PKCS5Padding",
152+
PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG -> "1024",
153+
PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG -> "PBKDF2WithHmacSHA1",
154+
PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG -> "64")
154155
alterConfigWithZk(configs2, Some(brokerId), encoderConfigs2)
155156
val brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId)
156157
val encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password")

core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
6060
import org.apache.kafka.common.security.scram.ScramCredential
6161
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
6262
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
63-
import org.apache.kafka.security.PasswordEncoder
63+
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
6464
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ZkConfigs}
6565
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
6666
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -137,7 +137,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
137137
props.put(KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, kafkaServerSaslMechanisms.mkString(","))
138138
props.put(ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "2000") // low value to test log rolling on config update
139139
props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2") // greater than one to test reducing threads
140-
props.put(KafkaConfig.PasswordEncoderSecretProp, "dynamic-config-secret")
140+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "dynamic-config-secret")
141141
props.put(ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, 1680000000.toString)
142142
props.put(ServerLogConfigs.LOG_RETENTION_TIME_HOURS_CONFIG, 168.toString)
143143

@@ -1117,7 +1117,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
11171117
val propsEncodedWithOldSecret = props.clone().asInstanceOf[Properties]
11181118
val config = server.config
11191119
val oldSecret = "old-dynamic-config-secret"
1120-
config.dynamicConfig.staticBrokerConfigs.put(KafkaConfig.PasswordEncoderOldSecretProp, oldSecret)
1120+
config.dynamicConfig.staticBrokerConfigs.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, oldSecret)
11211121
val passwordConfigs = props.asScala.filter { case (k, _) => DynamicBrokerConfig.isPasswordConfig(k) }
11221122
assertTrue(passwordConfigs.nonEmpty, "Password configs not found")
11231123
val passwordDecoder = createPasswordEncoder(config, config.passwordEncoderSecret)
@@ -1595,7 +1595,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
15951595
val externalListenerPrefix = listenerPrefix(SecureExternal)
15961596
val sslStoreProps = new Properties
15971597
sslStoreProps ++= securityProps(sslProperties, KEYSTORE_PROPS, externalListenerPrefix)
1598-
sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
1598+
sslStoreProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
15991599
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
16001600

16011601
val entityType = ConfigType.BROKER
@@ -1610,7 +1610,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
16101610
sslStoreProps.setProperty(configName, encodedValue)
16111611
}
16121612
}
1613-
sslStoreProps.remove(KafkaConfig.PasswordEncoderSecretProp)
1613+
sslStoreProps.remove(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG)
16141614
adminZkClient.changeConfigs(entityType, entityName, sslStoreProps)
16151615

16161616
val brokerProps = adminZkClient.fetchEntityConfig("brokers", kafkaConfig.brokerId.toString)

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.types.Password
3333
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
3434
import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
3535
import org.apache.kafka.common.network.ListenerName
36+
import org.apache.kafka.security.PasswordEncoderConfigs
3637
import org.apache.kafka.server.authorizer._
3738
import org.apache.kafka.server.config.{Defaults, KafkaSecurityConfigs, ZkConfigs}
3839
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -330,7 +331,7 @@ class DynamicBrokerConfigTest {
330331

331332
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean): Unit = {
332333
val configProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
333-
configProps.put(KafkaConfig.PasswordEncoderSecretProp, "broker.secret")
334+
configProps.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "broker.secret")
334335
val config = KafkaConfig(configProps)
335336
config.dynamicConfig.initialize(None, None)
336337

@@ -381,7 +382,7 @@ class DynamicBrokerConfigTest {
381382
def testPasswordConfigEncryption(): Unit = {
382383
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
383384
val configWithoutSecret = KafkaConfig(props)
384-
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
385+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
385386
val configWithSecret = KafkaConfig(props)
386387
val dynamicProps = new Properties
387388
dynamicProps.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "myLoginModule required;")
@@ -402,7 +403,7 @@ class DynamicBrokerConfigTest {
402403
def testPasswordConfigEncoderSecretChange(): Unit = {
403404
val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
404405
props.put(KafkaSecurityConfigs.SASL_JAAS_CONFIG, "staticLoginModule required;")
405-
props.put(KafkaConfig.PasswordEncoderSecretProp, "config-encoder-secret")
406+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "config-encoder-secret")
406407
val config = KafkaConfig(props)
407408
config.dynamicConfig.initialize(None, None)
408409
val dynamicProps = new Properties
@@ -421,14 +422,14 @@ class DynamicBrokerConfigTest {
421422
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
422423

423424
// New config with new secret should use the dynamic password config if new and old secrets are configured in KafkaConfig
424-
props.put(KafkaConfig.PasswordEncoderSecretProp, "new-encoder-secret")
425-
props.put(KafkaConfig.PasswordEncoderOldSecretProp, "config-encoder-secret")
425+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "new-encoder-secret")
426+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_OLD_SECRET_CONFIG, "config-encoder-secret")
426427
val newConfigWithNewAndOldSecret = KafkaConfig(props)
427428
newConfigWithNewAndOldSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
428429
assertEquals("dynamicLoginModule required;", newConfigWithSameSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)
429430

430431
// New config with new secret alone should revert to static password config since dynamic config cannot be decoded
431-
props.put(KafkaConfig.PasswordEncoderSecretProp, "another-new-encoder-secret")
432+
props.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "another-new-encoder-secret")
432433
val newConfigWithNewSecret = KafkaConfig(props)
433434
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
434435
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaSecurityConfigs.SASL_JAAS_CONFIG).asInstanceOf[Password].value)

0 commit comments

Comments
 (0)