Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 3 additions & 112 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.network.DataPlaneAcceptor
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
import kafka.utils.Logging
import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, SslConfigs}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
Expand All @@ -39,16 +39,14 @@ import org.apache.kafka.config
import org.apache.kafka.network.SocketServer
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler}
import org.apache.kafka.server.config.{DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicConfig, DynamicLogConfig, DynamicProducerStateManagerConfig, ServerConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.{LogConfig, LogManager}

import java.util.stream.Collectors
import scala.util.Using
import scala.collection._
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -538,113 +536,6 @@ trait BrokerReconfigurable {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}

class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryEventHandler) extends BrokerReconfigurable with Logging {

override def reconfigurableConfigs: util.Set[String] = {
JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
// For update of topic config overrides, only config names and types are validated
// Names and types have already been validated. For consistency with topic config
// validation, no additional validation is performed.

def validateLogLocalRetentionMs(): Unit = {
val logRetentionMs = newConfig.logRetentionTimeMillis
val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
if (logLocalRetentionMs == -1L) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} value is set as $logRetentionMs.")
}
if (logLocalRetentionMs > logRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} property value: $logRetentionMs")
}
}
}

def validateLogLocalRetentionBytes(): Unit = {
val logRetentionBytes = newConfig.logRetentionBytes
val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
if (logLocalRetentionBytes == -1) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} value is set as $logRetentionBytes.")
}
if (logLocalRetentionBytes > logRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} property value: $logRetentionBytes")
}
}
}

def validateLogRemoteCopyLagMs(): Unit = {
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
}
}

def validateLogRemoteCopyLagBytes(): Unit = {
val logRetentionBytes: Long = newConfig.logRetentionBytes
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
}
}

def validateCordonedLogDirs(): Unit = {
val logDirs = newConfig.logDirs()
val cordonedLogDirs = newConfig.cordonedLogDirs()
cordonedLogDirs.asScala.foreach(dir =>
if (!logDirs.contains(dir)) {
throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, cordonedLogDirs, s"Invalid entry in ${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " +
s"All cordoned log dirs must be entries of ${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.")
}
)
}

validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
validateLogRemoteCopyLagMs()
validateLogRemoteCopyLagBytes()
validateCordonedLogDirs()
}

private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
logManager.brokerConfigUpdated()
logManager.allLogs.forEach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filter { case (k, _) =>
log.config.overriddenConfigs.contains(k)
}

val logConfig = new LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(logConfig)
}
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
updateLogsConfig(newBrokerDefaults.asScala)

logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs))
directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream
.flatMap[Uuid](dir => logManager.directoryId(dir).stream)
.collect(Collectors.toSet[Uuid]))
}
}

class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {

override def reconfigurableConfigs: util.Set[String] = {
Expand Down
40 changes: 1 addition & 39 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Properties
import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.internals.Plugin
Expand Down Expand Up @@ -594,42 +594,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
}

/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...
*/
def extractLogConfigMap: java.util.Map[String, Object] = {
val logProps = new java.util.HashMap[String, Object]()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes)
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis)
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes)
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages)
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis: java.lang.Long)
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, messageMaxBytes)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes)
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs)
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy)
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas)
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, compressionType)
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, gzipCompressionLevel)
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, lz4CompressionLevel)
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel)
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable)
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
logProps
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.network.{SocketServerConfigs, SocketServer => JSocketSer
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{DynamicLogConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -640,6 +641,45 @@ public Long logRetentionTimeMillis() {
return millis < 0 ? Long.valueOf(-1) : millis;
}

/**
* Copy the subset of properties that are relevant to logs. The individual properties
* are listed here since the names are slightly different in each config class.
*/
public Map<String, Object> extractLogConfigMap() {
Map<String, Object> logProps = new HashMap<>();
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes());
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis());
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis());
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes());
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages());
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs());
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes());
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis());
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG));
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes());
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs());
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs());
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs());
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs());
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio());
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy());
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas());
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, getString(ServerConfigs.COMPRESSION_TYPE_CONFIG));
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG));
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG));
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG));
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG));
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable());
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType().name);
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs());
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs());
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP));
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP));
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP));
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP));
return logProps;
}

/**
* Returns a map of group config names to their broker-level synonym values, used as
* defaults when building a {@link GroupConfig} for {@code DescribeConfigs}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,6 @@ public static Map<String, String> dynamicConfigUpdateModes() {
);
}

public static class DynamicLogConfig {
/**
* The broker configurations pertaining to logs that are reconfigurable. This set contains
* the names you would use when setting a static or dynamic broker configuration (not topic
* configuration).
*/
public static final Set<String> RECONFIGURABLE_CONFIGS = Stream.of(
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(),
Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
}

public static class DynamicListenerConfig {
/**
* The set of configurations which the DynamicListenerConfig object listens for. Many of
Expand Down
Loading
Loading