diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 224ebe48d0d5b..c958ec62d6f17 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -25,7 +25,7 @@ import kafka.network.DataPlaneAcceptor import kafka.raft.KafkaRaftManager import kafka.server.DynamicBrokerConfig._ import kafka.utils.Logging -import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid} +import org.apache.kafka.common.{Endpoint, Reconfigurable} import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, SslConfigs} import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType} import org.apache.kafka.common.metrics.{Metrics, MetricsReporter} @@ -39,16 +39,14 @@ import org.apache.kafka.config import org.apache.kafka.network.SocketServer import org.apache.kafka.raft.KafkaRaftClient import org.apache.kafka.server.{DynamicThreadPool, ProcessRole} -import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler} -import org.apache.kafka.server.config.{DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig} +import org.apache.kafka.server.common.ApiMessageAndVersion +import org.apache.kafka.server.config.{DynamicConfig, DynamicLogConfig, DynamicProducerStateManagerConfig, ServerConfigs, DynamicBrokerConfig => JDynamicBrokerConfig} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider} import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock} import org.apache.kafka.snapshot.RecordsSnapshotReader -import org.apache.kafka.storage.internals.log.{LogConfig, LogManager} -import java.util.stream.Collectors import scala.util.Using import scala.collection._ import scala.jdk.CollectionConverters._ @@ -231,7 +229,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging reconfigurables.add(reconfigurable) } - def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = { + def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable[_ >: KafkaConfig]): Unit = { verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs) brokerReconfigurables.add(new BrokerReconfigurable { override def reconfigurableConfigs: util.Set[String] = reconfigurable.reconfigurableConfigs @@ -538,113 +536,6 @@ trait BrokerReconfigurable { def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit } -class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryEventHandler) extends BrokerReconfigurable with Logging { - - override def reconfigurableConfigs: util.Set[String] = { - JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS - } - - override def validateReconfiguration(newConfig: KafkaConfig): Unit = { - // For update of topic config overrides, only config names and types are validated - // Names and types have already been validated. For consistency with topic config - // validation, no additional validation is performed. - - def validateLogLocalRetentionMs(): Unit = { - val logRetentionMs = newConfig.logRetentionTimeMillis - val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs - if (logRetentionMs != -1L && logLocalRetentionMs != -2L) { - if (logLocalRetentionMs == -1L) { - throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, - s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} value is set as $logRetentionMs.") - } - if (logLocalRetentionMs > logRetentionMs) { - throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, - s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} property value: $logRetentionMs") - } - } - } - - def validateLogLocalRetentionBytes(): Unit = { - val logRetentionBytes = newConfig.logRetentionBytes - val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes - if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) { - if (logLocalRetentionBytes == -1) { - throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes, - s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} value is set as $logRetentionBytes.") - } - if (logLocalRetentionBytes > logRetentionBytes) { - throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes, - s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} property value: $logRetentionBytes") - } - } - } - - def validateLogRemoteCopyLagMs(): Unit = { - val logRetentionMs: Long = newConfig.logRetentionTimeMillis - val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs - val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs - val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs - if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) { - throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs, - s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)") - } - } - - def validateLogRemoteCopyLagBytes(): Unit = { - val logRetentionBytes: Long = newConfig.logRetentionBytes - val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes - val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes - val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes - if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) { - throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes, - s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)") - } - } - - def validateCordonedLogDirs(): Unit = { - val logDirs = newConfig.logDirs() - val cordonedLogDirs = newConfig.cordonedLogDirs() - cordonedLogDirs.asScala.foreach(dir => - if (!logDirs.contains(dir)) { - throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, cordonedLogDirs, s"Invalid entry in ${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " + - s"All cordoned log dirs must be entries of ${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.") - } - ) - } - - validateLogLocalRetentionMs() - validateLogLocalRetentionBytes() - validateLogRemoteCopyLagMs() - validateLogRemoteCopyLagBytes() - validateCordonedLogDirs() - } - - private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = { - logManager.brokerConfigUpdated() - logManager.allLogs.forEach { log => - val props = mutable.Map.empty[Any, Any] - props ++= newBrokerDefaults - props ++= log.config.originals.asScala.filter { case (k, _) => - log.config.overriddenConfigs.contains(k) - } - - val logConfig = new LogConfig(props.asJava, log.config.overriddenConfigs) - log.updateConfig(logConfig) - } - } - - override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { - val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap) - logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults)) - updateLogsConfig(newBrokerDefaults.asScala) - - logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs)) - directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream - .flatMap[Uuid](dir => logManager.directoryId(dir).stream) - .collect(Collectors.toSet[Uuid])) - } -} - class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable { override def reconfigurableConfigs: util.Set[String] = { diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 94b86d83d1f81..e585da592b97e 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -23,7 +23,7 @@ import java.util.Properties import kafka.utils.Logging import kafka.utils.Implicits._ import org.apache.kafka.common.{Endpoint, Reconfigurable} -import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig} +import org.apache.kafka.common.config.{ConfigDef, ConfigException} import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.internals.BrokerSecurityConfigs import org.apache.kafka.common.internals.Plugin @@ -594,42 +594,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde") } - /** - * Copy the subset of properties that are relevant to Logs. The individual properties - * are listed here since the names are slightly different in each Config class... - */ - def extractLogConfigMap: java.util.Map[String, Object] = { - val logProps = new java.util.HashMap[String, Object]() - logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes) - logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis) - logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis) - logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes) - logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages) - logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs) - logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes) - logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis: java.lang.Long) - logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, messageMaxBytes) - logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes) - logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs) - logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs) - logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs) - logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs) - logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio) - logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy) - logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas) - logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, compressionType) - logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, gzipCompressionLevel) - logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, lz4CompressionLevel) - logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel) - logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable) - logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable) - logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name) - logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long) - logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long) - logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long) - logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long) - logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long) - logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long) - logProps - } } diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 71ab195ed9757..e9f7a83afc247 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -37,7 +37,7 @@ import org.apache.kafka.network.{SocketServerConfigs, SocketServer => JSocketSer import org.apache.kafka.server.DynamicThreadPool import org.apache.kafka.server.authorizer._ import org.apache.kafka.server.common.DirectoryEventHandler -import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{DynamicLogConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig} import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver} diff --git a/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java index f17590317bff7..a39e73aed5d8c 100644 --- a/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java +++ b/server-common/src/main/java/org/apache/kafka/config/BrokerReconfigurable.java @@ -32,8 +32,10 @@ *
  • Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}
  • *
  • Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}
  • * + * + * @param the configuration type used by the reconfigurable component */ -public interface BrokerReconfigurable { +public interface BrokerReconfigurable { /** * Returns the set of configuration keys that can be dynamically reconfigured. * @@ -53,7 +55,7 @@ public interface BrokerReconfigurable { * * @param newConfig the new configuration to validate */ - void validateReconfiguration(AbstractConfig newConfig); + void validateReconfiguration(T newConfig); /** * Applies the new configuration. @@ -63,5 +65,5 @@ public interface BrokerReconfigurable { * @param oldConfig the previous configuration * @param newConfig the new configuration to apply */ - void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig); + void reconfigure(T oldConfig, T newConfig); } diff --git a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java index 27a9750756355..dcb416a5b7747 100644 --- a/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.network.ListenerName; @@ -640,6 +641,45 @@ public Long logRetentionTimeMillis() { return millis < 0 ? Long.valueOf(-1) : millis; } + /** + * Copy the subset of properties that are relevant to logs. The individual properties + * are listed here since the names are slightly different in each config class. + */ + public Map extractLogConfigMap() { + Map logProps = new HashMap<>(); + logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes()); + logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis()); + logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis()); + logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes()); + logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages()); + logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs()); + logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes()); + logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis()); + logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG)); + logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes()); + logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs()); + logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs()); + logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs()); + logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs()); + logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio()); + logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy()); + logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas()); + logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, getString(ServerConfigs.COMPRESSION_TYPE_CONFIG)); + logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG)); + logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG)); + logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG)); + logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)); + logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable()); + logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType().name); + logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs()); + logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs()); + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP)); + logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP)); + logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP)); + logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP)); + return logProps; + } + /** * Returns a map of group config names to their broker-level synonym values, used as * defaults when building a {@link GroupConfig} for {@code DescribeConfigs}. diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java index 9c0d8f2417595..1eefa1d57f513 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java @@ -200,19 +200,6 @@ public static Map dynamicConfigUpdateModes() { ); } - public static class DynamicLogConfig { - /** - * The broker configurations pertaining to logs that are reconfigurable. This set contains - * the names you would use when setting a static or dynamic broker configuration (not topic - * configuration). - */ - public static final Set RECONFIGURABLE_CONFIGS = Stream.of( - ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(), - Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG)) - .flatMap(Collection::stream) - .collect(Collectors.toUnmodifiableSet()); - } - public static class DynamicListenerConfig { /** * The set of configurations which the DynamicListenerConfig object listens for. Many of diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicLogConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicLogConfig.java new file mode 100644 index 0000000000000..392454d87318c --- /dev/null +++ b/server/src/main/java/org/apache/kafka/server/config/DynamicLogConfig.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.config.BrokerReconfigurable; +import org.apache.kafka.server.common.DirectoryEventHandler; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.storage.internals.log.LogConfig; +import org.apache.kafka.storage.internals.log.LogManager; +import org.apache.kafka.storage.internals.log.UnifiedLog; + +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DynamicLogConfig implements BrokerReconfigurable { + /** + * The broker configurations pertaining to logs that are reconfigurable. This set contains + * the names you would use when setting a static or dynamic broker configuration (not topic + * configuration). + */ + public static final Set RECONFIGURABLE_CONFIGS = Stream.of( + ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(), + Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG)) + .flatMap(Collection::stream) + .collect(Collectors.toUnmodifiableSet()); + + private final LogManager logManager; + private final DirectoryEventHandler directoryEventHandler; + + public DynamicLogConfig(LogManager logManager, DirectoryEventHandler directoryEventHandler) { + this.logManager = logManager; + this.directoryEventHandler = directoryEventHandler; + } + + @Override + public Set reconfigurableConfigs() { + return RECONFIGURABLE_CONFIGS; + } + + @Override + public void validateReconfiguration(AbstractKafkaConfig newConfig) { + validateLogLocalRetentionMs(newConfig); + validateLogLocalRetentionBytes(newConfig); + validateLogRemoteCopyLagMs(newConfig); + validateLogRemoteCopyLagBytes(newConfig); + validateCordonedLogDirs(newConfig); + } + + private void validateLogLocalRetentionMs(AbstractKafkaConfig config) { + long logRetentionMs = config.logRetentionTimeMillis(); + long logLocalRetentionMs = config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP); + if (logRetentionMs != LogConfig.NO_RETENTION_LIMIT && logLocalRetentionMs != LogConfig.DEFAULT_LOCAL_RETENTION_MS) { + if (logLocalRetentionMs == LogConfig.NO_RETENTION_LIMIT) { + throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, + "Value must not be " + LogConfig.NO_RETENTION_LIMIT + " as " + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG + " value is set as " + logRetentionMs + "."); + } + if (logLocalRetentionMs > logRetentionMs) { + throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs, + "Value must not be more than " + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG + " property value: " + logRetentionMs); + } + } + } + + private void validateLogLocalRetentionBytes(AbstractKafkaConfig config) { + long logRetentionBytes = config.logRetentionBytes(); + long logLocalRetentionBytes = config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP); + if (logRetentionBytes > LogConfig.NO_RETENTION_LIMIT && logLocalRetentionBytes != LogConfig.DEFAULT_LOCAL_RETENTION_BYTES) { + if (logLocalRetentionBytes == LogConfig.NO_RETENTION_LIMIT) { + throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes, + "Value must not be " + LogConfig.NO_RETENTION_LIMIT + " as " + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG + " value is set as " + logRetentionBytes + "."); + } + if (logLocalRetentionBytes > logRetentionBytes) { + throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes, + "Value must not be more than " + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG + " property value: " + logRetentionBytes); + } + } + } + + private void validateLogRemoteCopyLagMs(AbstractKafkaConfig config) { + long logRetentionMs = config.logRetentionTimeMillis(); + long logLocalRetentionMs = config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP); + long effectiveLocalRetentionMs = logLocalRetentionMs == LogConfig.DEFAULT_LOCAL_RETENTION_MS ? logRetentionMs : logLocalRetentionMs; + long logRemoteCopyLagMs = config.getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP); + if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) { + throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs, + "Value must not exceed " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP + + " (effective value: " + effectiveLocalRetentionMs + ")"); + } + } + + private void validateLogRemoteCopyLagBytes(AbstractKafkaConfig config) { + long logRetentionBytes = config.logRetentionBytes(); + long logLocalRetentionBytes = config.getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP); + long effectiveLocalRetentionBytes = logLocalRetentionBytes == LogConfig.DEFAULT_LOCAL_RETENTION_BYTES ? logRetentionBytes : logLocalRetentionBytes; + long logRemoteCopyLagBytes = config.getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP); + if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) { + throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes, + "Value must not exceed " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP + + " (effective value: " + effectiveLocalRetentionBytes + ")"); + } + } + + private void validateCordonedLogDirs(AbstractKafkaConfig config) { + List cordonedLogDirs = config.cordonedLogDirs(); + List logDirs = config.logDirs(); + for (String dir : cordonedLogDirs) { + if (!logDirs.contains(dir)) { + throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, cordonedLogDirs, + "Invalid entry in " + ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG + ": " + dir + ". " + + "All cordoned log dirs must be entries of " + ServerLogConfigs.LOG_DIRS_CONFIG + " or " + + ServerLogConfigs.LOG_DIR_CONFIG + "."); + } + } + } + + private void updateLogsConfig(Map newBrokerDefaults) { + logManager.brokerConfigUpdated(); + for (UnifiedLog unifiedLog : logManager.allLogs()) { + Map props = new HashMap<>(newBrokerDefaults); + unifiedLog.config().originals().forEach((key, value) -> { + if (unifiedLog.config().overriddenConfigs.contains(key)) { + props.put(key, value); + } + }); + unifiedLog.updateConfig(new LogConfig(props, unifiedLog.config().overriddenConfigs)); + } + } + + @Override + public void reconfigure(AbstractKafkaConfig oldConfig, AbstractKafkaConfig newConfig) { + Map newBrokerDefaults = new HashMap<>(newConfig.extractLogConfigMap()); + logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults)); + updateLogsConfig(newBrokerDefaults); + + logManager.updateCordonedLogDirs(Set.copyOf(newConfig.cordonedLogDirs())); + directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs().stream() + .flatMap(dir -> logManager.directoryId(dir).stream()) + .collect(Collectors.toSet())); + } + + @Override + public String toString() { + return "DynamicLogConfig"; + } +} diff --git a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java index de9289e09a781..548b5c490f92c 100644 --- a/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java +++ b/server/src/main/java/org/apache/kafka/server/config/DynamicProducerStateManagerConfig.java @@ -27,7 +27,7 @@ import java.util.Set; -public class DynamicProducerStateManagerConfig implements BrokerReconfigurable { +public class DynamicProducerStateManagerConfig implements BrokerReconfigurable { private final Logger log = LoggerFactory.getLogger(DynamicProducerStateManagerConfig.class); private final ProducerStateManagerConfig producerStateManagerConfig; diff --git a/server/src/test/java/org/apache/kafka/server/config/DynamicLogConfigTest.java b/server/src/test/java/org/apache/kafka/server/config/DynamicLogConfigTest.java new file mode 100644 index 0000000000000..54f574c7db357 --- /dev/null +++ b/server/src/test/java/org/apache/kafka/server/config/DynamicLogConfigTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.config; + +import org.apache.kafka.common.Reconfigurable; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.raft.KRaftConfigs; +import org.apache.kafka.server.common.DirectoryEventHandler; +import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig; +import org.apache.kafka.storage.internals.log.LogManager; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.mock; + +public class DynamicLogConfigTest { + + private final DynamicLogConfig dynamicLogConfig = new DynamicLogConfig( + mock(LogManager.class), + mock(DirectoryEventHandler.class) + ); + + @Test + public void testValidateLogLocalRetentionMs() { + assertInvalidConfig( + Map.of( + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "1000", + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "-1" + ), + "Invalid value -1 for configuration " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP + + ": Value must not be -1 as " + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG + + " value is set as 1000." + ); + + assertInvalidConfig( + Map.of( + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG, "1000", + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, "1001" + ), + "Invalid value 1001 for configuration " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP + + ": Value must not be more than " + ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG + + " property value: 1000" + ); + } + + @Test + public void testValidateLogLocalRetentionBytes() { + assertInvalidConfig( + Map.of( + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "1000", + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "-1" + ), + "Invalid value -1 for configuration " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP + + ": Value must not be -1 as " + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG + + " value is set as 1000." + ); + + assertInvalidConfig( + Map.of( + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, "1000", + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, "1001" + ), + "Invalid value 1001 for configuration " + RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP + + ": Value must not be more than " + ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG + + " property value: 1000" + ); + } + + private void assertInvalidConfig(Map overrides, String expectedMessage) { + ConfigException exception = assertThrows( + ConfigException.class, + () -> dynamicLogConfig.validateReconfiguration(kafkaConfig(overrides)) + ); + assertEquals(expectedMessage, exception.getMessage()); + } + + private static AbstractKafkaConfig kafkaConfig(Map overrides) { + Map props = new HashMap<>(); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker"); + props.put(KRaftConfigs.NODE_ID_CONFIG, "1"); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER"); + props.putAll(overrides); + return new AbstractKafkaConfig(AbstractKafkaConfig.CONFIG_DEF, props, Map.of(), false) { + @Override + public void addReconfigurable(Reconfigurable reconfigurable) { + } + + @Override + public void removeReconfigurable(Reconfigurable reconfigurable) { + } + }; + } +} diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java index 5b962f8fe457c..0914e486b878e 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaner.java @@ -95,7 +95,7 @@ * tombstone deletion. * */ -public class LogCleaner implements BrokerReconfigurable { +public class LogCleaner implements BrokerReconfigurable { private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class); public static final Set RECONFIGURABLE_CONFIGS = Set.of( diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index c1a6361e50db1..1c61e213e11bc 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -142,6 +142,7 @@ public Optional serverConfigName(String configName) { public static final boolean DEFAULT_REMOTE_STORAGE_ENABLE = false; public static final boolean DEFAULT_REMOTE_LOG_COPY_DISABLE_CONFIG = false; public static final boolean DEFAULT_REMOTE_LOG_DELETE_ON_DISABLE_CONFIG = false; + public static final long NO_RETENTION_LIMIT = -1L; // It indicates no retention limit public static final long DEFAULT_LOCAL_RETENTION_BYTES = -2; // It indicates the value to be derived from RetentionBytes public static final long DEFAULT_LOCAL_RETENTION_MS = -2; // It indicates the value to be derived from RetentionMs public static final long DEFAULT_REMOTE_COPY_LAG_MS = 0; @@ -209,7 +210,7 @@ public Optional serverConfigName(String configName) { // can be negative. See kafka.log.LogManager.cleanupSegmentsToMaintainSize .define(TopicConfig.RETENTION_BYTES_CONFIG, LONG, ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, MEDIUM, TopicConfig.RETENTION_BYTES_DOC) // can be negative. See kafka.log.LogManager.cleanupExpiredSegments - .define(TopicConfig.RETENTION_MS_CONFIG, LONG, DEFAULT_RETENTION_MS, atLeast(-1), MEDIUM, + .define(TopicConfig.RETENTION_MS_CONFIG, LONG, DEFAULT_RETENTION_MS, atLeast(NO_RETENTION_LIMIT), MEDIUM, TopicConfig.RETENTION_MS_DOC) .define(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, INT, ServerLogConfigs.MAX_MESSAGE_BYTES_DEFAULT, atLeast(0), MEDIUM, TopicConfig.MAX_MESSAGE_BYTES_DOC) @@ -257,8 +258,8 @@ public Optional serverConfigName(String configName) { .define(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, LONG, DEFAULT_LOCAL_RETENTION_BYTES, atLeast(-2), MEDIUM, TopicConfig.LOCAL_LOG_RETENTION_BYTES_DOC) .define(TopicConfig.REMOTE_LOG_COPY_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_COPY_DISABLE_DOC) - .define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_MS, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC) - .define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_BYTES, atLeast(-1), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC) + .define(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_MS, atLeast(MAX_REMOTE_COPY_LAG_MS), MEDIUM, TopicConfig.REMOTE_COPY_LAG_MS_DOC) + .define(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, LONG, DEFAULT_REMOTE_COPY_LAG_BYTES, atLeast(MAX_REMOTE_COPY_LAG_BYTES), MEDIUM, TopicConfig.REMOTE_COPY_LAG_BYTES_DOC) .define(TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.REMOTE_LOG_DELETE_ON_DISABLE_DOC) .define(TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_CONFIG, BOOLEAN, false, MEDIUM, TopicConfig.ERRORS_DEADLETTERQUEUE_GROUP_ENABLE_DOC) .defineInternal(INTERNAL_SEGMENT_BYTES_CONFIG, INT, null, null, MEDIUM, INTERNAL_SEGMENT_BYTES_DOC); @@ -267,8 +268,8 @@ public Optional serverConfigName(String configName) { public final Set overriddenConfigs; /* - * Important note: Any configuration parameter that is passed along from KafkaConfig to LogConfig - * should also be in `KafkaConfig#extractLogConfigMap`. + * Important note: Any configuration parameter that is passed along from AbstractKafkaConfig to + * LogConfig should also be in `AbstractKafkaConfig#extractLogConfigMap`. */ private final int segmentSize; private final Integer internalSegmentSize; @@ -600,10 +601,10 @@ private static void validateRemoteStorageRequiresDeleteCleanupPolicy(Map props) { Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); Long localRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); - if (retentionBytes > -1 && localRetentionBytes != -2) { - if (localRetentionBytes == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); + if (retentionBytes > NO_RETENTION_LIMIT && localRetentionBytes != DEFAULT_LOCAL_RETENTION_BYTES) { + if (localRetentionBytes == NO_RETENTION_LIMIT) { + String message = String.format("Value must not be %d as %s value is set as %d.", + NO_RETENTION_LIMIT, TopicConfig.RETENTION_BYTES_CONFIG, retentionBytes); throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, localRetentionBytes, message); } if (localRetentionBytes > retentionBytes) { @@ -617,10 +618,10 @@ private static void validateRemoteStorageRetentionSize(Map props) { private static void validateRemoteStorageRetentionTime(Map props) { Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG); Long localRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); - if (retentionMs != -1 && localRetentionMs != -2) { - if (localRetentionMs == -1) { - String message = String.format("Value must not be -1 as %s value is set as %d.", - TopicConfig.RETENTION_MS_CONFIG, retentionMs); + if (retentionMs != NO_RETENTION_LIMIT && localRetentionMs != DEFAULT_LOCAL_RETENTION_MS) { + if (localRetentionMs == NO_RETENTION_LIMIT) { + String message = String.format("Value must not be %d as %s value is set as %d.", + NO_RETENTION_LIMIT, TopicConfig.RETENTION_MS_CONFIG, retentionMs); throw new ConfigException(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, localRetentionMs, message); } if (localRetentionMs > retentionMs) { @@ -635,7 +636,7 @@ private static void validateRemoteCopyLagTime(Map props) { Long retentionMs = (Long) props.get(TopicConfig.RETENTION_MS_CONFIG); Long localRetentionMs = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG); Long remoteCopyLagMs = (Long) props.get(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG); - long effectiveLocalRetentionMs = localRetentionMs == -2 ? retentionMs : localRetentionMs; + long effectiveLocalRetentionMs = localRetentionMs == DEFAULT_LOCAL_RETENTION_MS ? retentionMs : localRetentionMs; if (remoteCopyLagMs > 0 && effectiveLocalRetentionMs >= 0 && remoteCopyLagMs > effectiveLocalRetentionMs) { String message = String.format("Value must not exceed %s (effective value: %d)", @@ -648,7 +649,7 @@ private static void validateRemoteCopyLagSize(Map props) { Long retentionBytes = (Long) props.get(TopicConfig.RETENTION_BYTES_CONFIG); Long localRetentionBytes = (Long) props.get(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG); Long remoteCopyLagBytes = (Long) props.get(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG); - long effectiveLocalRetentionBytes = localRetentionBytes == -2 ? retentionBytes : localRetentionBytes; + long effectiveLocalRetentionBytes = localRetentionBytes == DEFAULT_LOCAL_RETENTION_BYTES ? retentionBytes : localRetentionBytes; if (remoteCopyLagBytes > 0 && effectiveLocalRetentionBytes >= 0 && remoteCopyLagBytes > effectiveLocalRetentionBytes) { String message = String.format("Value must not exceed %s (effective value: %d)",