Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
reconfigurables.add(reconfigurable)
}

def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = {
def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable[_ >: KafkaConfig]): Unit = {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(new BrokerReconfigurable {
override def reconfigurableConfigs: util.Set[String] = reconfigurable.reconfigurableConfigs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
* <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}</li>
* <li>Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}</li>
* </ol>
*
* @param <T> the configuration type used by the reconfigurable component
*/
public interface BrokerReconfigurable {
public interface BrokerReconfigurable<T extends AbstractConfig> {
/**
* Returns the set of configuration keys that can be dynamically reconfigured.
*
Expand All @@ -53,7 +55,7 @@ public interface BrokerReconfigurable {
*
* @param newConfig the new configuration to validate
*/
void validateReconfiguration(AbstractConfig newConfig);
void validateReconfiguration(T newConfig);

/**
* Applies the new configuration.
Expand All @@ -63,5 +65,5 @@ public interface BrokerReconfigurable {
* @param oldConfig the previous configuration
* @param newConfig the new configuration to apply
*/
void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
void reconfigure(T oldConfig, T newConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import java.util.Set;

public class DynamicProducerStateManagerConfig implements BrokerReconfigurable {
public class DynamicProducerStateManagerConfig implements BrokerReconfigurable<AbstractConfig> {
private final Logger log = LoggerFactory.getLogger(DynamicProducerStateManagerConfig.class);
private final ProducerStateManagerConfig producerStateManagerConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@
* tombstone deletion.</li>
* </ol>
*/
public class LogCleaner implements BrokerReconfigurable {
public class LogCleaner implements BrokerReconfigurable<AbstractConfig> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From another perspective, we could move BrokerReconfigurable back to the server module, and then create server-only dynamic adapters for those classes located in non-server modules. The advantage is that BrokerReconfigurable could use AbstractKafkaConfig directly, which is simple and straightforward. The downside is that we need to create those adapters

private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
Expand Down
Loading