dynamicUpdateModes) {
@@ -1521,13 +1562,13 @@ private void getConfigKeyRst(ConfigKey key, StringBuilder b) {
/**
* Get a list of configs sorted taking the 'group' and 'orderInGroup' into account.
- *
+ *
* If grouping is not specified, the result will reflect "natural" order: listing required fields first, then ordering by importance, and finally by name.
*/
private List<ConfigKey> sortedConfigs() {
final Map<String, Integer> groupOrd = new HashMap<>(groups.size());
int ord = 0;
- for (String group: groups) {
+ for (String group : groups) {
groupOrd.put(group, ord++);
}
@@ -1538,8 +1579,8 @@ private List<ConfigKey> sortedConfigs() {
private int compare(ConfigKey k1, ConfigKey k2, Map<String, Integer> groupOrd) {
int cmp = k1.group == null
- ? (k2.group == null ? 0 : -1)
- : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
+ ? (k2.group == null ? 0 : -1)
+ : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
if (cmp == 0) {
cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
if (cmp == 0) {
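The ordering rule spelled out in the `sortedConfigs()` javadoc above (group position, then `orderInGroup`, then the "natural" order of required-before-optional, importance, name) is only partially visible in this hunk. A minimal standalone sketch of that comparator, using a stand-in `Key` record rather than the real `ConfigDef.ConfigKey`; the required-first/importance/name tail is inferred from the javadoc, not from the code shown here:

```java
import java.util.Comparator;
import java.util.Map;

// Illustrative only: Key stands in for ConfigDef.ConfigKey, and importanceRank is
// assumed to be 0 for HIGH, 1 for MEDIUM, 2 for LOW.
class ConfigOrderingSketch {
    record Key(String name, String group, int orderInGroup, boolean hasDefault, int importanceRank) {}

    static Comparator<Key> order(Map<String, Integer> groupOrd) {
        return (k1, k2) -> {
            int cmp = k1.group() == null
                ? (k2.group() == null ? 0 : -1)        // ungrouped keys sort before grouped ones
                : (k2.group() == null ? 1 : Integer.compare(groupOrd.get(k1.group()), groupOrd.get(k2.group())));
            if (cmp == 0) cmp = Integer.compare(k1.orderInGroup(), k2.orderInGroup());
            if (cmp == 0) cmp = Boolean.compare(k1.hasDefault(), k2.hasDefault()); // required (no default) first
            if (cmp == 0) cmp = Integer.compare(k1.importanceRank(), k2.importanceRank());
            if (cmp == 0) cmp = k1.name().compareTo(k2.name());
            return cmp;
        };
    }
}
```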
@@ -1585,7 +1626,7 @@ public void embed(final String keyPrefix, final String groupPrefix, final int st
private static Validator embeddedValidator(final String keyPrefix, final Validator base) {
if (base == null) return null;
return ConfigDef.LambdaValidator.with(
- (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value), base::toString);
+ (name, value) -> base.ensureValid(name.substring(keyPrefix.length()), value), base::toString);
}
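For context on the `embeddedValidator` change above: when a child `ConfigDef` is embedded under a key prefix (the `embed(keyPrefix, groupPrefix, startingOrd, ...)` method named in the hunk header), the wrapping validator strips the prefix before delegating, so the child's validator keeps seeing the short key name. A hedged sketch; the prefix, group name, and child key below are made up for illustration:

```java
import org.apache.kafka.common.config.ConfigDef;

class EmbedSketch {
    static ConfigDef parentWithEmbeddedChild() {
        ConfigDef child = new ConfigDef()
            .define("type", ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
                    ConfigDef.ValidString.in("insert", "mask"),
                    ConfigDef.Importance.HIGH, "Transformation type.");

        ConfigDef parent = new ConfigDef();
        // Child keys are re-registered as "transforms.myxform.<key>"; their validators are
        // wrapped so they still validate values under the short name ("type").
        parent.embed("transforms.myxform.", "My Transform", 0, child);
        return parent;
    }
}
```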
/**
@@ -1638,6 +1679,7 @@ public String toHtml() {
/**
* Converts this config into an HTML list that can be embedded into docs.
+ *
* @param headerDepth The top level header depth in the generated HTML.
* @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name.
*/
@@ -1650,6 +1692,7 @@ public String toHtml(int headerDepth, Function<String, String> idGenerator) {
* If <code>dynamicUpdateModes</code> is non-empty, a "Dynamic Update Mode" label
* will be included in the config details with the value of the update mode. Default
* mode is "read-only".
+ *
* @param dynamicUpdateModes Config name -> update mode mapping.
*/
public String toHtml(Map<String, String> dynamicUpdateModes) {
@@ -1661,8 +1704,9 @@ public String toHtml(Map<String, String> dynamicUpdateModes) {
* If <code>dynamicUpdateModes</code> is non-empty, a "Dynamic Update Mode" label
* will be included in the config details with the value of the update mode. Default
* mode is "read-only".
- * @param headerDepth The top level header depth in the generated HTML.
- * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name.
+ *
+ * @param headerDepth The top level header depth in the generated HTML.
+ * @param idGenerator A function for computing the HTML id attribute in the generated HTML from a given config name.
* @param dynamicUpdateModes Config name -> update mode mapping.
*/
public String toHtml(int headerDepth, Function<String, String> idGenerator,
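A quick sketch of how the `toHtml` overloads touched above fit together; the config name, header depth, id prefix, and update-mode value are illustrative, not part of this patch:

```java
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

class ToHtmlSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("log.retention.ms", ConfigDef.Type.LONG, 604800000L,
                    ConfigDef.Importance.HIGH, "How long to retain a log segment.");

        // Marks "log.retention.ms" as dynamically updatable; configs not in the map
        // are rendered with the default "read-only" mode.
        Map<String, String> updateModes = Map.of("log.retention.ms", "cluster-wide");

        String html = def.toHtml(updateModes);                                        // default headers and ids
        String anchored = def.toHtml(3, key -> "brokerconfigs_" + key, updateModes);  // header depth 3, custom ids
        System.out.println(html.length() + " / " + anchored.length());
    }
}
```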
diff --git a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
index f3157bb1b1a43..2d991f201c611 100755
--- a/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java
@@ -30,54 +30,54 @@
public class TopicConfig {
public static final String SEGMENT_BYTES_CONFIG = "segment.bytes";
public static final String SEGMENT_BYTES_DOC = "This configuration controls the segment file size for " +
- "the log. Retention and cleaning is always done a file at a time so a larger segment size means " +
- "fewer files but less granular control over retention.";
+ "the log. Retention and cleaning is always done a file at a time so a larger segment size means " +
+ "fewer files but less granular control over retention.";
public static final String SEGMENT_MS_CONFIG = "segment.ms";
public static final String SEGMENT_MS_DOC = "This configuration controls the period of time after " +
- "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " +
- "can delete or compact old data.";
+ "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " +
+ "can delete or compact old data.";
public static final String SEGMENT_JITTER_MS_CONFIG = "segment.jitter.ms";
public static final String SEGMENT_JITTER_MS_DOC = "The maximum random jitter subtracted from the scheduled " +
- "segment roll time to avoid thundering herds of segment rolling";
+ "segment roll time to avoid thundering herds of segment rolling";
public static final String SEGMENT_INDEX_BYTES_CONFIG = "segment.index.bytes";
public static final String SEGMENT_INDEX_BYTES_DOC = "This configuration controls the size of the index that " +
- "maps offsets to file positions. We preallocate this index file and shrink it only after log " +
- "rolls. You generally should not need to change this setting.";
+ "maps offsets to file positions. We preallocate this index file and shrink it only after log " +
+ "rolls. You generally should not need to change this setting.";
public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +
- "which we will force an fsync of data written to the log. For example if this was set to 1 " +
- "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
- "In general we recommend you not set this and use replication for durability and allow the " +
- "operating system's background flush capabilities as it is more efficient. This setting can " +
- "be overridden on a per-topic basis (see the per-topic configuration section).";
+ "which we will force an fsync of data written to the log. For example if this was set to 1 " +
+ "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
+ "In general we recommend you not set this and use replication for durability and allow the " +
+ "operating system's background flush capabilities as it is more efficient. This setting can " +
+ "be overridden on a per-topic basis (see the per-topic configuration section).";
public static final String FLUSH_MS_CONFIG = "flush.ms";
public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +
- "force an fsync of data written to the log. For example if this was set to 1000 " +
- "we would fsync after 1000 ms had passed. In general we recommend you not set " +
- "this and use replication for durability and allow the operating system's background " +
- "flush capabilities as it is more efficient.";
+ "force an fsync of data written to the log. For example if this was set to 1000 " +
+ "we would fsync after 1000 ms had passed. In general we recommend you not set " +
+ "this and use replication for durability and allow the operating system's background " +
+ "flush capabilities as it is more efficient.";
public static final String RETENTION_BYTES_CONFIG = "retention.bytes";
public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a partition " +
- "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " +
- "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " +
- "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " +
- "the topic retention in bytes. Additionally, retention.bytes configuration " +
- "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " +
- "Moreover, it triggers the rolling of new segment if the retention.bytes is configured to zero.";
+ "(which consists of log segments) can grow to before we will discard old log segments to free up space if we " +
+ "are using the \"delete\" retention policy. By default there is no size limit only a time limit. " +
+ "Since this limit is enforced at the partition level, multiply it by the number of partitions to compute " +
+ "the topic retention in bytes. Additionally, retention.bytes configuration " +
+ "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " +
+ "Moreover, it triggers the rolling of new segment if the retention.bytes is configured to zero.";
public static final String RETENTION_MS_CONFIG = "retention.ms";
public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " +
- "log before we will discard old log segments to free up space if we are using the " +
- "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
- "their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " +
- "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " +
- "Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied.";
+ "log before we will discard old log segments to free up space if we are using the " +
+ "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
+ "their data. If set to -1, no time limit is applied. Additionally, retention.ms configuration " +
+ "operates independently of \"segment.ms\" and \"segment.bytes\" configurations. " +
+ "Moreover, it triggers the rolling of new segment if the retention.ms condition is satisfied.";
public static final String REMOTE_LOG_STORAGE_ENABLE_CONFIG = "remote.storage.enable";
public static final String REMOTE_LOG_STORAGE_ENABLE_DOC = "To enable tiered storage for a topic, set this configuration to true. " +
@@ -107,84 +107,85 @@ public class TopicConfig {
public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
public static final String MAX_MESSAGE_BYTES_DOC =
- "The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
- "In the latest message format version, records are always grouped into batches for efficiency. " +
- "In previous message format versions, uncompressed records are not grouped into batches and this " +
- "limit only applies to a single record in that case.";
+ "The largest record batch size allowed by Kafka (after compression if compression is enabled). " +
+ "In the latest message format version, records are always grouped into batches for efficiency. " +
+ "In previous message format versions, uncompressed records are not grouped into batches and this " +
+ "limit only applies to a single record in that case.";
public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
- "Kafka adds an index entry to its offset index. The default setting ensures that we index a " +
- "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
- "position in the log but makes the index larger. You probably don't need to change this.";
+ "Kafka adds an index entry to its offset index. The default setting ensures that we index a " +
+ "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
+ "position in the log but makes the index larger. You probably don't need to change this.";
public static final String FILE_DELETE_DELAY_MS_CONFIG = "file.delete.delay.ms";
public static final String FILE_DELETE_DELAY_MS_DOC = "The time to wait before deleting a file from the " +
- "filesystem";
+ "filesystem";
public static final String DELETE_RETENTION_MS_CONFIG = "delete.retention.ms";
public static final String DELETE_RETENTION_MS_DOC = "The amount of time to retain delete tombstone markers " +
- "for log compacted topics. This setting also gives a bound " +
- "on the time in which a consumer must complete a read if they begin from offset 0 " +
- "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
- "tombstones may be collected before they complete their scan).";
+ "for log compacted topics. This setting also gives a bound " +
+ "on the time in which a consumer must complete a read if they begin from offset 0 " +
+ "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
+ "tombstones may be collected before they complete their scan).";
public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";
public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +
- "uncompacted in the log. Only applicable for logs that are being compacted.";
+ "uncompacted in the log. Only applicable for logs that are being compacted.";
public static final String MAX_COMPACTION_LAG_MS_CONFIG = "max.compaction.lag.ms";
public static final String MAX_COMPACTION_LAG_MS_DOC = "The maximum time a message will remain " +
- "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
+ "ineligible for compaction in the log. Only applicable for logs that are being compacted.";
public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +
- "the log compactor will attempt to clean the log (assuming log " +
- "compaction is enabled). By default we will avoid cleaning a log where more than " +
- "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
- "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
- "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
- "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG +
- " configurations are also specified, then the log compactor considers the log to be eligible for compaction " +
- "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
- "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) if the log has had " +
- "dirty (uncompacted) records for at most the " + MAX_COMPACTION_LAG_MS_CONFIG + " period.";
+ "the log compactor will attempt to clean the log (assuming log " +
+ "compaction is enabled). By default we will avoid cleaning a log where more than " +
+ "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
+ "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
+ "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
+ "space in the log. If the " + MAX_COMPACTION_LAG_MS_CONFIG + " or the " + MIN_COMPACTION_LAG_MS_CONFIG +
+ " configurations are also specified, then the log compactor considers the log to be eligible for compaction " +
+ "as soon as either: (i) the dirty ratio threshold has been met and the log has had dirty (uncompacted) " +
+ "records for at least the " + MIN_COMPACTION_LAG_MS_CONFIG + " duration, or (ii) if the log has had " +
+ "dirty (uncompacted) records for at most the " + MAX_COMPACTION_LAG_MS_CONFIG + " period.";
public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
public static final String CLEANUP_POLICY_COMPACT = "compact";
public static final String CLEANUP_POLICY_DELETE = "delete";
+ public static final String CLEANUP_POLICY_NONE = "none";
public static final String CLEANUP_POLICY_DOC = "This config designates the retention policy to " +
- "use on log segments. The \"delete\" policy (which is the default) will discard old segments " +
- "when their retention time or size limit has been reached. The \"compact\" policy will enable " +
- "log compaction, which retains the latest value for each key. " +
- "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " +
- "In this case, old segments will be discarded per the retention time and size configuration, " +
- "while retained segments will be compacted.";
+ "use on log segments. The \"delete\" policy (which is the default) will discard old segments " +
+ "when their retention time or size limit has been reached. The \"compact\" policy will enable " +
+ "log compaction, which retains the latest value for each key. " +
+ "It is also possible to specify both policies in a comma-separated list (e.g. \"delete,compact\"). " +
+ "In this case, old segments will be discarded per the retention time and size configuration, " +
+ "while retained segments will be compacted.";
public static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
public static final String UNCLEAN_LEADER_ELECTION_ENABLE_DOC = "Indicates whether to enable replicas " +
- "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
- "loss.Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" +
- "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
- "to trigger the unclean leader election immediately if needed.
";
+ "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
+ "loss.Note: In KRaft mode, when enabling this config dynamically, it needs to wait for the unclean leader election" +
+ "thread to trigger election periodically (default is 5 minutes). Please run `kafka-leader-election.sh` with `unclean` option " +
+ "to trigger the unclean leader election immediately if needed.
";
public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
- "this configuration specifies the minimum number of replicas that must acknowledge " +
- "a write for the write to be considered successful. If this minimum cannot be met, " +
- "then the producer will raise an exception (either NotEnoughReplicas
or NotEnoughReplicasAfterAppend
).
" +
- "Regardless of the acks
setting, the messages will not be visible to the consumers until " +
- "they are replicated to all in-sync replicas and the min.insync.replicas
condition is met.
" +
- "When used together, min.insync.replicas
and acks
allow you to enforce greater durability guarantees. " +
- "A typical scenario would be to create a topic with a replication factor of 3, " +
- "set min.insync.replicas
to 2, and produce with acks
of \"all\". " +
- "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers.";
+ "this configuration specifies the minimum number of replicas that must acknowledge " +
+ "a write for the write to be considered successful. If this minimum cannot be met, " +
+ "then the producer will raise an exception (either NotEnoughReplicas
or NotEnoughReplicasAfterAppend
).
" +
+ "Regardless of the acks
setting, the messages will not be visible to the consumers until " +
+ "they are replicated to all in-sync replicas and the min.insync.replicas
condition is met.
" +
+ "When used together, min.insync.replicas
and acks
allow you to enforce greater durability guarantees. " +
+ "A typical scenario would be to create a topic with a replication factor of 3, " +
+ "set min.insync.replicas
to 2, and produce with acks
of \"all\". " +
+ "This will ensure that a majority of replicas must persist a write before it's considered successful by the producer and it's visible to consumers.";
public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
- "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
- "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
- "original compression codec set by the producer.";
+ "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4', 'zstd'). It additionally " +
+ "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
+ "original compression codec set by the producer.";
public static final String COMPRESSION_GZIP_LEVEL_CONFIG = "compression.gzip.level";
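On the producer side, the "typical scenario" described in `MIN_IN_SYNC_REPLICAS_DOC` above (replication factor 3, `min.insync.replicas=2`, `acks=all`) looks roughly like the sketch below; the broker address and topic name are placeholders:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

class DurableProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // With the topic created at replication factor 3 and min.insync.replicas=2,
        // acks=all means a send succeeds only after a majority of replicas persist it.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("orders", "key", "value"));
        }
    }
}
```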
@@ -196,28 +197,28 @@ public class TopicConfig {
public static final String PREALLOCATE_CONFIG = "preallocate";
public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
- "creating a new log segment.";
+ "creating a new log segment.";
public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
- "message create time or log append time.";
+ "message create time or log append time.";
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG = "message.timestamp.before.max.ms";
public static final String MESSAGE_TIMESTAMP_BEFORE_MAX_MS_DOC = "This configuration sets the allowable timestamp " +
- "difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " +
- "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
- "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
- "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+ "difference between the broker's timestamp and the message timestamp. The message timestamp can be earlier than " +
+ "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
+ "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
+ "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG = "message.timestamp.after.max.ms";
public static final String MESSAGE_TIMESTAMP_AFTER_MAX_MS_DOC = "This configuration sets the allowable timestamp " +
- "difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " +
- "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
- "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
- "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
+ "difference between the message timestamp and the broker's timestamp. The message timestamp can be later than " +
+ "or equal to the broker's timestamp, with the maximum allowable difference determined by the value set in this " +
+ "configuration. If message.timestamp.type=CreateTime, the message will be rejected if the difference in " +
+ "timestamps exceeds this specified threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
/**
* @deprecated down-conversion is not possible in Apache Kafka 4.0 and newer, hence this configuration is a no-op,
- * and it is deprecated for removal in Apache Kafka 5.0.
+ * and it is deprecated for removal in Apache Kafka 5.0.
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_CONFIG = "message.downconversion.enable";
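The two timestamp configs documented a few lines above define an asymmetric window around the broker's clock, enforced only when `message.timestamp.type=CreateTime`. A small illustration of the acceptance rule as described in those docs (not broker code):

```java
class TimestampBoundsSketch {
    /**
     * Returns true if a record timestamp satisfies the documented bounds:
     * brokerTs - recordTs <= message.timestamp.before.max.ms  (not too far in the past), and
     * recordTs - brokerTs <= message.timestamp.after.max.ms   (not too far in the future).
     */
    static boolean withinBounds(long recordTs, long brokerTs, long beforeMaxMs, long afterMaxMs) {
        long drift = brokerTs - recordTs;
        return drift >= 0 ? drift <= beforeMaxMs : -drift <= afterMaxMs;
    }
}
```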
@@ -227,5 +228,5 @@ public class TopicConfig {
*/
@Deprecated
public static final String MESSAGE_DOWNCONVERSION_ENABLE_DOC = "Down-conversion is not possible in Apache Kafka 4.0 and newer, " +
- "hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
+ "hence this configuration is no-op and it is deprecated for removal in Apache Kafka 5.0.";
}
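These constants are typically consumed when creating or altering topics through the Admin client. A hedged sketch; the bootstrap address, topic name, and chosen values are illustrative:

```java
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

import java.util.List;
import java.util.Map;
import java.util.Properties;

class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            NewTopic topic = new NewTopic("orders", 3, (short) 3)
                .configs(Map.of(
                    TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2",
                    TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE,
                    TopicConfig.RETENTION_MS_CONFIG, "604800000"));  // 7 days
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```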
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index 6e1f0e232429b..a3abc2a124392 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -54,15 +54,15 @@ public class ConfigDefTest {
@Test
public void testBasicTypes() {
ConfigDef def = new ConfigDef().define("a", Type.INT, 5, Range.between(0, 14), Importance.HIGH, "docs")
- .define("b", Type.LONG, Importance.HIGH, "docs")
- .define("c", Type.STRING, "hello", Importance.HIGH, "docs")
- .define("d", Type.LIST, Importance.HIGH, "docs")
- .define("e", Type.DOUBLE, Importance.HIGH, "docs")
- .define("f", Type.CLASS, Importance.HIGH, "docs")
- .define("g", Type.BOOLEAN, Importance.HIGH, "docs")
- .define("h", Type.BOOLEAN, Importance.HIGH, "docs")
- .define("i", Type.BOOLEAN, Importance.HIGH, "docs")
- .define("j", Type.PASSWORD, Importance.HIGH, "docs");
+ .define("b", Type.LONG, Importance.HIGH, "docs")
+ .define("c", Type.STRING, "hello", Importance.HIGH, "docs")
+ .define("d", Type.LIST, Importance.HIGH, "docs")
+ .define("e", Type.DOUBLE, Importance.HIGH, "docs")
+ .define("f", Type.CLASS, Importance.HIGH, "docs")
+ .define("g", Type.BOOLEAN, Importance.HIGH, "docs")
+ .define("h", Type.BOOLEAN, Importance.HIGH, "docs")
+ .define("i", Type.BOOLEAN, Importance.HIGH, "docs")
+ .define("j", Type.PASSWORD, Importance.HIGH, "docs");
Properties props = new Properties();
props.put("a", "1 ");
@@ -116,7 +116,7 @@ public void testParsingEmptyDefaultValueForStringFieldShouldSucceed() {
@Test
public void testDefinedTwice() {
assertThrows(ConfigException.class, () -> new ConfigDef().define("a", Type.STRING,
- Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs"));
+ Importance.HIGH, "docs").define("a", Type.INT, Importance.HIGH, "docs"));
}
@Test
@@ -147,13 +147,13 @@ private void testBadInputs(Type type, Object... values) {
@Test
public void testInvalidDefaultRange() {
assertThrows(ConfigException.class, () -> new ConfigDef().define("name", Type.INT, -1,
- Range.between(0, 10), Importance.HIGH, "docs"));
+ Range.between(0, 10), Importance.HIGH, "docs"));
}
@Test
public void testInvalidDefaultString() {
assertThrows(ConfigException.class, () -> new ConfigDef().define("name", Type.STRING, "bad",
- ValidString.in("valid", "values"), Importance.HIGH, "docs"));
+ ValidString.in("valid", "values"), Importance.HIGH, "docs"));
}
@Test
@@ -169,10 +169,11 @@ public void testValidators() {
testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", "DEFAULT", null});
testValidators(Type.STRING, CaseInsensitiveValidString.in("good", "values", "default"), "default",
- new Object[]{"gOOd", "VALUES", "default"}, new Object[]{"Bad", "iNPUts", null});
- testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
- testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
- testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
+ new Object[]{"gOOd", "VALUES", "default"}, new Object[]{"Bad", "iNPUts", null});
+ testValidators(Type.LIST, ConfigDef.ValidList.inWithEmptyCheck(true, "1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
+ testValidators(Type.LIST, ConfigDef.ValidList.inWithEmptyCheck(false, "1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{""});
+ testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[]{null});
+ testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[]{null, -1, "c"});
testValidators(Type.STRING, new ConfigDef.NonEmptyStringWithoutControlChars(), "defaultname",
new Object[]{"test", "name", "test/test", "test\u1234", "\u1324name\\", "/+%>&):??<&()?-", "+1", "\uD83D\uDE01", "\uF3B1", " test \n\r", "\n hello \t"},
new Object[]{"nontrailing\nnotallowed", "as\u0001cii control char", "tes\rt", "test\btest", "1\t2", ""});
@@ -203,7 +204,7 @@ public void testNullDefaultWithValidator() {
ConfigDef def = new ConfigDef();
def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
- ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
+ ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs");
Properties props = new Properties();
props.put(key, "ONE");
@@ -215,17 +216,17 @@ public void testNullDefaultWithValidator() {
public void testGroupInference() {
List<String> expected1 = Arrays.asList("group1", "group2");
ConfigDef def1 = new ConfigDef()
- .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a")
- .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b")
- .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+ .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a")
+ .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b")
+ .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
assertEquals(expected1, def1.groups());
List<String> expected2 = Arrays.asList("group2", "group1");
ConfigDef def2 = new ConfigDef()
- .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a")
- .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b")
- .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
+ .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a")
+ .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b")
+ .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c");
assertEquals(expected2, def2.groups());
}
@@ -251,10 +252,10 @@ public void testParseForValidate() {
expected.put("d", configD);
ConfigDef def = new ConfigDef()
- .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
- .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
- .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
- .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false));
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+ .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false));
Map<String, String> props = new HashMap<>();
props.put("a", "1");
@@ -289,10 +290,10 @@ public void testValidate() {
expected.put("d", configD);
ConfigDef def = new ConfigDef()
- .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
- .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
- .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
- .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false));
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true))
+ .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", singletonList("b"), new IntegerRecommender(false));
Map<String, String> props = new HashMap<>();
props.put("a", "1");
@@ -325,15 +326,15 @@ public void testValidateMissingConfigKey() {
expected.put("d", configD);
ConfigDef def = new ConfigDef()
- .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false))
- .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
- .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true));
+ .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false))
+ .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true))
+ .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true));
Map<String, String> props = new HashMap<>();
props.put("a", "1");
List<ConfigValue> configs = def.validate(props);
- for (ConfigValue config: configs) {
+ for (ConfigValue config : configs) {
String name = config.name();
ConfigValue expectedConfig = expected.get(name);
assertEquals(expectedConfig, config);
@@ -352,7 +353,7 @@ public void testValidateCannotParse() {
props.put("a", "non_integer");
List<ConfigValue> configs = def.validate(props);
- for (ConfigValue config: configs) {
+ for (ConfigValue config : configs) {
String name = config.name();
ConfigValue expectedConfig = expected.get(name);
assertEquals(expectedConfig, config);
@@ -505,32 +506,32 @@ public void toRst() {
final String expectedRst =
"``opt2``\n" +
- " docs2\n" +
- "\n" +
- " * Type: int\n" +
- " * Importance: medium\n" +
- "\n" +
- "``opt1``\n" +
- " docs1\n" +
- "\n" +
- " * Type: string\n" +
- " * Default: a\n" +
- " * Valid Values: [a, b, c]\n" +
- " * Importance: high\n" +
- "\n" +
- "``opt3``\n" +
- " docs3\n" +
- "\n" +
- " * Type: list\n" +
- " * Default: a,b\n" +
- " * Importance: low\n" +
- "\n" +
- "``opt4``\n" +
- "\n" +
- " * Type: boolean\n" +
- " * Default: false\n" +
- " * Importance: low\n" +
- "\n";
+ " docs2\n" +
+ "\n" +
+ " * Type: int\n" +
+ " * Importance: medium\n" +
+ "\n" +
+ "``opt1``\n" +
+ " docs1\n" +
+ "\n" +
+ " * Type: string\n" +
+ " * Default: a\n" +
+ " * Valid Values: [a, b, c]\n" +
+ " * Importance: high\n" +
+ "\n" +
+ "``opt3``\n" +
+ " docs3\n" +
+ "\n" +
+ " * Type: list\n" +
+ " * Default: a,b\n" +
+ " * Importance: low\n" +
+ "\n" +
+ "``opt4``\n" +
+ "\n" +
+ " * Type: boolean\n" +
+ " * Default: false\n" +
+ " * Importance: low\n" +
+ "\n";
assertEquals(expectedRst, def.toRst());
}
@@ -550,48 +551,48 @@ public void toEnrichedRst() {
final String expectedRst =
"``poor.opt``\n" +
- " Doc doc doc doc.\n" +
- "\n" +
- " * Type: string\n" +
- " * Default: foo\n" +
- " * Importance: high\n" +
- "\n" +
- "Group One\n" +
- "^^^^^^^^^\n" +
- "\n" +
- "``opt1.of.group1``\n" +
- " Doc doc.\n" +
- "\n" +
- " * Type: string\n" +
- " * Default: a\n" +
- " * Valid Values: [a, b, c]\n" +
- " * Importance: high\n" +
- "\n" +
- "``opt2.of.group1``\n" +
- " Doc doc doc.\n" +
- "\n" +
- " * Type: int\n" +
- " * Importance: medium\n" +
- " * Dependents: ``some.option1``, ``some.option2``\n" +
- "\n" +
- "Group Two\n" +
- "^^^^^^^^^\n" +
- "\n" +
- "``opt1.of.group2``\n" +
- " Doc doc doc doc doc.\n" +
- "\n" +
- " * Type: boolean\n" +
- " * Default: false\n" +
- " * Importance: high\n" +
- " * Dependents: ``some.option``\n" +
- "\n" +
- "``opt2.of.group2``\n" +
- " Doc doc doc doc.\n" +
- "\n" +
- " * Type: boolean\n" +
- " * Default: false\n" +
- " * Importance: high\n" +
- "\n";
+ " Doc doc doc doc.\n" +
+ "\n" +
+ " * Type: string\n" +
+ " * Default: foo\n" +
+ " * Importance: high\n" +
+ "\n" +
+ "Group One\n" +
+ "^^^^^^^^^\n" +
+ "\n" +
+ "``opt1.of.group1``\n" +
+ " Doc doc.\n" +
+ "\n" +
+ " * Type: string\n" +
+ " * Default: a\n" +
+ " * Valid Values: [a, b, c]\n" +
+ " * Importance: high\n" +
+ "\n" +
+ "``opt2.of.group1``\n" +
+ " Doc doc doc.\n" +
+ "\n" +
+ " * Type: int\n" +
+ " * Importance: medium\n" +
+ " * Dependents: ``some.option1``, ``some.option2``\n" +
+ "\n" +
+ "Group Two\n" +
+ "^^^^^^^^^\n" +
+ "\n" +
+ "``opt1.of.group2``\n" +
+ " Doc doc doc doc doc.\n" +
+ "\n" +
+ " * Type: boolean\n" +
+ " * Default: false\n" +
+ " * Importance: high\n" +
+ " * Dependents: ``some.option``\n" +
+ "\n" +
+ "``opt2.of.group2``\n" +
+ " Doc doc doc doc.\n" +
+ "\n" +
+ " * Type: boolean\n" +
+ " * Default: false\n" +
+ " * Importance: high\n" +
+ "\n";
assertEquals(expectedRst, def.toEnrichedRst());
}
@@ -729,34 +730,34 @@ public void testNiceTimeUnits() {
@Test
public void testThrowsExceptionWhenListSizeExceedsLimit() {
final ConfigException exception = assertThrows(ConfigException.class, () -> new ConfigDef().define("lst",
- Type.LIST,
- asList("a", "b"),
- ListSize.atMostOfSize(1),
- Importance.HIGH,
- "lst doc"));
+ Type.LIST,
+ asList("a", "b"),
+ ListSize.atMostOfSize(1),
+ Importance.HIGH,
+ "lst doc"));
assertEquals("Invalid value [a, b] for configuration lst: exceeds maximum list size of [1].",
- exception.getMessage());
+ exception.getMessage());
}
@Test
public void testNoExceptionIsThrownWhenListSizeEqualsTheLimit() {
final List<String> lst = asList("a", "b", "c");
assertDoesNotThrow(() -> new ConfigDef().define("lst",
- Type.LIST,
- lst,
- ListSize.atMostOfSize(lst.size()),
- Importance.HIGH,
- "lst doc"));
+ Type.LIST,
+ lst,
+ ListSize.atMostOfSize(lst.size()),
+ Importance.HIGH,
+ "lst doc"));
}
@Test
public void testNoExceptionIsThrownWhenListSizeIsBelowTheLimit() {
assertDoesNotThrow(() -> new ConfigDef().define("lst",
- Type.LIST,
- asList("a", "b"),
- ListSize.atMostOfSize(3),
- Importance.HIGH,
- "lst doc"));
+ Type.LIST,
+ asList("a", "b"),
+ ListSize.atMostOfSize(3),
+ Importance.HIGH,
+ "lst doc"));
}
@Test
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
index ece60a36dbaf9..db245a812a005 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java
@@ -55,20 +55,18 @@
* Using local variable is advantageous as it avoids the overhead of repeatedly looking up these configurations in AbstractConfig.
*/
public class GroupCoordinatorConfig {
- ///
/// Group coordinator configs
- ///
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG = "group.coordinator.rebalance.protocols";
public static final String GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC = "The list of enabled rebalance protocols." +
"The " + Group.GroupType.SHARE + " and " + Group.GroupType.STREAMS + " rebalance protocols are in early access and therefore must not be used in production.";
public static final List<String> GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT = List.of(
- Group.GroupType.CLASSIC.toString(),
- Group.GroupType.CONSUMER.toString());
+ Group.GroupType.CLASSIC.toString(),
+ Group.GroupType.CONSUMER.toString());
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG = "group.coordinator.append.linger.ms";
public static final String GROUP_COORDINATOR_APPEND_LINGER_MS_DOC = "The duration in milliseconds that the coordinator will " +
- "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +
- "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " +
- "disk before completing request processing. Transactional writes are not accumulated.";
+ "wait for writes to accumulate before flushing them to disk. Increasing this value improves write efficiency and batch size, " +
+ "but also increases the response latency for requests, as the coordinator must wait for batches to be flushed to " +
+ "disk before completing request processing. Transactional writes are not accumulated.";
public static final int GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT = 5;
public static final String GROUP_COORDINATOR_NUM_THREADS_CONFIG = "group.coordinator.threads";
@@ -78,12 +76,12 @@ public class GroupCoordinatorConfig {
public static final String OFFSETS_LOAD_BUFFER_SIZE_CONFIG = "offsets.load.buffer.size";
public static final int OFFSETS_LOAD_BUFFER_SIZE_DEFAULT = 5 * 1024 * 1024;
public static final String OFFSETS_LOAD_BUFFER_SIZE_DOC = "Batch size for reading from the offsets segments when loading group metadata " +
- " into the cache (soft-limit, overridden if records are too large).";
+ " into the cache (soft-limit, overridden if records are too large).";
public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offsets.commit.timeout.ms";
public static final int OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000;
public static final String OFFSET_COMMIT_TIMEOUT_MS_DOC = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " +
- "or this timeout is reached. This is similar to the producer request timeout. This is applied to all the writes made by the coordinator.";
+ "or this timeout is reached. This is similar to the producer request timeout. This is applied to all the writes made by the coordinator.";
public static final String OFFSETS_TOPIC_PARTITIONS_CONFIG = "offsets.topic.num.partitions";
public static final int OFFSETS_TOPIC_PARTITIONS_DEFAULT = 50;
@@ -92,20 +90,18 @@ public class GroupCoordinatorConfig {
public static final String OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG = "offsets.topic.segment.bytes";
public static final int OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT = 100 * 1024 * 1024;
public static final String OFFSETS_TOPIC_SEGMENT_BYTES_DOC = "The offsets topic segment bytes should be kept relatively small in order to facilitate " +
- "faster log compaction and cache loads.";
+ "faster log compaction and cache loads.";
public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor";
public static final short OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT = 3;
public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_DOC = "The replication factor for the offsets topic (set higher to ensure availability). " +
- "Internal topic creation will fail until the cluster size meets this replication factor requirement.";
+ "Internal topic creation will fail until the cluster size meets this replication factor requirement.";
public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG = "offsets.topic.compression.codec";
public static final CompressionType OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT = CompressionType.NONE;
public static final String OFFSETS_TOPIC_COMPRESSION_CODEC_DOC = "Compression codec for the offsets topic - compression may be used to achieve \"atomic\" commits.";
- ///
/// Offset configs
- ///
public static final String OFFSET_METADATA_MAX_SIZE_CONFIG = "offset.metadata.max.bytes";
public static final int OFFSET_METADATA_MAX_SIZE_DEFAULT = 4096;
public static final String OFFSET_METADATA_MAX_SIZE_DOC = "The maximum size for a metadata entry associated with an offset commit.";
@@ -113,41 +109,37 @@ public class GroupCoordinatorConfig {
public static final String OFFSETS_RETENTION_MINUTES_CONFIG = "offsets.retention.minutes";
public static final int OFFSETS_RETENTION_MINUTES_DEFAULT = 7 * 24 * 60;
public static final String OFFSETS_RETENTION_MINUTES_DOC = "For subscribed consumers, committed offset of a specific partition will be expired and discarded when " +
- "1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " +
- "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " +
- "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " +
- "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " +
- "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period.";
+ "1) this retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty); " +
+ "2) this retention period has elapsed since the last time an offset is committed for the partition and the group is no longer subscribed to the corresponding topic. " +
+ "For standalone consumers (using manual assignment), offsets will be expired after this retention period has elapsed since the time of last commit. " +
+ "Note that when a group is deleted via the delete-group request, its committed offsets will also be deleted without extra retention period; " +
+ "also when a topic is deleted via the delete-topic request, upon propagated metadata update any group's committed offsets for that topic will also be deleted without extra retention period.";
public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG = "offsets.retention.check.interval.ms";
public static final long OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT = 600000L;
public static final String OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC = "Frequency at which to check for stale offsets";
- ///
/// Classic group configs
- ///
public static final String GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG = "group.min.session.timeout.ms";
public static final String GROUP_MIN_SESSION_TIMEOUT_MS_DOC = "The minimum allowed session timeout for registered consumers. Shorter timeouts result in " +
- "quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources.";
+ "quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources.";
public static final int GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT = 6000;
public static final String GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG = "group.max.session.timeout.ms";
public static final String GROUP_MAX_SESSION_TIMEOUT_MS_DOC = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers " +
- "more time to process messages in between heartbeats at the cost of a longer time to detect failures.";
+ "more time to process messages in between heartbeats at the cost of a longer time to detect failures.";
public static final int GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT = 1800000;
public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.initial.rebalance.delay.ms";
public static final String GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more consumers to join a new group " +
- "before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.";
+ "before performing the first rebalance. A longer delay means potentially fewer rebalances, but increases the time until processing begins.";
public static final int GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
public static final String GROUP_MAX_SIZE_CONFIG = "group.max.size";
public static final String GROUP_MAX_SIZE_DOC = "The maximum number of consumers that a single consumer group can accommodate.";
public static final int GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
- ///
/// Consumer group configs
- ///
public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.consumer.session.timeout.ms";
public static final String CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the consumer group protocol.";
public static final int CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
@@ -174,38 +166,36 @@ public class GroupCoordinatorConfig {
public static final String CONSUMER_GROUP_MAX_SIZE_CONFIG = "group.consumer.max.size";
public static final String CONSUMER_GROUP_MAX_SIZE_DOC = "The maximum number of consumers " +
- "that a single consumer group can accommodate. This value will only impact groups under " +
- "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " +
- "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
+ "that a single consumer group can accommodate. This value will only impact groups under " +
+ "the CONSUMER group protocol. To configure the max group size when using the CLASSIC " +
+ "group protocol use " + GROUP_MAX_SIZE_CONFIG + " " + "instead.";
public static final int CONSUMER_GROUP_MAX_SIZE_DEFAULT = Integer.MAX_VALUE;
private static final List<ConsumerGroupPartitionAssignor> CONSUMER_GROUP_BUILTIN_ASSIGNORS = List.of(
- new UniformAssignor(),
- new RangeAssignor()
+ new UniformAssignor(),
+ new RangeAssignor()
);
public static final String CONSUMER_GROUP_ASSIGNORS_CONFIG = "group.consumer.assignors";
public static final String CONSUMER_GROUP_ASSIGNORS_DOC = "The server side assignors as a list of either names for builtin assignors or full class names for customer assignors. " +
- "The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. " +
- "The supported builtin assignors are: " + CONSUMER_GROUP_BUILTIN_ASSIGNORS.stream().map(ConsumerGroupPartitionAssignor::name).collect(Collectors.joining(", ")) + ".";
+ "The first one in the list is considered as the default assignor to be used in the case where the consumer does not specify an assignor. " +
+ "The supported builtin assignors are: " + CONSUMER_GROUP_BUILTIN_ASSIGNORS.stream().map(ConsumerGroupPartitionAssignor::name).collect(Collectors.joining(", ")) + ".";
public static final List<String> CONSUMER_GROUP_ASSIGNORS_DEFAULT = CONSUMER_GROUP_BUILTIN_ASSIGNORS
- .stream()
- .map(ConsumerGroupPartitionAssignor::name)
- .toList();
+ .stream()
+ .map(ConsumerGroupPartitionAssignor::name)
+ .toList();
public static final String CONSUMER_GROUP_MIGRATION_POLICY_CONFIG = "group.consumer.migration.policy";
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT = ConsumerGroupMigrationPolicy.BIDIRECTIONAL.toString();
public static final String CONSUMER_GROUP_MIGRATION_POLICY_DOC = "The config that enables converting the non-empty classic group using the consumer embedded protocol " +
- "to the non-empty consumer group using the consumer group protocol and vice versa; " +
- "conversions of empty groups in both directions are always enabled regardless of this policy. " +
- ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " +
- ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " +
- ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
- ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
-
- ///
+ "to the non-empty consumer group using the consumer group protocol and vice versa; " +
+ "conversions of empty groups in both directions are always enabled regardless of this policy. " +
+ ConsumerGroupMigrationPolicy.BIDIRECTIONAL + ": both upgrade from classic group to consumer group and downgrade from consumer group to classic group are enabled, " +
+ ConsumerGroupMigrationPolicy.UPGRADE + ": only upgrade from classic group to consumer group is enabled, " +
+ ConsumerGroupMigrationPolicy.DOWNGRADE + ": only downgrade from consumer group to classic group is enabled, " +
+ ConsumerGroupMigrationPolicy.DISABLED + ": neither upgrade nor downgrade is enabled.";
+
/// Share group configs
- ///
public static final String SHARE_GROUP_MAX_SIZE_CONFIG = "group.share.max.size";
public static final int SHARE_GROUP_MAX_SIZE_DEFAULT = 200;
public static final String SHARE_GROUP_MAX_SIZE_DOC = "The maximum number of members that a single share group can accommodate.";
@@ -234,9 +224,7 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";
- ///
/// Streams group configs
- ///
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG = "group.streams.session.timeout.ms";
public static final int STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT = 45000;
public static final String STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC = "The timeout to detect client failures when using the streams group protocol.";
@@ -274,59 +262,59 @@ public class GroupCoordinatorConfig {
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
- // Group coordinator configs
- .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
- ConfigDef.ValidList.in(Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
- .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
- .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
- .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
- .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC)
- .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
- .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC)
- .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
- .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
-
- // Offset configs
- .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
- .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
- .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
-
- // Classic group configs
- .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
- .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC)
-
- // Consumer group configs
- .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
- .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
- .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
-
- // Share group configs
- .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
-
- // Streams group configs
- .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
- .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
- .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
- .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
- .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
- .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
- .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
+ // Group coordinator configs
+ .define(GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, LIST, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DEFAULT,
+ ConfigDef.ValidList.inWithEmptyCheck(false, Group.GroupType.documentValidValues()), MEDIUM, GROUP_COORDINATOR_REBALANCE_PROTOCOLS_DOC)
+ .define(GROUP_COORDINATOR_NUM_THREADS_CONFIG, INT, GROUP_COORDINATOR_NUM_THREADS_DEFAULT, atLeast(1), HIGH, GROUP_COORDINATOR_NUM_THREADS_DOC)
+ .define(GROUP_COORDINATOR_APPEND_LINGER_MS_CONFIG, INT, GROUP_COORDINATOR_APPEND_LINGER_MS_DEFAULT, atLeast(0), MEDIUM, GROUP_COORDINATOR_APPEND_LINGER_MS_DOC)
+ .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, INT, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, atLeast(1), HIGH, OFFSET_COMMIT_TIMEOUT_MS_DOC)
+ .define(OFFSETS_LOAD_BUFFER_SIZE_CONFIG, INT, OFFSETS_LOAD_BUFFER_SIZE_DEFAULT, atLeast(1), HIGH, OFFSETS_LOAD_BUFFER_SIZE_DOC)
+ .define(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, SHORT, OFFSETS_TOPIC_REPLICATION_FACTOR_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_REPLICATION_FACTOR_DOC)
+ .define(OFFSETS_TOPIC_PARTITIONS_CONFIG, INT, OFFSETS_TOPIC_PARTITIONS_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_PARTITIONS_DOC)
+ .define(OFFSETS_TOPIC_SEGMENT_BYTES_CONFIG, INT, OFFSETS_TOPIC_SEGMENT_BYTES_DEFAULT, atLeast(1), HIGH, OFFSETS_TOPIC_SEGMENT_BYTES_DOC)
+ .define(OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, INT, (int) OFFSETS_TOPIC_COMPRESSION_CODEC_DEFAULT.id, HIGH, OFFSETS_TOPIC_COMPRESSION_CODEC_DOC)
+
+ // Offset configs
+ .define(OFFSET_METADATA_MAX_SIZE_CONFIG, INT, OFFSET_METADATA_MAX_SIZE_DEFAULT, HIGH, OFFSET_METADATA_MAX_SIZE_DOC)
+ .define(OFFSETS_RETENTION_MINUTES_CONFIG, INT, OFFSETS_RETENTION_MINUTES_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_MINUTES_DOC)
+ .define(OFFSETS_RETENTION_CHECK_INTERVAL_MS_CONFIG, LONG, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DEFAULT, atLeast(1), HIGH, OFFSETS_RETENTION_CHECK_INTERVAL_MS_DOC)
+
+ // Classic group configs
+ .define(GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, MEDIUM, GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, MEDIUM, GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
+ .define(GROUP_MAX_SIZE_CONFIG, INT, GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, GROUP_MAX_SIZE_DOC)
+
+ // Consumer group configs
+ .define(CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(CONSUMER_GROUP_MAX_SIZE_CONFIG, INT, CONSUMER_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, CONSUMER_GROUP_MAX_SIZE_DOC)
+ .define(CONSUMER_GROUP_ASSIGNORS_CONFIG, LIST, CONSUMER_GROUP_ASSIGNORS_DEFAULT, null, MEDIUM, CONSUMER_GROUP_ASSIGNORS_DOC)
+ .define(CONSUMER_GROUP_MIGRATION_POLICY_CONFIG, STRING, CONSUMER_GROUP_MIGRATION_POLICY_DEFAULT, ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(ConsumerGroupMigrationPolicy.class)), MEDIUM, CONSUMER_GROUP_MIGRATION_POLICY_DOC)
+
+ // Share group configs
+ .define(SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(SHARE_GROUP_MAX_SIZE_CONFIG, INT, SHARE_GROUP_MAX_SIZE_DEFAULT, between(1, 1000), MEDIUM, SHARE_GROUP_MAX_SIZE_DOC)
+
+ // Streams group configs
+ .define(STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, INT, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_DOC)
+ .define(STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
+ .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
+ .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
+ .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
/**
@@ -443,68 +431,68 @@ public GroupCoordinatorConfig(AbstractConfig config) {
String.format("%s must be less than %s", CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_GROUP_SESSION_TIMEOUT_MS_CONFIG));
// Share group configs validation.
require(shareGroupMaxHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
- String.format("%s must be greater than or equal to %s",
- SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs >= shareGroupMinHeartbeatIntervalMs,
- String.format("%s must be greater than or equal to %s",
- SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs <= shareGroupMaxHeartbeatIntervalMs,
- String.format("%s must be less than or equal to %s",
- SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be less than or equal to %s",
+ SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
require(shareGroupMaxSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
- String.format("%s must be greater than or equal to %s",
- SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupSessionTimeoutMs >= shareGroupMinSessionTimeoutMs,
- String.format("%s must be greater than or equal to %s",
- SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupSessionTimeoutMs <= shareGroupMaxSessionTimeoutMs,
- String.format("%s must be less than or equal to %s",
- SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be less than or equal to %s",
+ SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG, SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(shareGroupHeartbeatIntervalMs < shareGroupSessionTimeoutMs,
- String.format("%s must be less than %s",
- SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be less than %s",
+ SHARE_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_GROUP_SESSION_TIMEOUT_MS_CONFIG));
// Streams group configs validation.
require(streamsGroupMaxHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
- String.format("%s must be greater than or equal to %s",
- STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupHeartbeatIntervalMs >= streamsGroupMinHeartbeatIntervalMs,
- String.format("%s must be greater than or equal to %s",
- STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupHeartbeatIntervalMs <= streamsGroupMaxHeartbeatIntervalMs,
- String.format("%s must be less than or equal to %s",
- STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
+ String.format("%s must be less than or equal to %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG));
require(streamsGroupMaxSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
- String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s", STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs >= streamsGroupMinSessionTimeoutMs,
- String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be greater than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupSessionTimeoutMs <= streamsGroupMaxSessionTimeoutMs,
- String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be less than or equal to %s", STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG));
require(streamsGroupNumStandbyReplicas <= streamsGroupMaxStandbyReplicas,
- String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
+ String.format("%s must be less than or equal to %s", STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG));
require(streamsGroupHeartbeatIntervalMs < streamsGroupSessionTimeoutMs,
- String.format("%s must be less than %s",
- STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
+ String.format("%s must be less than %s",
+ STREAMS_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG));
}
public static GroupCoordinatorConfig fromProps(
- Map<?, ?> props
+ Map<?, ?> props
) {
return new GroupCoordinatorConfig(
- new AbstractConfig(
- GroupCoordinatorConfig.CONFIG_DEF,
- props
- )
+ new AbstractConfig(
+ GroupCoordinatorConfig.CONFIG_DEF,
+ props
+ )
);
}
protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(
- AbstractConfig config
+ AbstractConfig config
) {
Map<String, ConsumerGroupPartitionAssignor> defaultAssignors = CONSUMER_GROUP_BUILTIN_ASSIGNORS
- .stream()
- .collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
+ .stream()
+ .collect(Collectors.toMap(ConsumerGroupPartitionAssignor::name, Function.identity()));
List<ConsumerGroupPartitionAssignor> assignors = new ArrayList<>();
@@ -564,8 +552,8 @@ public Map extractGroupConfigMap(ShareGroupConfig shareGroupCon
*/
public Map<String, Integer> extractConsumerGroupConfigMap() {
return Map.of(
- GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
- GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
+ GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
+ GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
}
/**
@@ -670,17 +658,17 @@ public long offsetsRetentionCheckIntervalMs() {
/**
* For subscribed consumers, committed offset of a specific partition will be expired and discarded when:
- * 1) This retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
- * 2) This retention period has elapsed since the last time an offset is committed for the partition AND
- * the group is no longer subscribed to the corresponding topic.
- *
+ * 1) This retention period has elapsed after the consumer group loses all its consumers (i.e. becomes empty);
+ * 2) This retention period has elapsed since the last time an offset is committed for the partition AND
+ * the group is no longer subscribed to the corresponding topic.
+ *
* For standalone consumers (using manual assignment), offsets will be expired after this retention period has
* elapsed since the time of last commit.
- *
+ *
* Note that when a group is deleted via the DeleteGroups request, its committed offsets will also be deleted immediately;
- *
+ *
* Also, when a topic is deleted via the delete-topic request, upon propagated metadata update any group's
- * committed offsets for that topic will also be deleted without extra retention period.
+ * committed offsets for that topic will also be deleted without extra retention period.
*/
public long offsetsRetentionMs() {
return offsetsRetentionMs;
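
For illustration, a minimal sketch of the cross-field validation pattern used in the GroupCoordinatorConfig constructor hunks above. The require(...) helper itself is not shown in these hunks; this sketch assumes it simply throws IllegalArgumentException with the formatted message, and the key names used below are illustrative placeholders, not the real config constants:

final class ConfigInvariantsSketch {

    // Assumed shape of the require(...) helper used above: fail fast when a
    // cross-field invariant between already-parsed values does not hold.
    static void require(boolean requirement, String message) {
        if (!requirement) {
            throw new IllegalArgumentException(message);
        }
    }

    public static void main(String[] args) {
        int heartbeatIntervalMs = 45_000;
        int sessionTimeoutMs = 45_000;

        try {
            // Mirrors the last invariant above: the heartbeat interval must be
            // strictly smaller than the session timeout, so equal values fail.
            require(heartbeatIntervalMs < sessionTimeoutMs,
                String.format("%s must be less than %s",
                    "heartbeat.interval.ms", "session.timeout.ms"));
        } catch (IllegalArgumentException e) {
            System.out.println("Invalid combination: " + e.getMessage());
        }
    }
}

These checks run only after each individual value has passed its own per-key validator (atLeast, between, and so on), which is why they sit in the constructor rather than in the ConfigDef definitions.
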
diff --git a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
index 4d82172c7d87b..6ffc76b636538 100644
--- a/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
+++ b/server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java
@@ -33,10 +33,12 @@
import static org.apache.kafka.common.config.ConfigDef.Type.STRING;
public class KRaftConfigs {
- /** KRaft mode configs */
+ /**
+ * KRaft mode configs
+ */
public static final String PROCESS_ROLES_CONFIG = "process.roles";
public static final String PROCESS_ROLES_DOC = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. ";
-
+
public static final String INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG = "initial.broker.registration.timeout.ms";
public static final int INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT = 60000;
public static final String INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC = "When initially registering with the controller quorum, the number of milliseconds to wait before declaring failure and exiting the broker process.";
@@ -118,10 +120,10 @@ public class KRaftConfigs {
public static final long CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DEFAULT = 2000;
public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
- public static final ConfigDef CONFIG_DEF = new ConfigDef()
+ public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_CONFIG, LONG, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES, atLeast(1), HIGH, METADATA_SNAPSHOT_MAX_NEW_RECORD_BYTES_DOC)
.define(METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, LONG, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DEFAULT, atLeast(0), HIGH, METADATA_SNAPSHOT_MAX_INTERVAL_MS_DOC)
- .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
+ .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.inWithEmptyCheck(false, "broker", "controller"), HIGH, PROCESS_ROLES_DOC)
.define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
.define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
.define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
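
For illustration, a minimal sketch of the validator change above (ValidList.in to ValidList.inWithEmptyCheck) using the public ConfigDef API. The inWithEmptyCheck factory is not shown in these hunks; this sketch assumes its boolean flag controls whether an empty list is accepted, so passing false makes an empty process.roles value a configuration error while the allowed values remain "broker" and "controller":

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Map;

public class ProcessRolesValidationSketch {
    public static void main(String[] args) {
        // Same validator shape as the PROCESS_ROLES_CONFIG definition above.
        ConfigDef def = new ConfigDef()
            .define("process.roles", ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE,
                ConfigDef.ValidList.inWithEmptyCheck(false, "broker", "controller"),
                ConfigDef.Importance.HIGH, "The roles that this process plays.");

        // Accepted: both values are in the allowed set.
        System.out.println(def.parse(Map.of("process.roles", "broker,controller")));

        try {
            // Rejected under the new validator: the value parses to an empty list.
            def.parse(Map.of("process.roles", ""));
        } catch (ConfigException e) {
            System.out.println("Rejected empty process.roles: " + e.getMessage());
        }
    }
}
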
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
index 21c92cd84dff4..433f5424c1fab 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java
@@ -167,7 +167,7 @@ public Optional<String> serverConfigName(String configName) {
.define(ServerLogConfigs.LOG_RETENTION_BYTES_CONFIG, LONG, ServerLogConfigs.LOG_RETENTION_BYTES_DEFAULT, HIGH, ServerLogConfigs.LOG_RETENTION_BYTES_DOC)
.define(ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_CONFIG, LONG, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.LOG_CLEANUP_INTERVAL_MS_DOC)
- .define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ConfigDef.ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC)
+ .define(ServerLogConfigs.LOG_CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ConfigDef.ValidList.inWithEmptyCheck(false, TopicConfig.CLEANUP_POLICY_COMPACT, TopicConfig.CLEANUP_POLICY_DELETE, TopicConfig.CLEANUP_POLICY_NONE), MEDIUM, ServerLogConfigs.LOG_CLEANUP_POLICY_DOC)
.define(ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DEFAULT, atLeast(4), MEDIUM, ServerLogConfigs.LOG_INDEX_SIZE_MAX_BYTES_DOC)
.define(ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_CONFIG, INT, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DEFAULT, atLeast(0), MEDIUM, ServerLogConfigs.LOG_INDEX_INTERVAL_BYTES_DOC)
.define(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, LONG, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DEFAULT, atLeast(1), HIGH, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_DOC)
@@ -189,6 +189,7 @@ public Optional serverConfigName(String configName) {
.defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC);
private static final LogConfigDef CONFIG = new LogConfigDef();
+
static {
CONFIG.
define(TopicConfig.SEGMENT_BYTES_CONFIG, INT, DEFAULT_SEGMENT_BYTES, atLeast(LegacyRecord.RECORD_OVERHEAD_V0), MEDIUM,
@@ -221,7 +222,7 @@ public Optional serverConfigName(String configName) {
TopicConfig.FILE_DELETE_DELAY_MS_DOC)
.define(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, DOUBLE, DEFAULT_MIN_CLEANABLE_DIRTY_RATIO, between(0, 1), MEDIUM,
TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC)
- .define(TopicConfig.CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ValidList.in(TopicConfig.CLEANUP_POLICY_COMPACT,
+ .define(TopicConfig.CLEANUP_POLICY_CONFIG, LIST, ServerLogConfigs.LOG_CLEANUP_POLICY_DEFAULT, ValidList.inWithEmptyCheck(false, TopicConfig.CLEANUP_POLICY_COMPACT,
TopicConfig.CLEANUP_POLICY_DELETE), MEDIUM, TopicConfig.CLEANUP_POLICY_DOC)
.define(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE,
MEDIUM, TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
@@ -468,6 +469,7 @@ public static void validateNames(Properties props) {
* Validates the values of the given properties. Can be called by both client and server.
* The `props` supplied should contain all the LogConfig properties and the default values are extracted from the
* LogConfig class.
+ *
* @param props The properties to be validated
*/
public static void validateValues(Map<?, ?> props) {
@@ -484,6 +486,7 @@ public static void validateValues(Map, ?> props) {
* Validates the values of the given properties. Should be called only by the broker.
* The `props` supplied doesn't contain any topic-level configs, only broker-level configs.
* The default values should be extracted from the KafkaConfig.
+ *
* @param props The properties to be validated
*/
public static void validateBrokerLogConfigValues(Map<?, ?> props,
@@ -499,9 +502,10 @@ public static void validateBrokerLogConfigValues(Map, ?> props,
* Validates the values of the given properties. Should be called only by the broker.
* The `newConfigs` supplied contains the topic-level configs,
* The default values should be extracted from the KafkaConfig.
- * @param existingConfigs The existing properties
- * @param newConfigs The new properties to be validated
- * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
+ *
+ * @param existingConfigs The existing properties
+ * @param newConfigs The new properties to be validated
+ * @param isRemoteLogStorageSystemEnabled true if system wise remote log storage is enabled
*/
private static void validateTopicLogConfigValues(Map<String, String> existingConfigs,
Map<?, ?> newConfigs,