Skip to content

Commit 16d254e

Browse files
committed
MINOR: Move DynamicLogConfig to server module
1 parent 6112e3f commit 16d254e

6 files changed

Lines changed: 219 additions & 165 deletions

File tree

core/src/main/scala/kafka/server/DynamicBrokerConfig.scala

Lines changed: 3 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import kafka.network.DataPlaneAcceptor
2525
import kafka.raft.KafkaRaftManager
2626
import kafka.server.DynamicBrokerConfig._
2727
import kafka.utils.Logging
28-
import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
28+
import org.apache.kafka.common.{Endpoint, Reconfigurable}
2929
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, SslConfigs}
3030
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
3131
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
@@ -39,16 +39,14 @@ import org.apache.kafka.config
3939
import org.apache.kafka.network.SocketServer
4040
import org.apache.kafka.raft.KafkaRaftClient
4141
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
42-
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler}
43-
import org.apache.kafka.server.config.{DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
42+
import org.apache.kafka.server.common.ApiMessageAndVersion
43+
import org.apache.kafka.server.config.{DynamicConfig, DynamicLogConfig, DynamicProducerStateManagerConfig, ServerConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
4444
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
4545
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
4646
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
4747
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
4848
import org.apache.kafka.snapshot.RecordsSnapshotReader
49-
import org.apache.kafka.storage.internals.log.{LogConfig, LogManager}
5049

51-
import java.util.stream.Collectors
5250
import scala.util.Using
5351
import scala.collection._
5452
import scala.jdk.CollectionConverters._
@@ -538,113 +536,6 @@ trait BrokerReconfigurable {
538536
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
539537
}
540538

541-
class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryEventHandler) extends BrokerReconfigurable with Logging {
542-
543-
override def reconfigurableConfigs: util.Set[String] = {
544-
JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
545-
}
546-
547-
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
548-
// For update of topic config overrides, only config names and types are validated
549-
// Names and types have already been validated. For consistency with topic config
550-
// validation, no additional validation is performed.
551-
552-
def validateLogLocalRetentionMs(): Unit = {
553-
val logRetentionMs = newConfig.logRetentionTimeMillis
554-
val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs
555-
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
556-
if (logLocalRetentionMs == -1L) {
557-
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
558-
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} value is set as $logRetentionMs.")
559-
}
560-
if (logLocalRetentionMs > logRetentionMs) {
561-
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
562-
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} property value: $logRetentionMs")
563-
}
564-
}
565-
}
566-
567-
def validateLogLocalRetentionBytes(): Unit = {
568-
val logRetentionBytes = newConfig.logRetentionBytes
569-
val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
570-
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
571-
if (logLocalRetentionBytes == -1) {
572-
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
573-
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} value is set as $logRetentionBytes.")
574-
}
575-
if (logLocalRetentionBytes > logRetentionBytes) {
576-
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
577-
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} property value: $logRetentionBytes")
578-
}
579-
}
580-
}
581-
582-
def validateLogRemoteCopyLagMs(): Unit = {
583-
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
584-
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
585-
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
586-
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
587-
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
588-
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
589-
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
590-
}
591-
}
592-
593-
def validateLogRemoteCopyLagBytes(): Unit = {
594-
val logRetentionBytes: Long = newConfig.logRetentionBytes
595-
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
596-
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
597-
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
598-
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
599-
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
600-
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
601-
}
602-
}
603-
604-
def validateCordonedLogDirs(): Unit = {
605-
val logDirs = newConfig.logDirs()
606-
val cordonedLogDirs = newConfig.cordonedLogDirs()
607-
cordonedLogDirs.asScala.foreach(dir =>
608-
if (!logDirs.contains(dir)) {
609-
throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, cordonedLogDirs, s"Invalid entry in ${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " +
610-
s"All cordoned log dirs must be entries of ${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.")
611-
}
612-
)
613-
}
614-
615-
validateLogLocalRetentionMs()
616-
validateLogLocalRetentionBytes()
617-
validateLogRemoteCopyLagMs()
618-
validateLogRemoteCopyLagBytes()
619-
validateCordonedLogDirs()
620-
}
621-
622-
private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
623-
logManager.brokerConfigUpdated()
624-
logManager.allLogs.forEach { log =>
625-
val props = mutable.Map.empty[Any, Any]
626-
props ++= newBrokerDefaults
627-
props ++= log.config.originals.asScala.filter { case (k, _) =>
628-
log.config.overriddenConfigs.contains(k)
629-
}
630-
631-
val logConfig = new LogConfig(props.asJava, log.config.overriddenConfigs)
632-
log.updateConfig(logConfig)
633-
}
634-
}
635-
636-
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
637-
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
638-
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
639-
updateLogsConfig(newBrokerDefaults.asScala)
640-
641-
logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs))
642-
directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream
643-
.flatMap[Uuid](dir => logManager.directoryId(dir).stream)
644-
.collect(Collectors.toSet[Uuid]))
645-
}
646-
}
647-
648539
class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {
649540

650541
override def reconfigurableConfigs: util.Set[String] = {

core/src/main/scala/kafka/server/KafkaConfig.scala

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.Properties
2323
import kafka.utils.Logging
2424
import kafka.utils.Implicits._
2525
import org.apache.kafka.common.{Endpoint, Reconfigurable}
26-
import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
26+
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
2727
import org.apache.kafka.common.config.ConfigDef.ConfigKey
2828
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
2929
import org.apache.kafka.common.internals.Plugin
@@ -594,42 +594,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
594594
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
595595
}
596596

597-
/**
598-
* Copy the subset of properties that are relevant to Logs. The individual properties
599-
* are listed here since the names are slightly different in each Config class...
600-
*/
601-
def extractLogConfigMap: java.util.Map[String, Object] = {
602-
val logProps = new java.util.HashMap[String, Object]()
603-
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes)
604-
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis)
605-
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis)
606-
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes)
607-
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages)
608-
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs)
609-
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes)
610-
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis: java.lang.Long)
611-
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, messageMaxBytes)
612-
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes)
613-
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs)
614-
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs)
615-
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs)
616-
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs)
617-
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio)
618-
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy)
619-
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas)
620-
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, compressionType)
621-
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, gzipCompressionLevel)
622-
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, lz4CompressionLevel)
623-
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel)
624-
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable)
625-
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
626-
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name)
627-
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
628-
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
629-
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
630-
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
631-
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
632-
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
633-
logProps
634-
}
635597
}

core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ import org.apache.kafka.network.{SocketServerConfigs, SocketServer => JSocketSer
3737
import org.apache.kafka.server.DynamicThreadPool
3838
import org.apache.kafka.server.authorizer._
3939
import org.apache.kafka.server.common.DirectoryEventHandler
40-
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
40+
import org.apache.kafka.server.config.{DynamicLogConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
4141
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
4242
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs}
4343
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver}

server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.config.ConfigDef;
2424
import org.apache.kafka.common.config.ConfigException;
2525
import org.apache.kafka.common.config.ConfigResource;
26+
import org.apache.kafka.common.config.TopicConfig;
2627
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
2728
import org.apache.kafka.common.config.types.Password;
2829
import org.apache.kafka.common.network.ListenerName;
@@ -636,6 +637,45 @@ public Long logRetentionTimeMillis() {
636637
return millis < 0 ? Long.valueOf(-1) : millis;
637638
}
638639

640+
/**
641+
* Copy the subset of properties that are relevant to logs. The individual properties
642+
* are listed here since the names are slightly different in each config class.
643+
*/
644+
public Map<String, Object> extractLogConfigMap() {
645+
Map<String, Object> logProps = new HashMap<>();
646+
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes());
647+
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis());
648+
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis());
649+
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes());
650+
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages());
651+
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs());
652+
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes());
653+
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis());
654+
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG));
655+
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes());
656+
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs());
657+
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs());
658+
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs());
659+
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs());
660+
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio());
661+
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy());
662+
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas());
663+
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, getString(ServerConfigs.COMPRESSION_TYPE_CONFIG));
664+
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG));
665+
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG));
666+
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG));
667+
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG));
668+
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable());
669+
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType().name);
670+
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs());
671+
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs());
672+
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP));
673+
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP));
674+
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP));
675+
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP));
676+
return logProps;
677+
}
678+
639679
/**
640680
* Returns a map of group config names to their broker-level synonym values, used as
641681
* defaults when building a {@link GroupConfig} for {@code DescribeConfigs}.

server/src/main/java/org/apache/kafka/server/config/DynamicBrokerConfig.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -200,19 +200,6 @@ public static Map<String, String> dynamicConfigUpdateModes() {
200200
);
201201
}
202202

203-
public static class DynamicLogConfig {
204-
/**
205-
* The broker configurations pertaining to logs that are reconfigurable. This set contains
206-
* the names you would use when setting a static or dynamic broker configuration (not topic
207-
* configuration).
208-
*/
209-
public static final Set<String> RECONFIGURABLE_CONFIGS = Stream.of(
210-
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(),
211-
Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
212-
.flatMap(Collection::stream)
213-
.collect(Collectors.toUnmodifiableSet());
214-
}
215-
216203
public static class DynamicListenerConfig {
217204
/**
218205
* The set of configurations which the DynamicListenerConfig object listens for. Many of

0 commit comments

Comments
 (0)