Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 4 additions & 113 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.network.DataPlaneAcceptor
import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
import kafka.utils.Logging
import org.apache.kafka.common.{Endpoint, Reconfigurable, Uuid}
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, ConfigResource, SslConfigs}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
import org.apache.kafka.common.metrics.{Metrics, MetricsReporter}
Expand All @@ -39,16 +39,14 @@ import org.apache.kafka.config
import org.apache.kafka.network.SocketServer
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler}
import org.apache.kafka.server.config.{DynamicConfig, DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicConfig, DynamicLogConfig, DynamicProducerStateManagerConfig, ServerConfigs, DynamicBrokerConfig => JDynamicBrokerConfig}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
import org.apache.kafka.server.util.LockUtils.{inReadLock, inWriteLock}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.{LogConfig, LogManager}

import java.util.stream.Collectors
import scala.util.Using
import scala.collection._
import scala.jdk.CollectionConverters._
Expand Down Expand Up @@ -231,7 +229,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
reconfigurables.add(reconfigurable)
}

def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable): Unit = {
def addBrokerReconfigurable(reconfigurable: config.BrokerReconfigurable[_ >: KafkaConfig]): Unit = {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(new BrokerReconfigurable {
override def reconfigurableConfigs: util.Set[String] = reconfigurable.reconfigurableConfigs
Expand Down Expand Up @@ -538,113 +536,6 @@ trait BrokerReconfigurable {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}

class DynamicLogConfig(logManager: LogManager, directoryEventHandler: DirectoryEventHandler) extends BrokerReconfigurable with Logging {

override def reconfigurableConfigs: util.Set[String] = {
JDynamicBrokerConfig.DynamicLogConfig.RECONFIGURABLE_CONFIGS
}

override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
// For update of topic config overrides, only config names and types are validated
// Names and types have already been validated. For consistency with topic config
// validation, no additional validation is performed.

def validateLogLocalRetentionMs(): Unit = {
val logRetentionMs = newConfig.logRetentionTimeMillis
val logLocalRetentionMs: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionMs
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
if (logLocalRetentionMs == -1L) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} value is set as $logRetentionMs.")
}
if (logLocalRetentionMs > logRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_TIME_MILLIS_CONFIG} property value: $logRetentionMs")
}
}
}

def validateLogLocalRetentionBytes(): Unit = {
val logRetentionBytes = newConfig.logRetentionBytes
val logLocalRetentionBytes: java.lang.Long = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
if (logLocalRetentionBytes == -1) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be -1 as ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} value is set as $logRetentionBytes.")
}
if (logLocalRetentionBytes > logRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be more than ${ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG} property value: $logRetentionBytes")
}
}
}

def validateLogRemoteCopyLagMs(): Unit = {
val logRetentionMs: Long = newConfig.logRetentionTimeMillis
val logLocalRetentionMs = newConfig.remoteLogManagerConfig.logLocalRetentionMs
val effectiveLocalRetentionMs = if (logLocalRetentionMs == -2L) logRetentionMs else logLocalRetentionMs
val logRemoteCopyLagMs = newConfig.remoteLogManagerConfig.logRemoteCopyLagMs
if (logRemoteCopyLagMs > 0L && effectiveLocalRetentionMs >= 0L && logRemoteCopyLagMs > effectiveLocalRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP, logRemoteCopyLagMs,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP} (effective value: $effectiveLocalRetentionMs)")
}
}

def validateLogRemoteCopyLagBytes(): Unit = {
val logRetentionBytes: Long = newConfig.logRetentionBytes
val logLocalRetentionBytes = newConfig.remoteLogManagerConfig.logLocalRetentionBytes
val effectiveLocalRetentionBytes = if (logLocalRetentionBytes == -2L) logRetentionBytes else logLocalRetentionBytes
val logRemoteCopyLagBytes = newConfig.remoteLogManagerConfig.logRemoteCopyLagBytes
if (logRemoteCopyLagBytes > 0L && effectiveLocalRetentionBytes >= 0L && logRemoteCopyLagBytes > effectiveLocalRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP, logRemoteCopyLagBytes,
s"Value must not exceed ${RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP} (effective value: $effectiveLocalRetentionBytes)")
}
}

def validateCordonedLogDirs(): Unit = {
val logDirs = newConfig.logDirs()
val cordonedLogDirs = newConfig.cordonedLogDirs()
cordonedLogDirs.asScala.foreach(dir =>
if (!logDirs.contains(dir)) {
throw new ConfigException(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG, cordonedLogDirs, s"Invalid entry in ${ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG}: $dir. " +
s"All cordoned log dirs must be entries of ${ServerLogConfigs.LOG_DIRS_CONFIG} or ${ServerLogConfigs.LOG_DIR_CONFIG}.")
}
)
}

validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
validateLogRemoteCopyLagMs()
validateLogRemoteCopyLagBytes()
validateCordonedLogDirs()
}

private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
logManager.brokerConfigUpdated()
logManager.allLogs.forEach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filter { case (k, _) =>
log.config.overriddenConfigs.contains(k)
}

val logConfig = new LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(logConfig)
}
}

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val newBrokerDefaults = new util.HashMap[String, Object](newConfig.extractLogConfigMap)
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
updateLogsConfig(newBrokerDefaults.asScala)

logManager.updateCordonedLogDirs(util.Set.copyOf(newConfig.cordonedLogDirs))
directoryEventHandler.handleCordoned(newConfig.cordonedLogDirs.stream
.flatMap[Uuid](dir => logManager.directoryId(dir).stream)
.collect(Collectors.toSet[Uuid]))
}
}

class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {

override def reconfigurableConfigs: util.Set[String] = {
Expand Down
40 changes: 1 addition & 39 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util.Properties
import kafka.utils.Logging
import kafka.utils.Implicits._
import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException, TopicConfig}
import org.apache.kafka.common.config.{ConfigDef, ConfigException}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.internals.Plugin
Expand Down Expand Up @@ -594,42 +594,4 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG} must implement KafkaPrincipalSerde")
}

/**
* Copy the subset of properties that are relevant to Logs. The individual properties
* are listed here since the names are slightly different in each Config class...
*/
def extractLogConfigMap: java.util.Map[String, Object] = {
val logProps = new java.util.HashMap[String, Object]()
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes)
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis)
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis)
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes)
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages)
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs)
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes)
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis: java.lang.Long)
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, messageMaxBytes)
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes)
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs)
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs)
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs)
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio)
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy)
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas)
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, compressionType)
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, gzipCompressionLevel)
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, lz4CompressionLevel)
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, zstdCompressionLevel)
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, uncleanLeaderElectionEnable)
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType.name)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs: java.lang.Long)
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, remoteLogManagerConfig.logLocalRetentionMs: java.lang.Long)
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, remoteLogManagerConfig.logLocalRetentionBytes: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, remoteLogManagerConfig.logRemoteCopyLagMs: java.lang.Long)
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, remoteLogManagerConfig.logRemoteCopyLagBytes: java.lang.Long)
logProps
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.kafka.network.{SocketServerConfigs, SocketServer => JSocketSer
import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.DirectoryEventHandler
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.config.{DynamicLogConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
* <li>Validating the new configuration before applying it via {@link #validateReconfiguration(AbstractConfig)}</li>
* <li>Applying the new configuration via {@link #reconfigure(AbstractConfig, AbstractConfig)}</li>
* </ol>
*
* @param <T> the configuration type used by the reconfigurable component
*/
public interface BrokerReconfigurable {
public interface BrokerReconfigurable<T extends AbstractConfig> {
/**
* Returns the set of configuration keys that can be dynamically reconfigured.
*
Expand All @@ -53,7 +55,7 @@ public interface BrokerReconfigurable {
*
* @param newConfig the new configuration to validate
*/
void validateReconfiguration(AbstractConfig newConfig);
void validateReconfiguration(T newConfig);

/**
* Applies the new configuration.
Expand All @@ -63,5 +65,5 @@ public interface BrokerReconfigurable {
* @param oldConfig the previous configuration
* @param newConfig the new configuration to apply
*/
void reconfigure(AbstractConfig oldConfig, AbstractConfig newConfig);
void reconfigure(T oldConfig, T newConfig);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.network.ListenerName;
Expand Down Expand Up @@ -640,6 +641,45 @@ public Long logRetentionTimeMillis() {
return millis < 0 ? Long.valueOf(-1) : millis;
}

/**
* Copy the subset of properties that are relevant to logs. The individual properties
* are listed here since the names are slightly different in each config class.
*/
public Map<String, Object> extractLogConfigMap() {
Map<String, Object> logProps = new HashMap<>();
logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, logSegmentBytes());
logProps.put(TopicConfig.SEGMENT_MS_CONFIG, logRollTimeMillis());
logProps.put(TopicConfig.SEGMENT_JITTER_MS_CONFIG, logRollTimeJitterMillis());
logProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, logIndexSizeMaxBytes());
logProps.put(TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG, logFlushIntervalMessages());
logProps.put(TopicConfig.FLUSH_MS_CONFIG, logFlushIntervalMs());
logProps.put(TopicConfig.RETENTION_BYTES_CONFIG, logRetentionBytes());
logProps.put(TopicConfig.RETENTION_MS_CONFIG, logRetentionTimeMillis());
logProps.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, getInt(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG));
logProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, logIndexIntervalBytes());
logProps.put(TopicConfig.DELETE_RETENTION_MS_CONFIG, logCleanerDeleteRetentionMs());
logProps.put(TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG, logCleanerMinCompactionLagMs());
logProps.put(TopicConfig.MAX_COMPACTION_LAG_MS_CONFIG, logCleanerMaxCompactionLagMs());
logProps.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, logDeleteDelayMs());
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, logCleanerMinCleanRatio());
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, logCleanupPolicy());
logProps.put(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, minInSyncReplicas());
logProps.put(TopicConfig.COMPRESSION_TYPE_CONFIG, getString(ServerConfigs.COMPRESSION_TYPE_CONFIG));
logProps.put(TopicConfig.COMPRESSION_GZIP_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_GZIP_LEVEL_CONFIG));
logProps.put(TopicConfig.COMPRESSION_LZ4_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_LZ4_LEVEL_CONFIG));
logProps.put(TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG, getInt(ServerConfigs.COMPRESSION_ZSTD_LEVEL_CONFIG));
logProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG));
logProps.put(TopicConfig.PREALLOCATE_CONFIG, logPreAllocateEnable());
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG, logMessageTimestampType().name);
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, logMessageTimestampBeforeMaxMs());
logProps.put(TopicConfig.MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, logMessageTimestampAfterMaxMs());
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP));
logProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP));
logProps.put(TopicConfig.REMOTE_COPY_LAG_MS_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_MS_PROP));
logProps.put(TopicConfig.REMOTE_COPY_LAG_BYTES_CONFIG, getLong(RemoteLogManagerConfig.LOG_REMOTE_COPY_LAG_BYTES_PROP));
return logProps;
}

/**
* Returns a map of group config names to their broker-level synonym values, used as
* defaults when building a {@link GroupConfig} for {@code DescribeConfigs}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,19 +200,6 @@ public static Map<String, String> dynamicConfigUpdateModes() {
);
}

public static class DynamicLogConfig {
/**
* The broker configurations pertaining to logs that are reconfigurable. This set contains
* the names you would use when setting a static or dynamic broker configuration (not topic
* configuration).
*/
public static final Set<String> RECONFIGURABLE_CONFIGS = Stream.of(
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values(),
Set.of(ServerLogConfigs.CORDONED_LOG_DIRS_CONFIG))
.flatMap(Collection::stream)
.collect(Collectors.toUnmodifiableSet());
}

public static class DynamicListenerConfig {
/**
* The set of configurations which the DynamicListenerConfig object listens for. Many of
Expand Down
Loading
Loading