Skip to content
Closed
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import org.apache.kafka.common.config.ConfigDef.Range;

import java.util.Set;

public class SaslConfigs {

private static final String OAUTHBEARER_NOTE = " Currently applies only to OAUTHBEARER.";
Expand Down Expand Up @@ -196,6 +198,20 @@ public class SaslConfigs {
public static final boolean DEFAULT_SASL_OAUTHBEARER_HEADER_URLENCODE = false;
public static final String SASL_OAUTHBEARER_HEADER_URLENCODE_DOC = "The (optional) setting to enable the OAuth client to URL-encode the client_id and client_secret in the authorization header"
+ " in accordance with RFC6749, see <a href=\"https://datatracker.ietf.org/doc/html/rfc6749#section-2.3.1\">here</a> for more details. The default value is set to 'false' for backward compatibility";

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SaslConfigs.java is public, so maybe we can move this set to DynamicConfig?

SaslConfigs.SASL_JAAS_CONFIG,
SaslConfigs.SASL_KERBEROS_SERVICE_NAME,
SaslConfigs.SASL_KERBEROS_KINIT_CMD,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR,
SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER,
SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN,
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR,
SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER,
SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS,
SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS
);

public static void addClientSaslSupport(ConfigDef config) {
config.define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC)
.define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,24 @@ public static void addClientSslSupport(ConfigDef config) {
SslConfigs.SSL_KEYSTORE_CERTIFICATE_CHAIN_CONFIG,
SslConfigs.SSL_KEYSTORE_KEY_CONFIG,
SslConfigs.SSL_TRUSTSTORE_CERTIFICATES_CONFIG);

public static final Set<String> DYNAMIC_LISTENER_CONFIGS = Set.of(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

SslConfigs.SSL_PROTOCOL_CONFIG,
SslConfigs.SSL_PROVIDER_CONFIG,
SslConfigs.SSL_CIPHER_SUITES_CONFIG,
SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEY_PASSWORD_CONFIG,
SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);

public static final Set<String> NON_RECONFIGURABLE_CONFIGS = Set.of(
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
Expand Down Expand Up @@ -198,4 +199,11 @@ public class BrokerSecurityConfigs {
.define(SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, INT, SaslConfigs.DEFAULT_SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS, LOW, SaslConfigs.SASL_OAUTHBEARER_CLOCK_SKEW_SECONDS_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE, LIST, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_AUDIENCE_DOC)
.define(SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER, STRING, null, LOW, SaslConfigs.SASL_OAUTHBEARER_EXPECTED_ISSUER_DOC);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
BrokerSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -177,6 +178,15 @@ public class SocketServerConfigs {

public static final Map<ListenerName, SecurityProtocol> DEFAULT_NAME_TO_SECURITY_PROTO;

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
SocketServerConfigs.MAX_CONNECTIONS_PER_IP_CONFIG,
SocketServerConfigs.MAX_CONNECTIONS_PER_IP_OVERRIDES_CONFIG,
SocketServerConfigs.MAX_CONNECTIONS_CONFIG,
SocketServerConfigs.MAX_CONNECTION_CREATION_RATE_CONFIG,
SocketServerConfigs.LISTENERS_CONFIG,
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG);

static {
HashMap<ListenerName, SecurityProtocol> nameToSecurityProtocol = new HashMap<>();
for (SecurityProtocol securityProtocol : SecurityProtocol.values()) {
Expand Down
206 changes: 206 additions & 0 deletions server/src/main/java/org/apache/kafka/server/config/DynamicConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.server.config;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupConfig;
import org.apache.kafka.coordinator.transaction.TransactionLogConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.metrics.MetricConfigs;
import org.apache.kafka.storage.internals.log.CleanerConfig;

import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

/**
* Class used to hold dynamic configs. These are configs which have no physical manifestation in the server.properties
* and can only be set dynamically.
*/
public class DynamicConfig {

public static final Set<String> ALL_DYNAMIC_CONFIGS;

static {
Set<String> allDynamicConfigs = new HashSet<>(SslConfigs.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(SslConfigs.DYNAMIC_LISTENER_CONFIGS);
allDynamicConfigs.addAll(CleanerConfig.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values());
allDynamicConfigs.add(ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG);
allDynamicConfigs.add(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG);
allDynamicConfigs.add(ServerConfigs.NUM_IO_THREADS_CONFIG);
allDynamicConfigs.add(ServerConfigs.BACKGROUND_THREADS_CONFIG);
allDynamicConfigs.add(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG);
allDynamicConfigs.addAll(MetricConfigs.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(BrokerSecurityConfigs.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(SocketServerConfigs.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(SaslConfigs.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(TransactionLogConfig.RECONFIGURABLE_CONFIGS);
allDynamicConfigs.addAll(RemoteLogManagerConfig.RECONFIGURABLE_CONFIGS);
ALL_DYNAMIC_CONFIGS = Collections.unmodifiableSet(allDynamicConfigs);
}


public static class Broker {
private static final ConfigDef BROKER_CONFIGS;

static {
ConfigDef configs = QuotaConfig.brokerQuotaConfigs();

// Filter and define all dynamic configurations
for (Map.Entry<String, ConfigDef.ConfigKey> entry :
AbstractKafkaConfig.CONFIG_DEF.configKeys().entrySet()) {
String configName = entry.getKey();
if (ALL_DYNAMIC_CONFIGS.contains(configName)) {
configs.define(entry.getValue());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

            AbstractKafkaConfig.CONFIG_DEF.configKeys().forEach((configName, value) -> {
                if (ALL_DYNAMIC_CONFIGS.contains(configName)) {
                    configs.define(value);
                }
            });

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right

BROKER_CONFIGS = configs;
}

// In order to avoid circular reference, all DynamicBrokerConfig's variables which are initialized by
// `DynamicConfig.Broker` should be moved to `DynamicConfig.Broker`.
// Otherwise, those variables of DynamicBrokerConfig will see intermediate state of `DynamicConfig.Broker`,
// because `brokerConfigs` is created by `DynamicBrokerConfig.AllDynamicConfigs`

public static final Set<String> NON_DYNAMIC_PROPS;

static {
Set<String> props = new TreeSet<>(AbstractKafkaConfig.CONFIG_DEF.names());
props.removeAll(BROKER_CONFIGS.names());
NON_DYNAMIC_PROPS = Collections.unmodifiableSet(props);
}

public static Map<String, ConfigDef.ConfigKey> configKeys() {
return BROKER_CONFIGS.configKeys();
}

public static Set<String> names() {
return BROKER_CONFIGS.names();
}

public static Map<String, Object> validate(Properties props) {
return DynamicConfig.validate(BROKER_CONFIGS, props, true);
}
}

public static class Client {
private static final ConfigDef CONFIG_DEF = QuotaConfig.userAndClientQuotaConfigs();

public static Map<String, ConfigDef.ConfigKey> configKeys() {
return CONFIG_DEF.configKeys();
}

public static Set<String> names() {
return CONFIG_DEF.names();
}

public static Map<String, Object> validate(Properties props) {
return DynamicConfig.validate(CONFIG_DEF, props, false);
}
}

public static class User {
private static final ConfigDef USER_CONFIGS = QuotaConfig.scramMechanismsPlusUserAndClientQuotaConfigs();

public static Map<String, ConfigDef.ConfigKey> configKeys() {
return USER_CONFIGS.configKeys();
}

public static Set<String> names() {
return USER_CONFIGS.names();
}

public static Map<String, Object> validate(Properties props) {
return DynamicConfig.validate(USER_CONFIGS, props, false);
}
}

public static class Ip {
private static final ConfigDef IP_CONFIGS = QuotaConfig.ipConfigs();

public static Map<String, ConfigDef.ConfigKey> configKeys() {
return IP_CONFIGS.configKeys();
}

public static Set<String> names() {
return IP_CONFIGS.names();
}

public static Map<String, Object> validate(Properties props) {
return DynamicConfig.validate(IP_CONFIGS, props, false);
}
}

public static class ClientMetrics {
private static final ConfigDef CLIENT_CONFIGS =
org.apache.kafka.server.metrics.ClientMetricsConfigs.configDef();

public static Set<String> names() {
return CLIENT_CONFIGS.names();
}
}

public static class Group {
private static final ConfigDef GROUP_CONFIGS = GroupConfig.configDef();

public static Set<String> names() {
return GROUP_CONFIGS.names();
}
}

private static Map<String, Object> validate(ConfigDef configDef, Properties props, boolean customPropsAllowed) {
// Validate Names
Set<String> names = configDef.names();
Set<String> propKeys = new HashSet<>();
for (Object key : props.keySet()) {
propKeys.add((String) key);
}
Comment on lines +176 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this block can move into the if condition?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like:

        if (!customPropsAllowed) {
            Set<String> names = configDef.names();
            Set<String> propKeys = new HashSet<>();
            for (Object key : props.keySet()) {
                propKeys.add((String) key);
            }
            Set<String> unknownKeys = new HashSet<>(propKeys);
            unknownKeys.removeAll(names);
            if (!unknownKeys.isEmpty()) {
                throw new IllegalArgumentException("Unknown Dynamic Configuration: " + unknownKeys);
            }
        }
        Properties propResolved = resolveVariableConfigs(props);
        // Validate Values
        return configDef.parse(propResolved);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the original scala code, this code was cast.I wrote that on purpose


if (!customPropsAllowed) {
Set<String> unknownKeys = new HashSet<>(propKeys);
unknownKeys.removeAll(names);
if (!unknownKeys.isEmpty()) {
throw new IllegalArgumentException("Unknown Dynamic Configuration: " + unknownKeys);
}
}
Properties propResolved = resolveVariableConfigs(props);
// Validate Values
return configDef.parse(propResolved);
}

private static Properties resolveVariableConfigs(Properties propsOriginal) {
Properties props = new Properties();
AbstractConfig config = new AbstractConfig(new ConfigDef(), propsOriginal,
Utils.castToStringObjectMap(propsOriginal), false);
config.originals().forEach((key, value) -> {
if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
props.put(key, value);
}
});
return props;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.Sensor;

import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Type.INT;
Expand Down Expand Up @@ -78,4 +80,7 @@ public class MetricConfigs {

// Kafka Client Telemetry Metrics Configuration
.define(CLIENT_TELEMETRY_MAX_BYTES_CONFIG, INT, CLIENT_TELEMETRY_MAX_BYTES_DEFAULT, atLeast(1), LOW, CLIENT_TELEMETRY_MAX_BYTES_DOC);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.Collections;
import java.util.Map;
import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
Expand Down Expand Up @@ -189,6 +190,17 @@ public final class RemoteLogManagerConfig {
public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP = "remote.list.offsets.request.timeout.ms";
public static final String REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_DOC = "The maximum amount of time the server will wait for the remote list offsets request to complete.";
public static final long DEFAULT_REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS = 30000L;

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP,
RemoteLogManagerConfig.REMOTE_FETCH_MAX_WAIT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP,
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
);

private final AbstractConfig config;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.server.config.ServerTopicConfigSynonyms;

import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
Expand Down Expand Up @@ -86,6 +88,14 @@ public class CleanerConfig {
.define(LOG_CLEANER_MIN_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MIN_COMPACTION_LAG_MS, atLeast(0), MEDIUM, LOG_CLEANER_MIN_COMPACTION_LAG_MS_DOC)
.define(LOG_CLEANER_MAX_COMPACTION_LAG_MS_PROP, LONG, LogConfig.DEFAULT_MAX_COMPACTION_LAG_MS, atLeast(1), MEDIUM, LOG_CLEANER_MAX_COMPACTION_LAG_MS_DOC);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
CleanerConfig.LOG_CLEANER_THREADS_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP,
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_LOAD_FACTOR_PROP,
CleanerConfig.LOG_CLEANER_IO_BUFFER_SIZE_PROP,
CleanerConfig.LOG_CLEANER_IO_MAX_BYTES_PER_SECOND_PROP,
CleanerConfig.LOG_CLEANER_BACKOFF_MS_PROP);

public final int numThreads;
public final long dedupeBufferSize;
public final double dedupeBufferLoadFactor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Set;

import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
Expand Down Expand Up @@ -74,6 +76,10 @@ public final class TransactionLogConfig {
// Configuration for testing only as default value should be sufficient for typical usage
.defineInternal(PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_CONFIG, INT, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), LOW, PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DOC);

public static final Set<String> RECONFIGURABLE_CONFIGS = Set.of(
TransactionLogConfig.PRODUCER_ID_EXPIRATION_MS_CONFIG,
TransactionLogConfig.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG);

private final AbstractConfig config;
private final int transactionTopicMinISR;
private final int transactionLoadBufferSize;
Expand Down