diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 224ebe48d0d5b..0df408ac51bd5 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -231,7 +231,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
reconfigurables.add(reconfigurable)
}
- def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = {
+ def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable[_ >: KafkaConfig]): Unit = {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(new BrokerReconfigurable {
override def reconfigurableConfigs: util.Set[String] = reconfigurable.reconfigurableConfigs
diff --git a/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
index f17590317bff7..a39e73aed5d8c 100644
--- a/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
+++ b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java
@@ -32,8 +32,10 @@
*
Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}
* Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}
*
+ *
+ * @param the configuration type used by the reconfigurable component
*/
-public interface BrokerReconfigurable {
+public interface BrokerReconfigurable {
/**
* Returns the set of configuration keys that can be dynamically reconfigured.
*
@@ -53,7 +55,7 @@ public interface BrokerReconfigurable {
*
* @param newConfig the new configuration to validate
*/
- void validateReconfiguration(AbstractConfig newConfig);
+ void validateReconfiguration(T newConfig);
/**
* Applies the new configuration.
@@ -63,5 +65,5 @@ public interface BrokerReconfigurable {
* @param oldConfig the previous configuration
* @param newConfig the new configuration to apply
*/
- void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
+ void reconfigure(T oldConfig, T newConfig);
}
diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
index de9289e09a781..548b5c490f92c 100644
--- a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
+++ b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java
@@ -27,7 +27,7 @@
import java.util.Set;
-public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
+public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
private final Logger log = LoggerFactory.getLogger(DynamicProducerStateManagerConfig.class);
private final ProducerStateManagerConfig producerStateManagerConfig;
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
index 5b962f8fe457c..0914e486b878e 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java
@@ -95,7 +95,7 @@
* tombstone deletion.
*
*/
-public class LogCleaner implements BrokerReconfigurable {
+public class LogCleaner implements BrokerReconfigurable {
private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);
public static final Set RECONFIGURABLE_CONFIGS = Set.of(