1616 */
1717package org .apache .kafka .server .config ;
1818
19- import org .apache .kafka .common .config .AbstractConfig ;
2019import org .apache .kafka .common .config .ConfigException ;
2120import org .apache .kafka .config .BrokerReconfigurable ;
2221import org .apache .kafka .server .common .DirectoryEventHandler ;
3332import java .util .stream .Collectors ;
3433import java .util .stream .Stream ;
3534
36- public class DynamicLogConfig implements BrokerReconfigurable {
35+ public class DynamicLogConfig implements BrokerReconfigurable < AbstractKafkaConfig > {
3736 /**
3837 * The broker configurations pertaining to logs that are reconfigurable. This set contains
3938 * the names you would use when setting a static or dynamic broker configuration (not topic
@@ -59,21 +58,12 @@ public Set<String> reconfigurableConfigs() {
5958 }
6059
6160 @ Override
62- public void validateReconfiguration (AbstractConfig newConfig ) {
63- AbstractKafkaConfig kafkaConfig = requireKafkaConfig (newConfig );
64- validateLogLocalRetentionMs (kafkaConfig );
65- validateLogLocalRetentionBytes (kafkaConfig );
66- validateLogRemoteCopyLagMs (kafkaConfig );
67- validateLogRemoteCopyLagBytes (kafkaConfig );
68- validateCordonedLogDirs (kafkaConfig );
69- }
70-
71- private AbstractKafkaConfig requireKafkaConfig (AbstractConfig config ) {
72- if (!(config instanceof AbstractKafkaConfig kafkaConfig )) {
73- throw new IllegalArgumentException ("DynamicLogConfig requires AbstractKafkaConfig, got " +
74- config .getClass ().getName ());
75- }
76- return kafkaConfig ;
61+ public void validateReconfiguration (AbstractKafkaConfig newConfig ) {
62+ validateLogLocalRetentionMs (newConfig );
63+ validateLogLocalRetentionBytes (newConfig );
64+ validateLogRemoteCopyLagMs (newConfig );
65+ validateLogRemoteCopyLagBytes (newConfig );
66+ validateCordonedLogDirs (newConfig );
7767 }
7868
7969 private void validateLogLocalRetentionMs (AbstractKafkaConfig config ) {
@@ -157,14 +147,13 @@ private void updateLogsConfig(Map<String, Object> newBrokerDefaults) {
157147 }
158148
159149 @ Override
160- public void reconfigure (AbstractConfig oldConfig , AbstractConfig newConfig ) {
161- AbstractKafkaConfig kafkaConfig = requireKafkaConfig (newConfig );
162- Map <String , Object > newBrokerDefaults = new HashMap <>(kafkaConfig .extractLogConfigMap ());
150+ public void reconfigure (AbstractKafkaConfig oldConfig , AbstractKafkaConfig newConfig ) {
151+ Map <String , Object > newBrokerDefaults = new HashMap <>(newConfig .extractLogConfigMap ());
163152 logManager .reconfigureDefaultLogConfig (new LogConfig (newBrokerDefaults ));
164153 updateLogsConfig (newBrokerDefaults );
165154
166- logManager .updateCordonedLogDirs (Set .copyOf (kafkaConfig .cordonedLogDirs ()));
167- directoryEventHandler .handleCordoned (kafkaConfig .cordonedLogDirs ().stream ()
155+ logManager .updateCordonedLogDirs (Set .copyOf (newConfig .cordonedLogDirs ()));
156+ directoryEventHandler .handleCordoned (newConfig .cordonedLogDirs ().stream ()
168157 .flatMap (dir -> logManager .directoryId (dir ).stream ())
169158 .collect (Collectors .toSet ()));
170159 }
0 commit comments