Skip to content

Commit 4014d66

Browse files
authored
MINOR: Fix GroupConfigManager.validate() to include all group type defaults (apache#21593)
`GroupConfigManager.validate()` only included `consumer` group defaults in the combined configs, causing `share` and `streams` group fields to fall back to hardcoded defaults. This leads to false validation failures when broker-configured values differ from those defaults. Fix by using `extractGroupConfigMap()` which covers all group types, and adding the missing `extractStreamsGroupConfigMap()` to it. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent 244e848 commit 4014d66

3 files changed

Lines changed: 55 additions & 5 deletions

File tree

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfigManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,17 +76,18 @@ public List<String> groupIds() {
7676
/**
7777
* Validate the given properties.
7878
*
79-
* @param newGroupConfig The new group config.
80-
* @param groupCoordinatorConfig The group coordinator config.
81-
* @throws InvalidConfigurationException If validation fails
79+
* @param newGroupConfig The new group config.
80+
* @param groupCoordinatorConfig The group coordinator config.
81+
* @param shareGroupConfig The share group config.
82+
* @throws InvalidConfigurationException If validation fails.
8283
*/
8384
public static void validate(
8485
Properties newGroupConfig,
8586
GroupCoordinatorConfig groupCoordinatorConfig,
8687
ShareGroupConfig shareGroupConfig
8788
) {
8889
Properties combinedConfigs = new Properties();
89-
combinedConfigs.putAll(groupCoordinatorConfig.extractConsumerGroupConfigMap());
90+
combinedConfigs.putAll(groupCoordinatorConfig.extractGroupConfigMap(shareGroupConfig));
9091
combinedConfigs.putAll(newGroupConfig);
9192
GroupConfig.validate(combinedConfigs, groupCoordinatorConfig, shareGroupConfig);
9293
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -655,12 +655,13 @@ protected List<ShareGroupPartitionAssignor> shareGroupAssignors(
655655
}
656656

657657
/**
658-
* Copy the subset of properties that are relevant to consumer group and share group.
658+
* Copy the subset of properties that are relevant to consumer group, share group and streams group.
659659
*/
660660
public Map<String, Integer> extractGroupConfigMap(ShareGroupConfig shareGroupConfig) {
661661
Map<String, Integer> defaultConfigs = new HashMap<>();
662662
defaultConfigs.putAll(extractConsumerGroupConfigMap());
663663
defaultConfigs.putAll(shareGroupConfig.extractShareGroupConfigMap(this));
664+
defaultConfigs.putAll(extractStreamsGroupConfigMap());
664665
return Collections.unmodifiableMap(defaultConfigs);
665666
}
666667

@@ -673,6 +674,17 @@ GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, consumerGroupSessionTimeoutMs(),
673674
GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, consumerGroupHeartbeatIntervalMs());
674675
}
675676

677+
/**
678+
* Copy the subset of properties that are relevant to streams group.
679+
*/
680+
public Map<String, Integer> extractStreamsGroupConfigMap() {
681+
return Map.of(
682+
GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, streamsGroupSessionTimeoutMs(),
683+
GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, streamsGroupHeartbeatIntervalMs(),
684+
GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, streamsGroupNumStandbyReplicas(),
685+
GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, streamsGroupInitialRebalanceDelayMs());
686+
}
687+
676688
/**
677689
* The number of threads or event loops running.
678690
*/

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigManagerTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
package org.apache.kafka.coordinator.group;
1919

20+
import org.apache.kafka.common.config.AbstractConfig;
2021
import org.apache.kafka.common.errors.InvalidRequestException;
22+
import org.apache.kafka.common.utils.Utils;
2123
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig;
2224

2325
import org.junit.jupiter.api.AfterEach;
2426
import org.junit.jupiter.api.BeforeEach;
2527
import org.junit.jupiter.api.Test;
2628

29+
import java.util.Arrays;
2730
import java.util.HashMap;
2831
import java.util.Map;
2932
import java.util.Optional;
@@ -32,6 +35,7 @@
3235
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG;
3336
import static org.apache.kafka.coordinator.group.GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG;
3437
import static org.apache.kafka.coordinator.group.GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG;
38+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
3539
import static org.junit.jupiter.api.Assertions.assertEquals;
3640
import static org.junit.jupiter.api.Assertions.assertFalse;
3741
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -81,11 +85,44 @@ public void testUpdateGroupConfig() {
8185
assertEquals(6000, config.getInt(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG));
8286
}
8387

88+
@Test
89+
public void testValidateUsesAllGroupTypeDefaults() {
90+
Map<String, Object> configs = new HashMap<>();
91+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG, 46000);
92+
configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 46000);
93+
94+
GroupCoordinatorConfig groupCoordinatorConfig = createGroupCoordinatorConfig(configs);
95+
ShareGroupConfig shareGroupConfig = createShareGroupConfig();
96+
97+
Properties newGroupConfig = new Properties();
98+
newGroupConfig.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "2");
99+
100+
assertDoesNotThrow(() ->
101+
GroupConfigManager.validate(newGroupConfig, groupCoordinatorConfig, shareGroupConfig));
102+
}
103+
84104
public static GroupConfigManager createConfigManager() {
85105
Map<String, String> defaultConfig = new HashMap<>();
86106
defaultConfig.put(CONSUMER_SESSION_TIMEOUT_MS_CONFIG, String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_SESSION_TIMEOUT_MS_DEFAULT));
87107
defaultConfig.put(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_DEFAULT));
88108
defaultConfig.put(SHARE_RECORD_LOCK_DURATION_MS_CONFIG, String.valueOf(ShareGroupConfig.SHARE_GROUP_RECORD_LOCK_DURATION_MS_DEFAULT));
89109
return new GroupConfigManager(defaultConfig);
90110
}
111+
112+
private static GroupCoordinatorConfig createGroupCoordinatorConfig(Map<String, Object> overrides) {
113+
Map<String, Object> configs = new HashMap<>(overrides);
114+
return new GroupCoordinatorConfig(new AbstractConfig(
115+
GroupCoordinatorConfig.CONFIG_DEF,
116+
configs,
117+
false
118+
));
119+
}
120+
121+
private static ShareGroupConfig createShareGroupConfig() {
122+
return new ShareGroupConfig(new AbstractConfig(
123+
Utils.mergeConfigs(Arrays.asList(ShareGroupConfig.CONFIG_DEF, GroupCoordinatorConfig.CONFIG_DEF)),
124+
new HashMap<>(),
125+
false
126+
));
127+
}
91128
}

0 commit comments

Comments
 (0)