From 99219c155dc828643b5c7e7a35b44c30bef1bdc4 Mon Sep 17 00:00:00 2001 From: wangzhuang <1459066480@qq.com> Date: Fri, 19 Jul 2024 16:30:04 +0800 Subject: [PATCH 1/5] KAFKA-4243: Ensure metric names are quoted as necessary for JMX --- .../main/java/org/apache/kafka/common/utils/Sanitizer.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java index 61d29a1a35302..fb8648566d7b0 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java @@ -82,12 +82,13 @@ public static String desanitize(String name) { } /** - * Quote `name` using {@link ObjectName#quote(String)} if `name` contains + * Replace characters `:` in Quote `name`with %3A , + * then Quote `name` using {@link ObjectName#quote(String)} if `name` contains * characters that are not safe for use in JMX. User principals that are * already sanitized using {@link #sanitize(String)} will not be quoted * since they are safe for JMX. */ public static String jmxSanitize(String name) { - return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name); + return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":","%3A")); } } From ec4b9180f96e664e73b3946d3af0eb66d62361bb Mon Sep 17 00:00:00 2001 From: T2233 Date: Fri, 11 Apr 2025 17:57:54 +0800 Subject: [PATCH 2/5] =?UTF-8?q?KAFKA-4243:=20Ensure=20metric=20names=20are?= =?UTF-8?q?=20quoted=20as=20necessary=20for=20JMX=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/org/apache/kafka/common/utils/Sanitizer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java index 56e39451fa227..bc3893713a080 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java @@ -27,7 +27,7 @@ * Utility class for sanitizing/desanitizing/quoting values used in JMX metric names. *

* User principals are URL-encoded using ({@link #sanitize(String)} in all metric names. - * All other metric tags including client-id are quoted if they contain special characters + * All other metric tags including client-id are quoted if they contain special characters * using {@link #jmxSanitize(String)} when registering in JMX. */ public class Sanitizer { @@ -73,6 +73,6 @@ public static String desanitize(String name) { * since they are safe for JMX. */ public static String jmxSanitize(String name) { - return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":","%3A")); + return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":", "%3A")); } } From e21e54866995bde98e66b802ae83cd66f3e4ce8b Mon Sep 17 00:00:00 2001 From: T2233 Date: Wed, 16 Apr 2025 15:53:41 +0800 Subject: [PATCH 3/5] KAFKA-19112:cleanup.policy shouldn't be empty --- .../apache/kafka/common/config/ConfigDef.java | 166 +++++++---- .../kafka/common/config/TopicConfig.java | 171 +++++------ .../kafka/common/config/ConfigDefTest.java | 241 ++++++++-------- .../group/GroupCoordinatorConfig.java | 272 +++++++++--------- .../kafka/server/config/KRaftConfigs.java | 10 +- .../storage/internals/log/LogConfig.java | 14 +- 6 files changed, 457 insertions(+), 417 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 970d9cebf7231..29d2b60d15bd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -40,13 +40,13 @@ * This class is used for specifying the set of expected configurations. For each configuration, you can specify * the name, the type, the default value, the documentation, the group information, the order in the group, * the width of the configuration value and the name suitable for display in the UI. - * + *

* You can provide special validation logic used for single configuration validation by overriding {@link Validator}. - * + *

* Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration * may change according to the values of other configurations. You can override {@link Recommender} to get valid * values and set visibility of a configuration given the current configuration values. - * + *

*

* To use the class: *

@@ -136,6 +136,7 @@ public ConfigDef define(ConfigKey key) { /** * Define a new configuration + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -157,19 +158,20 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration - * @param name the name of the config parameter - * @param type the type of the config - * @param defaultValue the default value to use if this config isn't present - * @param validator the validator to use in checking the correctness of the config - * @param importance the importance of this config - * @param documentation the documentation string for the config - * @param group the group this config belongs to - * @param orderInGroup the order of this config in the group - * @param width the width of the config - * @param displayName the name suitable for display - * @param dependents the configurations that are dependents of this configuration - * @param recommender the recommender provides valid values given the parent configuration values - * @param alternativeString the string which will be used to override the string of defaultValue + * + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @param recommender the recommender provides valid values given the parent configuration values + * @param alternativeString the string which will be used to override the string of defaultValue * @return This ConfigDef so you can chain calls */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, @@ -180,6 +182,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration with no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -200,6 +203,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration with no dependents + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -220,6 +224,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration with no dependents and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -239,6 +244,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration with no special validation logic + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -259,6 +265,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no special validation logic and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -278,6 +285,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no special validation logic and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -297,6 +305,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no special validation logic, not dependents and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -315,6 +324,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no default value and no special validation logic + * * @param name the name of the config parameter * @param type the type of the config * @param importance the importance of this config @@ -334,6 +344,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do /** * Define a new configuration with no default value, no special validation logic and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param importance the importance of this config @@ -352,6 +363,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do /** * Define a new configuration with no default value, no special validation logic and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param importance the importance of this config @@ -370,6 +382,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do /** * Define a new configuration with no default value, no special validation logic, no dependents and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param importance the importance of this config @@ -387,6 +400,7 @@ public ConfigDef define(String name, Type type, Importance importance, String do /** * Define a new configuration with no group, no order in group, no width, no display name, no dependents and no custom recommender + * * @param name the name of the config parameter * @param type the type of the config * @param defaultValue the default value to use if this config isn't present @@ -401,6 +415,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Validator v /** * Define a new configuration with no special validation logic + * * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present @@ -414,6 +429,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no special validation logic + * * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present @@ -429,6 +445,7 @@ public ConfigDef define(String name, Type type, Object defaultValue, Importance /** * Define a new configuration with no default value and no special validation logic + * * @param name The name of the config parameter * @param type The type of the config * @param importance The importance of this config: is this something you will likely need to change. @@ -442,10 +459,11 @@ public ConfigDef define(String name, Type type, Importance importance, String do /** * Define a new internal configuration. Internal configuration won't show up in the docs and aren't * intended for general use. - * @param name The name of the config parameter - * @param type The type of the config - * @param defaultValue The default value to use if this config isn't present - * @param importance The importance of this config (i.e. is this something you will likely need to change?) + * + * @param name The name of the config parameter + * @param type The type of the config + * @param defaultValue The default value to use if this config isn't present + * @param importance The importance of this config (i.e. is this something you will likely need to change?) * @return This ConfigDef so you can chain calls */ public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) { @@ -455,12 +473,13 @@ public ConfigDef defineInternal(final String name, final Type type, final Object /** * Define a new internal configuration. Internal configuration won't show up in the docs and aren't * intended for general use. - * @param name The name of the config parameter - * @param type The type of the config - * @param defaultValue The default value to use if this config isn't present - * @param validator The validator to use in checking the correctness of the config - * @param importance The importance of this config (i.e. is this something you will likely need to change?) - * @param documentation The documentation string for the config + * + * @param name The name of the config parameter + * @param type The type of the config + * @param defaultValue The default value to use if this config isn't present + * @param validator The validator to use in checking the correctness of the config + * @param importance The importance of this config (i.e. is this something you will likely need to change?) + * @param documentation The documentation string for the config * @return This ConfigDef so you can chain calls */ public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Validator validator, final Importance importance, final String documentation) { @@ -469,6 +488,7 @@ public ConfigDef defineInternal(final String name, final Type type, final Object /** * Get the configuration keys + * * @return a map containing all configuration keys */ public Map configKeys() { @@ -477,6 +497,7 @@ public Map configKeys() { /** * Get the groups for the configuration + * * @return a list of group names */ public List groups() { @@ -485,6 +506,7 @@ public List groups() { /** * Add standard SSL client configuration options. + * * @return this */ public ConfigDef withClientSslSupport() { @@ -494,6 +516,7 @@ public ConfigDef withClientSslSupport() { /** * Add standard SASL client configuration options. + * * @return this */ public ConfigDef withClientSaslSupport() { @@ -529,7 +552,7 @@ Object parseValue(ConfigKey key, Object value, boolean isSet) { Object parsedValue; if (isSet) { parsedValue = parseType(key.name, value, key.type); - // props map doesn't contain setting, the key is required because no default value specified - its an error + // props map doesn't contain setting, the key is required because no default value specified - its an error } else if (NO_DEFAULT_VALUE.equals(key.defaultValue)) { throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value."); } else { @@ -544,6 +567,7 @@ Object parseValue(ConfigKey key, Object value, boolean isSet) { /** * Validate the current configuration values with the configuration definition. + * * @param props the current configuration values * @return List of Config, each Config contains the updated configuration information given * the current configuration values. @@ -554,12 +578,12 @@ public List validate(Map props) { public Map validateAll(Map props) { Map configValues = new HashMap<>(); - for (String name: configKeys.keySet()) { + for (String name : configKeys.keySet()) { configValues.put(name, new ConfigValue(name)); } List undefinedConfigKeys = undefinedDependentConfigs(); - for (String undefinedConfigKey: undefinedConfigKeys) { + for (String undefinedConfigKey : undefinedConfigKeys) { ConfigValue undefinedConfigValue = new ConfigValue(undefinedConfigKey); undefinedConfigValue.addErrorMessage(undefinedConfigKey + " is referred in the dependents, but not defined."); undefinedConfigValue.visible(false); @@ -574,7 +598,7 @@ public Map validateAll(Map props) { Map parseForValidate(Map props, Map configValues) { Map parsed = new HashMap<>(); Set configsWithNoParent = getConfigsWithNoParent(); - for (String name: configsWithNoParent) { + for (String name : configsWithNoParent) { parseForValidate(name, props, parsed, configValues); } return parsed; @@ -583,7 +607,7 @@ Map parseForValidate(Map props, Map validate(Map parsed, Map configValues) { Set configsWithNoParent = getConfigsWithNoParent(); - for (String name: configsWithNoParent) { + for (String name : configsWithNoParent) { validate(name, parsed, configValues); } return configValues; @@ -592,7 +616,7 @@ private Map validate(Map parsed, Map undefinedDependentConfigs() { Set undefinedConfigKeys = new HashSet<>(); for (ConfigKey configKey : configKeys.values()) { - for (String dependent: configKey.dependents) { + for (String dependent : configKey.dependents) { if (!configKeys.containsKey(dependent)) { undefinedConfigKeys.add(dependent); } @@ -608,7 +632,7 @@ Set getConfigsWithNoParent() { } Set configsWithParent = new HashSet<>(); - for (ConfigKey configKey: configKeys.values()) { + for (ConfigKey configKey : configKeys.values()) { List dependents = configKey.dependents; configsWithParent.addAll(dependents); } @@ -648,7 +672,7 @@ private void parseForValidate(String name, Map props, Map parsed, Map convertToStringMapWithPasswordValues(Map configs) { + public static Map convertToStringMapWithPasswordValues(Map configs) { Map result = new HashMap<>(); for (Map.Entry entry : configs.entrySet()) { Object value = entry.getValue(); @@ -888,6 +913,7 @@ public enum Type { /** * Whether this type contains sensitive data such as a password or key. + * * @return true if the type is {@link #PASSWORD} */ public boolean isSensitive() { @@ -919,7 +945,8 @@ public interface Recommender { /** * The valid values for the configuration given the current configuration values. - * @param name The name of the configuration + * + * @param name The name of the configuration * @param parsedConfig The parsed configuration values * @return The list of valid values. To function properly, the returned objects should have the type * defined for the configuration using the recommender. @@ -928,7 +955,8 @@ public interface Recommender { /** * Set the visibility of the configuration given the current configuration values. - * @param name The name of the configuration + * + * @param name The name of the configuration * @param parsedConfig The parsed configuration values * @return The visibility of the configuration */ @@ -941,7 +969,8 @@ public interface Recommender { public interface Validator { /** * Perform single configuration validation. - * @param name The name of the configuration + * + * @param name The name of the configuration * @param value The value of the configuration * @throws ConfigException if the value is invalid. */ @@ -956,9 +985,10 @@ public static class Range implements Validator { private final Number max; /** - * A numeric range with inclusive upper bound and inclusive lower bound - * @param min the lower bound - * @param max the upper bound + * A numeric range with inclusive upper bound and inclusive lower bound + * + * @param min the lower bound + * @param max the upper bound */ private Range(Number min, Number max) { this.min = min; @@ -1006,26 +1036,36 @@ else if (max == null) public static class ValidList implements Validator { final ValidString validString; + final boolean isEmptyAllowed; - private ValidList(List validStrings) { + private ValidList(List validStrings, boolean isEmptyAllowed) { this.validString = new ValidString(validStrings); + this.isEmptyAllowed = isEmptyAllowed; } + @Deprecated public static ValidList in(String... validStrings) { - return new ValidList(Arrays.asList(validStrings)); + return new ValidList(Arrays.asList(validStrings), true); + } + + public static ValidList inWithEmptyCheck(boolean isEmptyAllowed, String... validStrings) { + return new ValidList(List.of(validStrings), isEmptyAllowed); } @Override public void ensureValid(final String name, final Object value) { @SuppressWarnings("unchecked") List values = (List) value; + if (!isEmptyAllowed && values.isEmpty()) { + throw new ConfigException("Configuration '" + name + "' must not be empty. Valid values include: " + validString); + } for (String string : values) { validString.ensureValid(name, string); } } public String toString() { - return validString.toString(); + return validString + (isEmptyAllowed ? "" : " (empty not allowed)"); } } @@ -1060,8 +1100,8 @@ public static class CaseInsensitiveValidString implements Validator { private CaseInsensitiveValidString(List validStrings) { this.validStrings = validStrings.stream() - .map(s -> s.toUpperCase(Locale.ROOT)) - .collect(Collectors.toSet()); + .map(s -> s.toUpperCase(Locale.ROOT)) + .collect(Collectors.toSet()); } public static CaseInsensitiveValidString in(String... validStrings) { @@ -1134,7 +1174,7 @@ public static CompositeValidator of(Validator... validators) { @Override public void ensureValid(String name, Object value) { - for (Validator validator: validators) { + for (Validator validator : validators) { validator.ensureValid(name, value); } } @@ -1143,7 +1183,7 @@ public void ensureValid(String name, Object value) { public String toString() { if (validators == null) return ""; StringBuilder desc = new StringBuilder(); - for (Validator v: validators) { + for (Validator v : validators) { if (desc.length() > 0) { desc.append(',').append(' '); } @@ -1257,14 +1297,14 @@ public ConfigKey(String name, Type type, Object defaultValue, Validator validato List dependents, Recommender recommender, boolean internalConfig) { this(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, - dependents, recommender, internalConfig, null); + dependents, recommender, internalConfig, null); } private ConfigKey(String name, Type type, Object defaultValue, Validator validator, - Importance importance, String documentation, String group, - int orderInGroup, Width width, String displayName, - List dependents, Recommender recommender, - boolean internalConfig, String alternativeString) { + Importance importance, String documentation, String group, + int orderInGroup, Width width, String displayName, + List dependents, Recommender recommender, + boolean internalConfig, String alternativeString) { this.name = name; this.type = type; boolean hasDefault = !NO_DEFAULT_VALUE.equals(defaultValue); @@ -1398,6 +1438,7 @@ private void addColumnValue(StringBuilder builder, String value) { * If dynamicUpdateModes is non-empty, a "Dynamic Update Mode" column * will be included n the table with the value of the update mode. Default * mode is "read-only". + * * @param dynamicUpdateModes Config name -> update mode mapping */ public String toHtmlTable(Map dynamicUpdateModes) { @@ -1521,13 +1562,13 @@ private void getConfigKeyRst(ConfigKey key, StringBuilder b) { /** * Get a list of configs sorted taking the 'group' and 'orderInGroup' into account. - * + *

* If grouping is not specified, the result will reflect "natural" order: listing required fields first, then ordering by importance, and finally by name. */ private List sortedConfigs() { final Map groupOrd = new HashMap<>(groups.size()); int ord = 0; - for (String group: groups) { + for (String group : groups) { groupOrd.put(group, ord++); } @@ -1538,8 +1579,8 @@ private List sortedConfigs() { private int compare(ConfigKey k1, ConfigKey k2, Map groupOrd) { int cmp = k1.group == null - ? (k2.group == null ? 0 : -1) - : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group))); + ? (k2.group == null ? 0 : -1) + : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group))); if (cmp == 0) { cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup); if (cmp == 0) { @@ -1585,7 +1626,7 @@ public void embed(final String keyPrefix, final String groupPrefix, final int st private static Validator embeddedValidator(final String keyPrefix, final Validator base) { if (base == null) return null; return ConfigDef.LambdaValidator.with( - (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value), base::toString); + (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value), base::toString); } /** @@ -1638,6 +1679,7 @@ public String toHtml() { /** * Converts this config into an HTML list that can be embedded into docs. + * * @param headerDepth The top level header depth in the generated HTML. * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name. */ @@ -1650,6 +1692,7 @@ public String toHtml(int headerDepth, Function idGenerator) { * If dynamicUpdateModes is non-empty, a "Dynamic Update Mode" label * will be included in the config details with the value of the update mode. Default * mode is "read-only". + * * @param dynamicUpdateModes Config name -> update mode mapping. */ public String toHtml(Map dynamicUpdateModes) { @@ -1661,8 +1704,9 @@ public String toHtml(Map dynamicUpdateModes) { * If dynamicUpdateModes is non-empty, a "Dynamic Update Mode" label * will be included in the config details with the value of the update mode. Default * mode is "read-only". - * @param headerDepth The top level header depth in the generated HTML. - * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name. + * + * @param headerDepth The top level header depth in the generated HTML. + * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name. * @param dynamicUpdateModes Config name -> update mode mapping. */ public String toHtml(int headerDepth, Function idGenerator, diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java index f3157bb1b1a43..2d991f201c611 100755 --- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java @@ -30,54 +30,54 @@ public class TopicConfig { public static final String SEGMENT_BYTES_CONFIG = "segment.bytes"; public static final String SEGMENT_BYTES_DOC = "This configuration controls the segment file size for " + - "the log. Retention and cleaning is always done a file at a time so a larger segment size means " + - "fewer files but less granular control over retention."; + "the log. Retention and cleaning is always done a file at a time so a larger segment size means " + + "fewer files but less granular control over retention."; public static final String SEGMENT_MS_CONFIG = "segment.ms"; public static final String SEGMENT_MS_DOC = "This configuration controls the period of time after " + - "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " + - "can delete or compact old data."; + "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " + + "can delete or compact old data."; public static final String SEGMENT_JITTER_MS_CONFIG = "segment.jitter.ms"; public static final String SEGMENT_JITTER_MS_DOC = "The maximum random jitter subtracted from the scheduled " + - "segment roll time to avoid thundering herds of segment rolling"; + "segment roll time to avoid thundering herds of segment rolling"; public static final String SEGMENT_INDEX_BYTES_CONFIG = "segment.index.bytes"; public static final String SEGMENT_INDEX_BYTES_DOC = "This configuration controls the size of the index that " + - "maps offsets to file positions. We preallocate this index file and shrink it only after log " + - "rolls. You generally should not need to change this setting."; + "maps offsets to file positions. We preallocate this index file and shrink it only after log " + + "rolls. You generally should not need to change this setting."; public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages"; public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " + - "which we will force an fsync of data written to the log. For example if this was set to 1 " + - "we would fsync after every message; if it were 5 we would fsync after every five messages. " + - "In general we recommend you not set this and use replication for durability and allow the " + - "operating system's background flush capabilities as it is more efficient. This setting can " + - "be overridden on a per-topic basis (see the per-topic configuration section)."; + "which we will force an fsync of data written to the log. For example if this was set to 1 " + + "we would fsync after every message; if it were 5 we would fsync after every five messages. " + + "In general we recommend you not set this and use replication for durability and allow the " + + "operating system's background flush capabilities as it is more efficient. This setting can " + + "be overridden on a per-topic basis (see the per-topic configuration section)."; public static final String FLUSH_MS_CONFIG = "flush.ms"; public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " + - "force an fsync of data written to the log. For example if this was set to 1000 " + - "we would fsync after 1000 ms had passed. In general we recommend you not set " + - "this and use replication for durability and allow the operating system's background " + - "flush capabilities as it is more efficient."; + "force an fsync of data written to the log. For example if this was set to 1000 " + + "we would fsync after 1000 ms had passed. In general we recommend you not set " + + "this and use replication for durability and allow the operating system's background " + + "flush capabilities as it is more efficient."; public static final String RETENTION_BYTES_CONFIG = "retention.bytes"; public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a partition " + - "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " + - "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " + - "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " + - "the topic retention in bytes. Additionally, retention.bytes configuration " + - "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " + - "Moreover, it triggers the rolling of new segment if the retention.bytes is configured to zero."; + "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " + + "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " + + "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " + + "the topic retention in bytes. Additionally, retention.bytes configuration " + + "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " + + "Moreover, it triggers the rolling of new segment if the retention.bytes is configured to zero."; public static final String RETENTION_MS_CONFIG = "retention.ms"; public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " + - "log before we will discard old log segments to free up space if we are using the " + - "\"delete\" retention policy. This represents an SLA on how soon consumers must read " + - "their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " + - "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " + - "Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied."; + "log before we will discard old log segments to free up space if we are using the " + + "\"delete\" retention policy. This represents an SLA on how soon consumers must read " + + "their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " + + "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " + + "Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied."; public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable"; public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration to true. " + @@ -107,84 +107,85 @@ public class TopicConfig { public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes"; public static final String MAX_MESSAGE_BYTES_DOC = - "The largest record batch size allowed by Kafka (after compression if compression is enabled). " + - "In the latest message format version, records are always grouped into batches for efficiency. " + - "In previous message format versions, uncompressed records are not grouped into batches and this " + - "limit only applies to a single record in that case."; + "The largest record batch size allowed by Kafka (after compression if compression is enabled). " + + "In the latest message format version, records are always grouped into batches for efficiency. " + + "In previous message format versions, uncompressed records are not grouped into batches and this " + + "limit only applies to a single record in that case."; public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes"; public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " + - "Kafka adds an index entry to its offset index. The default setting ensures that we index a " + - "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " + - "position in the log but makes the index larger. You probably don't need to change this."; + "Kafka adds an index entry to its offset index. The default setting ensures that we index a " + + "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " + + "position in the log but makes the index larger. You probably don't need to change this."; public static final String FILE_DELETE_DELAY_MS_CONFIG = "file.delete.delay.ms"; public static final String FILE_DELETE_DELAY_MS_DOC = "The time to wait before deleting a file from the " + - "filesystem"; + "filesystem"; public static final String DELETE_RETENTION_MS_CONFIG = "delete.retention.ms"; public static final String DELETE_RETENTION_MS_DOC = "The amount of time to retain delete tombstone markers " + - "for log compacted topics. This setting also gives a bound " + - "on the time in which a consumer must complete a read if they begin from offset 0 " + - "to ensure that they get a valid snapshot of the final stage (otherwise delete " + - "tombstones may be collected before they complete their scan)."; + "for log compacted topics. This setting also gives a bound " + + "on the time in which a consumer must complete a read if they begin from offset 0 " + + "to ensure that they get a valid snapshot of the final stage (otherwise delete " + + "tombstones may be collected before they complete their scan)."; public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms"; public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " + - "uncompacted in the log. Only applicable for logs that are being compacted."; + "uncompacted in the log. Only applicable for logs that are being compacted."; public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms"; public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " + - "ineligible for compaction in the log. Only applicable for logs that are being compacted."; + "ineligible for compaction in the log. Only applicable for logs that are being compacted."; public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio"; public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " + - "the log compactor will attempt to clean the log (assuming log " + - "compaction is enabled). By default we will avoid cleaning a log where more than " + - "50% of the log has been compacted. This ratio bounds the maximum space wasted in " + - "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " + - "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " + - "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG + - " configurations are also specified, then the log compactor considers the log to be eligible for compaction " + - "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " + - "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) if the log has had " + - "dirty (uncompacted) records for at most the " + MAX_COMPACTION_LAG_MS_CONFIG + " period."; + "the log compactor will attempt to clean the log (assuming log " + + "compaction is enabled). By default we will avoid cleaning a log where more than " + + "50% of the log has been compacted. This ratio bounds the maximum space wasted in " + + "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " + + "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " + + "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG + + " configurations are also specified, then the log compactor considers the log to be eligible for compaction " + + "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " + + "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) if the log has had " + + "dirty (uncompacted) records for at most the " + MAX_COMPACTION_LAG_MS_CONFIG + " period."; public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy"; public static final String CLEANUP_POLICY_COMPACT = "compact"; public static final String CLEANUP_POLICY_DELETE = "delete"; + public static final String CLEANUP_POLICY_NONE = "none"; public static final String CLEANUP_POLICY_DOC = "This config designates the retention policy to " + - "use on log segments. The \"delete\" policy (which is the default) will discard old segments " + - "when their retention time or size limit has been reached. The \"compact\" policy will enable " + - "log compaction, which retains the latest value for each key. " + - "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " + - "In this case, old segments will be discarded per the retention time and size configuration, " + - "while retained segments will be compacted."; + "use on log segments. The \"delete\" policy (which is the default) will discard old segments " + + "when their retention time or size limit has been reached. The \"compact\" policy will enable " + + "log compaction, which retains the latest value for each key. " + + "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " + + "In this case, old segments will be discarded per the retention time and size configuration, " + + "while retained segments will be compacted."; public static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable"; public static final String UNCLEAN_LEADER_ELECTION_ENABLE_DOC = "Indicates whether to enable replicas " + - "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " + - "loss.

Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" + - "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " + - "to trigger the unclean leader election immediately if needed.

"; + "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " + + "loss.

Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" + + "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " + + "to trigger the unclean leader election immediately if needed.

"; public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas"; public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " + - "this configuration specifies the minimum number of replicas that must acknowledge " + - "a write for the write to be considered successful. If this minimum cannot be met, " + - "then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
" + - "Regardless of the acks setting, the messages will not be visible to the consumers until " + - "they are replicated to all in-sync replicas and the min.insync.replicas condition is met.
" + - "When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. " + - "A typical scenario would be to create a topic with a replication factor of 3, " + - "set min.insync.replicas to 2, and produce with acks of \"all\". " + - "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers."; + "this configuration specifies the minimum number of replicas that must acknowledge " + + "a write for the write to be considered successful. If this minimum cannot be met, " + + "then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
" + + "Regardless of the acks setting, the messages will not be visible to the consumers until " + + "they are replicated to all in-sync replicas and the min.insync.replicas condition is met.
" + + "When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. " + + "A typical scenario would be to create a topic with a replication factor of 3, " + + "set min.insync.replicas to 2, and produce with acks of \"all\". " + + "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers."; public static final String COMPRESSION_TYPE_CONFIG = "compression.type"; public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " + - "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " + - "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " + - "original compression codec set by the producer."; + "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " + + "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " + + "original compression codec set by the producer."; public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level"; @@ -196,28 +197,28 @@ public class TopicConfig { public static final String PREALLOCATE_CONFIG = "preallocate"; public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " + - "creating a new log segment."; + "creating a new log segment."; public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type"; public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " + - "message create time or log append time."; + "message create time or log append time."; public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = "message.timestamp.before.max.ms"; public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This configuration sets the allowable timestamp " + - "difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " + - "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + - "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + - "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; + "difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " + + "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + + "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + + "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = "message.timestamp.after.max.ms"; public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp " + - "difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " + - "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + - "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + - "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; + "difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " + + "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " + + "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " + + "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime."; /** * @deprecated down-conversion is not possible in Apache Kafka 4.0 and newer, hence this configuration is a no-op, - * and it is deprecated for removal in Apache Kafka 5.0. + * and it is deprecated for removal in Apache Kafka 5.0. */ @Deprecated public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable"; @@ -227,5 +228,5 @@ public class TopicConfig { */ @Deprecated public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " + - "hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0."; + "hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0."; } diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 6e1f0e232429b..a3abc2a124392 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -54,15 +54,15 @@ public class ConfigDefTest { @Test public void testBasicTypes() { ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs") - .define("b", Type.LONG, Importance.HIGH, "docs") - .define("c", Type.STRING, "hello", Importance.HIGH, "docs") - .define("d", Type.LIST, Importance.HIGH, "docs") - .define("e", Type.DOUBLE, Importance.HIGH, "docs") - .define("f", Type.CLASS, Importance.HIGH, "docs") - .define("g", Type.BOOLEAN, Importance.HIGH, "docs") - .define("h", Type.BOOLEAN, Importance.HIGH, "docs") - .define("i", Type.BOOLEAN, Importance.HIGH, "docs") - .define("j", Type.PASSWORD, Importance.HIGH, "docs"); + .define("b", Type.LONG, Importance.HIGH, "docs") + .define("c", Type.STRING, "hello", Importance.HIGH, "docs") + .define("d", Type.LIST, Importance.HIGH, "docs") + .define("e", Type.DOUBLE, Importance.HIGH, "docs") + .define("f", Type.CLASS, Importance.HIGH, "docs") + .define("g", Type.BOOLEAN, Importance.HIGH, "docs") + .define("h", Type.BOOLEAN, Importance.HIGH, "docs") + .define("i", Type.BOOLEAN, Importance.HIGH, "docs") + .define("j", Type.PASSWORD, Importance.HIGH, "docs"); Properties props = new Properties(); props.put("a", "1 "); @@ -116,7 +116,7 @@ public void testParsingEmptyDefaultValueForStringFieldShouldSucceed() { @Test public void testDefinedTwice() { assertThrows(ConfigException.class, () -> new ConfigDef().define("a", Type.STRING, - Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs")); + Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs")); } @Test @@ -147,13 +147,13 @@ private void testBadInputs(Type type, Object... values) { @Test public void testInvalidDefaultRange() { assertThrows(ConfigException.class, () -> new ConfigDef().define("name", Type.INT, -1, - Range.between(0, 10), Importance.HIGH, "docs")); + Range.between(0, 10), Importance.HIGH, "docs")); } @Test public void testInvalidDefaultString() { assertThrows(ConfigException.class, () -> new ConfigDef().define("name", Type.STRING, "bad", - ValidString.in("valid", "values"), Importance.HIGH, "docs")); + ValidString.in("valid", "values"), Importance.HIGH, "docs")); } @Test @@ -169,10 +169,11 @@ public void testValidators() { testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default", new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", "DEFAULT", null}); testValidators(Type.STRING, CaseInsensitiveValidString.in("good", "values", "default"), "default", - new Object[]{"gOOd", "VALUES", "default"}, new Object[]{"Bad", "iNPUts", null}); - testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"}); - testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null}); - testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"}); + new Object[]{"gOOd", "VALUES", "default"}, new Object[]{"Bad", "iNPUts", null}); + testValidators(Type.LIST, ConfigDef.ValidList.inWithEmptyCheck(true, "1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"}); + testValidators(Type.LIST, ConfigDef.ValidList.inWithEmptyCheck(false, "1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{""}); + testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[]{null}); + testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[]{null, -1, "c"}); testValidators(Type.STRING, new ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname", new Object[]{"test", "name", "test/test", "test\u1234", "\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test \n\r", "\n hello \t"}, new Object[]{"nontrailing\nnotallowed", "as\u0001cii control char", "tes\rt", "test\btest", "1\t2", ""}); @@ -203,7 +204,7 @@ public void testNullDefaultWithValidator() { ConfigDef def = new ConfigDef(); def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, - ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); + ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); Properties props = new Properties(); props.put(key, "ONE"); @@ -215,17 +216,17 @@ public void testNullDefaultWithValidator() { public void testGroupInference() { List expected1 = Arrays.asList("group1", "group2"); ConfigDef def1 = new ConfigDef() - .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a") - .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b") - .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); + .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a") + .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b") + .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); assertEquals(expected1, def1.groups()); List expected2 = Arrays.asList("group2", "group1"); ConfigDef def2 = new ConfigDef() - .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a") - .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b") - .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); + .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a") + .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b") + .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); assertEquals(expected2, def2.groups()); } @@ -251,10 +252,10 @@ public void testParseForValidate() { expected.put("d", configD); ConfigDef def = new ConfigDef() - .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) - .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) - .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) - .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false)); + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) + .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false)); Map props = new HashMap<>(); props.put("a", "1"); @@ -289,10 +290,10 @@ public void testValidate() { expected.put("d", configD); ConfigDef def = new ConfigDef() - .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) - .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) - .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) - .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false)); + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) + .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false)); Map props = new HashMap<>(); props.put("a", "1"); @@ -325,15 +326,15 @@ public void testValidateMissingConfigKey() { expected.put("d", configD); ConfigDef def = new ConfigDef() - .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false)) - .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) - .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)); + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)); Map props = new HashMap<>(); props.put("a", "1"); List configs = def.validate(props); - for (ConfigValue config: configs) { + for (ConfigValue config : configs) { String name = config.name(); ConfigValue expectedConfig = expected.get(name); assertEquals(expectedConfig, config); @@ -352,7 +353,7 @@ public void testValidateCannotParse() { props.put("a", "non_integer"); List configs = def.validate(props); - for (ConfigValue config: configs) { + for (ConfigValue config : configs) { String name = config.name(); ConfigValue expectedConfig = expected.get(name); assertEquals(expectedConfig, config); @@ -505,32 +506,32 @@ public void toRst() { final String expectedRst = "``opt2``\n" + - " docs2\n" + - "\n" + - " * Type: int\n" + - " * Importance: medium\n" + - "\n" + - "``opt1``\n" + - " docs1\n" + - "\n" + - " * Type: string\n" + - " * Default: a\n" + - " * Valid Values: [a, b, c]\n" + - " * Importance: high\n" + - "\n" + - "``opt3``\n" + - " docs3\n" + - "\n" + - " * Type: list\n" + - " * Default: a,b\n" + - " * Importance: low\n" + - "\n" + - "``opt4``\n" + - "\n" + - " * Type: boolean\n" + - " * Default: false\n" + - " * Importance: low\n" + - "\n"; + " docs2\n" + + "\n" + + " * Type: int\n" + + " * Importance: medium\n" + + "\n" + + "``opt1``\n" + + " docs1\n" + + "\n" + + " * Type: string\n" + + " * Default: a\n" + + " * Valid Values: [a, b, c]\n" + + " * Importance: high\n" + + "\n" + + "``opt3``\n" + + " docs3\n" + + "\n" + + " * Type: list\n" + + " * Default: a,b\n" + + " * Importance: low\n" + + "\n" + + "``opt4``\n" + + "\n" + + " * Type: boolean\n" + + " * Default: false\n" + + " * Importance: low\n" + + "\n"; assertEquals(expectedRst, def.toRst()); } @@ -550,48 +551,48 @@ public void toEnrichedRst() { final String expectedRst = "``poor.opt``\n" + - " Doc doc doc doc.\n" + - "\n" + - " * Type: string\n" + - " * Default: foo\n" + - " * Importance: high\n" + - "\n" + - "Group One\n" + - "^^^^^^^^^\n" + - "\n" + - "``opt1.of.group1``\n" + - " Doc doc.\n" + - "\n" + - " * Type: string\n" + - " * Default: a\n" + - " * Valid Values: [a, b, c]\n" + - " * Importance: high\n" + - "\n" + - "``opt2.of.group1``\n" + - " Doc doc doc.\n" + - "\n" + - " * Type: int\n" + - " * Importance: medium\n" + - " * Dependents: ``some.option1``, ``some.option2``\n" + - "\n" + - "Group Two\n" + - "^^^^^^^^^\n" + - "\n" + - "``opt1.of.group2``\n" + - " Doc doc doc doc doc.\n" + - "\n" + - " * Type: boolean\n" + - " * Default: false\n" + - " * Importance: high\n" + - " * Dependents: ``some.option``\n" + - "\n" + - "``opt2.of.group2``\n" + - " Doc doc doc doc.\n" + - "\n" + - " * Type: boolean\n" + - " * Default: false\n" + - " * Importance: high\n" + - "\n"; + " Doc doc doc doc.\n" + + "\n" + + " * Type: string\n" + + " * Default: foo\n" + + " * Importance: high\n" + + "\n" + + "Group One\n" + + "^^^^^^^^^\n" + + "\n" + + "``opt1.of.group1``\n" + + " Doc doc.\n" + + "\n" + + " * Type: string\n" + + " * Default: a\n" + + " * Valid Values: [a, b, c]\n" + + " * Importance: high\n" + + "\n" + + "``opt2.of.group1``\n" + + " Doc doc doc.\n" + + "\n" + + " * Type: int\n" + + " * Importance: medium\n" + + " * Dependents: ``some.option1``, ``some.option2``\n" + + "\n" + + "Group Two\n" + + "^^^^^^^^^\n" + + "\n" + + "``opt1.of.group2``\n" + + " Doc doc doc doc doc.\n" + + "\n" + + " * Type: boolean\n" + + " * Default: false\n" + + " * Importance: high\n" + + " * Dependents: ``some.option``\n" + + "\n" + + "``opt2.of.group2``\n" + + " Doc doc doc doc.\n" + + "\n" + + " * Type: boolean\n" + + " * Default: false\n" + + " * Importance: high\n" + + "\n"; assertEquals(expectedRst, def.toEnrichedRst()); } @@ -729,34 +730,34 @@ public void testNiceTimeUnits() { @Test public void testThrowsExceptionWhenListSizeExceedsLimit() { final ConfigException exception = assertThrows(ConfigException.class, () -> new ConfigDef().define("lst", - Type.LIST, - asList("a", "b"), - ListSize.atMostOfSize(1), - Importance.HIGH, - "lst doc")); + Type.LIST, + asList("a", "b"), + ListSize.atMostOfSize(1), + Importance.HIGH, + "lst doc")); assertEquals("Invalid value [a, b] for configuration lst: exceeds maximum list size of [1].", - exception.getMessage()); + exception.getMessage()); } @Test public void testNoExceptionIsThrownWhenListSizeEqualsTheLimit() { final List lst = asList("a", "b", "c"); assertDoesNotThrow(() -> new ConfigDef().define("lst", - Type.LIST, - lst, - ListSize.atMostOfSize(lst.size()), - Importance.HIGH, - "lst doc")); + Type.LIST, + lst, + ListSize.atMostOfSize(lst.size()), + Importance.HIGH, + "lst doc")); } @Test public void testNoExceptionIsThrownWhenListSizeIsBelowTheLimit() { assertDoesNotThrow(() -> new ConfigDef().define("lst", - Type.LIST, - asList("a", "b"), - ListSize.atMostOfSize(3), - Importance.HIGH, - "lst doc")); + Type.LIST, + asList("a", "b"), + ListSize.atMostOfSize(3), + Importance.HIGH, + "lst doc")); } @Test diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index ece60a36dbaf9..db245a812a005 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -55,20 +55,18 @@ * Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig. */ public class GroupCoordinatorConfig { - /// /// Group coordinator configs - /// public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols"; public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols." + "The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production."; public static final List GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of( - Group.GroupType.CLASSIC.toString(), - Group.GroupType.CONSUMER.toString()); + Group.GroupType.CLASSIC.toString(), + Group.GroupType.CONSUMER.toString()); public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms"; public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " + - "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " + - "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " + - "disk before completing request processing. Transactional writes are not accumulated."; + "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " + + "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " + + "disk before completing request processing. Transactional writes are not accumulated."; public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 5; public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads"; @@ -78,12 +76,12 @@ public class GroupCoordinatorConfig { public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size"; public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024; public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading group metadata " + - " into the cache (soft-limit, overridden if records are too large)."; + " into the cache (soft-limit, overridden if records are too large)."; public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms"; public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000; public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + - "or this timeout is reached. This is similar to the producer request timeout. This is applied to all the writes made by the coordinator."; + "or this timeout is reached. This is similar to the producer request timeout. This is applied to all the writes made by the coordinator."; public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions"; public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50; @@ -92,20 +90,18 @@ public class GroupCoordinatorConfig { public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes"; public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024; public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate " + - "faster log compaction and cache loads."; + "faster log compaction and cache loads."; public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3; public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " + - "Internal topic creation will fail until the cluster size meets this replication factor requirement."; + "Internal topic creation will fail until the cluster size meets this replication factor requirement."; public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec"; public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE; public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits."; - /// /// Offset configs - /// public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes"; public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096; public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit."; @@ -113,41 +109,37 @@ public class GroupCoordinatorConfig { public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes"; public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60; public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when " + - "1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + - "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + - "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + - "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + - "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; + "1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " + + "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " + + "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " + + "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " + + "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period."; public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms"; public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 600000L; public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets"; - /// /// Classic group configs - /// public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms"; public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in " + - "quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; + "quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources."; public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000; public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms"; public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers " + - "more time to process messages in between heartbeats at the cost of a longer time to detect failures."; + "more time to process messages in between heartbeats at the cost of a longer time to detect failures."; public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000; public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms"; public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group " + - "before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."; + "before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins."; public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size"; public static final String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate."; public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; - /// /// Consumer group configs - /// public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.session.timeout.ms"; public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the consumer group protocol."; public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; @@ -174,38 +166,36 @@ public class GroupCoordinatorConfig { public static final String CONSUMER_GROUP_MAX_SIZE_CONFIG = "group.consumer.max.size"; public static final String CONSUMER_GROUP_MAX_SIZE_DOC = "The maximum number of consumers " + - "that a single consumer group can accommodate. This value will only impact groups under " + - "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " + - "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead."; + "that a single consumer group can accommodate. This value will only impact groups under " + + "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " + + "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead."; public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE; private static final List CONSUMER_GROUP_BUILTIN_ASSIGNORS = List.of( - new UniformAssignor(), - new RangeAssignor() + new UniformAssignor(), + new RangeAssignor() ); public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors"; public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of either names for builtin assignors or full class names for customer assignors. " + - "The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. " + - "The supported builtin assignors are: " + CONSUMER_GROUP_BUILTIN_ASSIGNORS.stream().map(ConsumerGroupPartitionAssignor::name).collect(Collectors.joining(", ")) + "."; + "The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. " + + "The supported builtin assignors are: " + CONSUMER_GROUP_BUILTIN_ASSIGNORS.stream().map(ConsumerGroupPartitionAssignor::name).collect(Collectors.joining(", ")) + "."; public static final List CONSUMER_GROUP_ASSIGNORS_DEFAULT = CONSUMER_GROUP_BUILTIN_ASSIGNORS - .stream() - .map(ConsumerGroupPartitionAssignor::name) - .toList(); + .stream() + .map(ConsumerGroupPartitionAssignor::name) + .toList(); public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy"; public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString(); public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The config that enables converting the non-empty classic group using the consumer embedded protocol " + - "to the non-empty consumer group using the consumer group protocol and vice versa; " + - "conversions of empty groups in both directions are always enabled regardless of this policy. " + - ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + - ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " + - ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + - ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; - - /// + "to the non-empty consumer group using the consumer group protocol and vice versa; " + + "conversions of empty groups in both directions are always enabled regardless of this policy. " + + ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " + + ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " + + ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " + + ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled."; + /// Share group configs - /// public static final String SHARE_GROUP_MAX_SIZE_CONFIG = "group.share.max.size"; public static final int SHARE_GROUP_MAX_SIZE_DEFAULT = 200; public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number of members that a single share group can accommodate."; @@ -234,9 +224,7 @@ public class GroupCoordinatorConfig { public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; - /// /// Streams group configs - /// public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms"; public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000; public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol."; @@ -274,59 +262,59 @@ public class GroupCoordinatorConfig { public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; public static final ConfigDef CONFIG_DEF = new ConfigDef() - // Group coordinator configs - .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, - ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) - .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC) - .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) - .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC) - .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC) - .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC) - .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC) - .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC) - .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC) - - // Offset configs - .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC) - .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC) - .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC) - - // Classic group configs - .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC) - .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC) - - // Consumer group configs - .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) - .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) - .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) - .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC) - - // Share group configs - .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) - .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC) - - // Streams group configs - .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC) - .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) - .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) - .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC) - .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) - .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) - .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC) - .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) - .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC); + // Group coordinator configs + .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT, + ConfigDef.ValidList.inWithEmptyCheck(false, Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC) + .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC) + .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC) + .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC) + .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC) + .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC) + .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC) + .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC) + .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC) + + // Offset configs + .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC) + .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC) + .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC) + + // Classic group configs + .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC) + .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC) + + // Consumer group configs + .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) + .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC) + .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC) + .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC) + + // Share group configs + .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) + .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC) + + // Streams group configs + .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC) + .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC) + .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC) + .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC) + .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC) + .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) + .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC) + .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) + .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC); /** @@ -443,68 +431,68 @@ public GroupCoordinatorConfig(AbstractConfig config) { String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG)); // Share group configs validation. require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, - String.format("%s must be greater than or equal to %s", - SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs, - String.format("%s must be greater than or equal to %s", - SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs, - String.format("%s must be less than or equal to %s", - SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be less than or equal to %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, - String.format("%s must be greater than or equal to %s", - SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs, - String.format("%s must be greater than or equal to %s", - SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs, - String.format("%s must be less than or equal to %s", - SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be less than or equal to %s", + SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs, - String.format("%s must be less than %s", - SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be less than %s", + SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG)); // Streams group configs validation. require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs, - String.format("%s must be greater than or equal to %s", - STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); require(streamsGroupHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs, - String.format("%s must be greater than or equal to %s", - STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", + STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG)); require(streamsGroupHeartbeatIntervalMs <= streamsGroupMaxHeartbeatIntervalMs, - String.format("%s must be less than or equal to %s", - STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); + String.format("%s must be less than or equal to %s", + STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG)); require(streamsGroupMaxSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs, - String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); require(streamsGroupSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs, - String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG)); require(streamsGroupSessionTimeoutMs <= streamsGroupMaxSessionTimeoutMs, - String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG)); require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas, - String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG)); + String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG)); require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs, - String.format("%s must be less than %s", - STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)); + String.format("%s must be less than %s", + STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG)); } public static GroupCoordinatorConfig fromProps( - Map props + Map props ) { return new GroupCoordinatorConfig( - new AbstractConfig( - GroupCoordinatorConfig.CONFIG_DEF, - props - ) + new AbstractConfig( + GroupCoordinatorConfig.CONFIG_DEF, + props + ) ); } protected List consumerGroupAssignors( - AbstractConfig config + AbstractConfig config ) { Map defaultAssignors = CONSUMER_GROUP_BUILTIN_ASSIGNORS - .stream() - .collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity())); + .stream() + .collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity())); List assignors = new ArrayList<>(); @@ -564,8 +552,8 @@ public Map extractGroupConfigMap(ShareGroupConfig shareGroupCon */ public Map extractConsumerGroupConfigMap() { return Map.of( - GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(), - GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs()); + GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(), + GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs()); } /** @@ -670,17 +658,17 @@ public long offsetsRetentionCheckIntervalMs() { /** * For subscribed consumers, committed offset of a specific partition will be expired and discarded when: - * 1) This retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); - * 2) This retention period has elapsed since the last time an offset is committed for the partition AND - * the group is no longer subscribed to the corresponding topic. - * + * 1) This retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); + * 2) This retention period has elapsed since the last time an offset is committed for the partition AND + * the group is no longer subscribed to the corresponding topic. + *

* For standalone consumers (using manual assignment), offsets will be expired after this retention period has * elapsed since the time of last commit. - * + *

* Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately; - * + *

* Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's - * committed offsets for that topic will also be deleted without extra retention period. + * committed offsets for that topic will also be deleted without extra retention period. */ public long offsetsRetentionMs() { return offsetsRetentionMs; diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java index 4d82172c7d87b..6ffc76b636538 100644 --- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java @@ -33,10 +33,12 @@ import static org.apache.kafka.common.config.ConfigDef.Type.STRING; public class KRaftConfigs { - /** KRaft mode configs */ + /** + * KRaft mode configs + */ public static final String PROCESS_ROLES_CONFIG = "process.roles"; public static final String PROCESS_ROLES_DOC = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. "; - + public static final String INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG = "initial.broker.registration.timeout.ms"; public static final int INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT = 60000; public static final String INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC = "When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process."; @@ -118,10 +120,10 @@ public class KRaftConfigs { public static final long CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT = 2000; public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold."; - public static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC) .define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC) - .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC) + .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.inWithEmptyCheck(false, "broker", "controller"), HIGH, PROCESS_ROLES_DOC) .define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC) .define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC) .define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC) diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java index 21c92cd84dff4..433f5424c1fab 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java @@ -167,7 +167,7 @@ public Optional serverConfigName(String configName) { .define(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, LONG, ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, HIGH, ServerLogConfigs.LOG_RETENTION_BYTES_DOC) .define(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DOC) - .define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ConfigDef.ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC) + .define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ConfigDef.ValidList.inWithEmptyCheck(false, TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE, TopicConfig.CLEANUP_POLICY_NONE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC) .define(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DOC) .define(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DOC) .define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DOC) @@ -189,6 +189,7 @@ public Optional serverConfigName(String configName) { .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC); private static final LogConfigDef CONFIG = new LogConfigDef(); + static { CONFIG. define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM, @@ -221,7 +222,7 @@ public Optional serverConfigName(String configName) { TopicConfig.FILE_DELETE_DELAY_MS_DOC) .define(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, DOUBLE, DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM, TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC) - .define(TopicConfig.CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, + .define(TopicConfig.CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ValidList.inWithEmptyCheck(false, TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, TopicConfig.CLEANUP_POLICY_DOC) .define(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, MEDIUM, TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC) @@ -468,6 +469,7 @@ public static void validateNames(Properties props) { * Validates the values of the given properties. Can be called by both client and server. * The `props` supplied should contain all the LogConfig properties and the default values are extracted from the * LogConfig class. + * * @param props The properties to be validated */ public static void validateValues(Map props) { @@ -484,6 +486,7 @@ public static void validateValues(Map props) { * Validates the values of the given properties. Should be called only by the broker. * The `props` supplied doesn't contain any topic-level configs, only broker-level configs. * The default values should be extracted from the KafkaConfig. + * * @param props The properties to be validated */ public static void validateBrokerLogConfigValues(Map props, @@ -499,9 +502,10 @@ public static void validateBrokerLogConfigValues(Map props, * Validates the values of the given properties. Should be called only by the broker. * The `newConfigs` supplied contains the topic-level configs, * The default values should be extracted from the KafkaConfig. - * @param existingConfigs The existing properties - * @param newConfigs The new properties to be validated - * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled + * + * @param existingConfigs The existing properties + * @param newConfigs The new properties to be validated + * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled */ private static void validateTopicLogConfigValues(Map existingConfigs, Map newConfigs, From d09e799022a5dd3897161f425c6d6697614ef4f0 Mon Sep 17 00:00:00 2001 From: T2233 Date: Wed, 16 Apr 2025 15:56:15 +0800 Subject: [PATCH 4/5] =?UTF-8?q?Revert=20"KAFKA-4243:=20Ensure=20metric=20n?= =?UTF-8?q?ames=20are=20quoted=20as=20necessary=20for=20JMX=E3=80=82"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit ec4b9180f96e664e73b3946d3af0eb66d62361bb. --- .../main/java/org/apache/kafka/common/utils/Sanitizer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java index bc3893713a080..56e39451fa227 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java @@ -27,7 +27,7 @@ * Utility class for sanitizing/desanitizing/quoting values used in JMX metric names. *

* User principals are URL-encoded using ({@link #sanitize(String)} in all metric names. - * All other metric tags including client-id are quoted if they contain special characters + * All other metric tags including client-id are quoted if they contain special characters * using {@link #jmxSanitize(String)} when registering in JMX. */ public class Sanitizer { @@ -73,6 +73,6 @@ public static String desanitize(String name) { * since they are safe for JMX. */ public static String jmxSanitize(String name) { - return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":", "%3A")); + return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":","%3A")); } } From 54b872c3ed43fc339e5820e010f693940c6f765b Mon Sep 17 00:00:00 2001 From: T2233 Date: Wed, 16 Apr 2025 15:56:16 +0800 Subject: [PATCH 5/5] Revert "KAFKA-4243: Ensure metric names are quoted as necessary for JMX" This reverts commit 99219c155dc828643b5c7e7a35b44c30bef1bdc4. --- .../main/java/org/apache/kafka/common/utils/Sanitizer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java index 56e39451fa227..669b9c3776314 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java @@ -66,13 +66,12 @@ public static String desanitize(String name) { } /** - * Replace characters `:` in Quote `name`with %3A , - * then Quote `name` using {@link ObjectName#quote(String)} if `name` contains + * Quote `name` using {@link ObjectName#quote(String)} if `name` contains * characters that are not safe for use in JMX. User principals that are * already sanitized using {@link #sanitize(String)} will not be quoted * since they are safe for JMX. */ public static String jmxSanitize(String name) { - return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name.replaceAll(":","%3A")); + return MBEAN_PATTERN.matcher(name).matches() ? name : ObjectName.quote(name); } }