Skip to content

Commit f4299b2

Browse files
committed
tmp
1 parent 777f5f9 commit f4299b2

7 files changed

Lines changed: 253 additions & 63 deletions

File tree

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/MetadataVersionIntegrationTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.clients.admin.NewTopic;
2323
import org.apache.kafka.common.config.ConfigResource;
2424
import org.apache.kafka.common.test.ClusterInstance;
25+
import org.apache.kafka.common.test.api.ClusterConfigProperty;
2526
import org.apache.kafka.common.test.api.ClusterTest;
2627
import org.apache.kafka.common.test.api.ClusterTests;
2728
import org.apache.kafka.common.test.api.Type;
@@ -77,21 +78,23 @@ public void testUpgradeSameVersion(ClusterInstance clusterInstance) throws Excep
7778
}
7879
}
7980

80-
@ClusterTest(types = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_9_IV0)
81+
@ClusterTest(types = Type.KRAFT, metadataVersion = MetadataVersion.IBP_4_3_IV1,
82+
serverProperties = @ClusterConfigProperty(key = "log.segment.bytes", value = "1048576")
83+
)
8184
public void testMetadataVersionUpgradeWithValidTopicConfigs(ClusterInstance clusterInstance) throws Exception {
8285
try (var admin = clusterInstance.admin()) {
8386
// Create a topic with a valid dynamic config
8487
admin.createTopics(List.of(new NewTopic("test-topic", 1, (short) 1))).all().get();
8588
admin.incrementalAlterConfigs(Map.of(
86-
new ConfigResource(ConfigResource.Type.TOPIC, "test-topic"),
89+
new ConfigResource(ConfigResource.Type.BROKER, "0"),
8790
List.of(new AlterConfigOp(
88-
new ConfigEntry("segment.bytes", "10485760"),
91+
new ConfigEntry("log.segment.bytes", String.valueOf(2 * 1024 * 1024)),
8992
AlterConfigOp.OpType.SET))
9093
)).all().get();
9194

9295
// Upgrade metadata.version past IBP_4_0_IV0 — should succeed
9396
// because the config passes pre-flight validation
94-
short targetVersion = MetadataVersion.IBP_4_0_IV0.featureLevel();
97+
short targetVersion = MetadataVersion.IBP_4_3_IV2.featureLevel();
9598
admin.updateFeatures(Map.of(
9699
MetadataVersion.FEATURE_NAME,
97100
new FeatureUpdate(targetVersion, FeatureUpdate.UpgradeType.UPGRADE)

metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.kafka.common.DirectoryId;
2121
import org.apache.kafka.common.Uuid;
22+
import org.apache.kafka.common.config.ConfigResource;
2223
import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
2324
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
2425
import org.apache.kafka.common.errors.InconsistentClusterIdException;
@@ -273,7 +274,7 @@ boolean check() {
273274
/**
274275
* Maps broker IDs to their non-default static configs reported during registration.
275276
*/
276-
private final Map<Integer, Map<String, String>> brokerStaticConfigs;
277+
private final Map<ConfigResource, Map<String, String>> brokerStaticConfigs;
277278

278279
/**
279280
* Manages the kafka.controller:type=KafkaController,name=TimeSinceLastHeartbeatReceivedMs,broker=<brokerId> metrics.
@@ -346,7 +347,7 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
346347
return brokerRegistrations;
347348
}
348349

349-
public Map<Integer, Map<String, String>> brokerStaticConfigs() {
350+
public Map<ConfigResource, Map<String, String>> brokerStaticConfigs() {
350351
return Collections.unmodifiableMap(brokerStaticConfigs);
351352
}
352353

@@ -449,7 +450,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
449450
for (BrokerRegistrationRequestData.StaticConfig sc : request.staticConfigs()) {
450451
statics.put(sc.name(), sc.value());
451452
}
452-
brokerStaticConfigs.put(brokerId, Map.copyOf(statics));
453+
brokerStaticConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)), Map.copyOf(statics));
453454

454455
// Write static configs to the record when MV supports it
455456
if (featureControl.metadataVersionOrThrow().isStaticConfigReportingSupported()) {
@@ -608,7 +609,7 @@ public void replay(RegisterBrokerRecord record, long offset) {
608609
for (RegisterBrokerRecord.BrokerStaticConfig sc : record.staticConfigs()) {
609610
replayedStatics.put(sc.name(), sc.value());
610611
}
611-
brokerStaticConfigs.put(record.brokerId(), Map.copyOf(replayedStatics));
612+
brokerStaticConfigs.put(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(record.brokerId())), Map.copyOf(replayedStatics));
612613

613614
updateDirectories(brokerId, prevRegistration == null ? null : prevRegistration.directories(), record.logDirs());
614615
if (heartbeatManager != null) {

metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.common.utils.LogContext;
3333
import org.apache.kafka.metadata.KafkaConfigSchema;
3434
import org.apache.kafka.server.common.ApiMessageAndVersion;
35+
import org.apache.kafka.server.common.ConfigFeatureGate;
3536
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
3637
import org.apache.kafka.server.common.MetadataVersion;
3738
import org.apache.kafka.server.mutable.BoundedList;
@@ -53,6 +54,7 @@
5354
import java.util.Objects;
5455
import java.util.Optional;
5556
import java.util.function.Consumer;
57+
import java.util.stream.Collectors;
5658

5759
import static org.apache.kafka.clients.admin.AlterConfigOp.OpType.APPEND;
5860
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
@@ -173,8 +175,7 @@ private ConfigurationControlManager(LogContext logContext,
173175
Map<String, Object> staticConfig,
174176
int nodeId,
175177
FeatureControlManager featureControl,
176-
ClusterControlManager clusterControl
177-
) {
178+
ClusterControlManager clusterControl) {
178179
this.log = logContext.logger(ConfigurationControlManager.class);
179180
this.snapshotRegistry = snapshotRegistry;
180181
this.configSchema = configSchema;
@@ -692,9 +693,9 @@ Optional<ApiError> validateConfigsForFeatureUpgrade(Map<String, Short> updates)
692693
}
693694

694695
/**
695-
* Validate that all existing configurations pass their ConfigDef validators before a
696-
* metadata version upgrade. This catches config values that were accepted under a previous
697-
* version but would be invalid under the new version's stricter constraints.
696+
* Validate that all existing configurations are compatible with a proposed metadata version
697+
* upgrade. Uses MV-specific constraints when available, falling back to ConfigDef validators
698+
* only when crossing the initial threshold (< IBP_4_0_IV0 to >= IBP_4_0_IV0).
698699
*
699700
* @param proposedLevel The proposed metadata version feature level.
700701
* @return An error if any config value is invalid, or empty if all are valid.
@@ -704,39 +705,41 @@ private Optional<ApiError> validateMetadataVersionUpgradeConstraints(short propo
704705
return Optional.empty();
705706
}
706707

708+
MetadataVersion proposedMV = MetadataVersion.fromFeatureLevel(proposedLevel);
709+
707710
Map<ConfigResource, String> violations = new HashMap<>();
708-
for (Entry<ConfigResource, TimelineHashMap<String, String>> entry : configData.entrySet()) {
709-
ConfigResource resource = entry.getKey();
710-
Map<String, String> configs = entry.getValue();
711-
for (Entry<String, String> configEntry : configs.entrySet()) {
712-
try {
713-
configSchema.validateValue(resource.type(), configEntry.getKey(), configEntry.getValue());
714-
} catch (ConfigException e) {
715-
violations.put(resource, e.getMessage());
716-
}
717-
}
718-
}
719-
// Validate broker static configs
711+
712+
// Validate broker configs
720713
if (clusterControl != null) {
721-
Map<Integer, Map<String, String>> brokerStatics = clusterControl.brokerStaticConfigs();
722-
for (Entry<Integer, Map<String, String>> brokerEntry : brokerStatics.entrySet()) {
723-
int brokerId = brokerEntry.getKey();
724-
for (Entry<String, String> configEntry : brokerEntry.getValue().entrySet()) {
714+
for (Entry<ConfigResource, Map<String, String>> brokerEntry : clusterControl.brokerStaticConfigs().entrySet()) {
715+
int brokerId = Integer.parseInt(brokerEntry.getKey().name());
716+
ConfigResource brokerResource = brokerEntry.getKey();
717+
718+
List<String> brokerViolations = new ArrayList<>();
719+
720+
for (Entry<String, ConfigEntry> configEntry : computeEffectiveBrokerConfigs(brokerId).entrySet()) {
725721
try {
726-
configSchema.validateValue(ConfigResource.Type.BROKER, configEntry.getKey(), configEntry.getValue());
727-
} catch (ConfigException e) {
728-
violations.put(new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(brokerId)), e.getMessage());
722+
ConfigFeatureGate.validate(proposedMV, configEntry.getKey(), configEntry.getValue().value());
723+
} catch (Exception e) {
724+
brokerViolations.add(e.getMessage());
729725
}
730726
}
727+
if (!brokerViolations.isEmpty()) {
728+
violations.put(brokerResource, String.join(", ", brokerViolations));
729+
}
731730
}
732731
}
733732

734733
if (!violations.isEmpty()) {
735-
return Optional.of(new ApiError(INVALID_CONFIG,
736-
"Cannot upgrade " + MetadataVersion.FEATURE_NAME +
737-
" to version " + proposedLevel +
738-
" because existing configs are invalid: " + violations +
739-
". Fix these configs before upgrading."));
734+
String errorMessage = violations.entrySet().stream()
735+
.map(entry -> "NodeId=" + entry.getKey().name() + " -> " + entry.getValue())
736+
.collect(Collectors.joining("; "));
737+
738+
return Optional.of(new ApiError(Errors.INVALID_CONFIG,
739+
"Cannot upgrade " + MetadataVersion.FEATURE_NAME +
740+
" to version " + proposedLevel +
741+
" because existing configs are invalid: " + errorMessage +
742+
". Fix these configs before upgrading."));
740743
}
741744
return Optional.empty();
742745
}
@@ -797,11 +800,25 @@ Map<String, ConfigEntry> computeEffectiveTopicConfigs(Map<String, String> creati
797800
currentControllerConfig(), creationConfigs);
798801
}
799802

803+
Map<String, ConfigEntry> computeEffectiveBrokerConfigs(int brokerId) {
804+
return configSchema.resolveEffectiveBrokerConfigs(brokerStaticConfig(brokerId), clusterConfig(), brokerDynamicConfig(brokerId));
805+
}
806+
800807
Map<String, String> clusterConfig() {
801808
Map<String, String> result = configData.get(DEFAULT_NODE);
802809
return (result == null) ? Map.of() : result;
803810
}
804811

812+
Map<String, String> brokerStaticConfig(int brokerId) {
813+
Map<String, String> result = clusterControl.brokerStaticConfigs().get(new ConfigResource(BROKER, String.valueOf(brokerId)));
814+
return (result == null) ? Map.of() : result;
815+
}
816+
817+
Map<String, String> brokerDynamicConfig(int brokerId) {
818+
Map<String, String> result = configData.get(new ConfigResource(BROKER, String.valueOf(brokerId)));
819+
return (result == null) ? Map.of() : result;
820+
}
821+
805822
Map<String, String> currentControllerConfig() {
806823
Map<String, String> result = configData.get(currentController);
807824
return (result == null) ? Map.of() : result;

metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.apache.kafka.clients.admin.ConfigEntry;
2121
import org.apache.kafka.clients.admin.ConfigEntry.ConfigSource;
2222
import org.apache.kafka.common.config.ConfigDef;
23-
import org.apache.kafka.common.config.ConfigException;
2423
import org.apache.kafka.common.config.ConfigResource;
2524
import org.apache.kafka.common.config.types.Password;
2625
import org.apache.kafka.common.metadata.ConfigRecord;
@@ -114,26 +113,6 @@ public boolean isSplittable(ConfigResource.Type type, String key) {
114113
return configKey.type == ConfigDef.Type.LIST;
115114
}
116115

117-
/**
118-
* Validate a single config value against the ConfigDef's type and validator for the given
119-
* resource type and key. Throws ConfigException if the value is invalid.
120-
*
121-
* @param type The resource type (TOPIC, BROKER, etc.)
122-
* @param key The config key name.
123-
* @param value The string value to validate.
124-
* @throws ConfigException if the value fails type parsing or validator checks.
125-
*/
126-
public void validateValue(ConfigResource.Type type, String key, String value) {
127-
ConfigDef configDef = configDefs.get(type);
128-
if (configDef == null) return;
129-
ConfigDef.ConfigKey configKey = configDef.configKeys().get(key);
130-
if (configKey == null) return;
131-
Object parsedValue = ConfigDef.parseType(key, value, configKey.type);
132-
if (configKey.validator != null) {
133-
configKey.validator.ensureValid(key, parsedValue);
134-
}
135-
}
136-
137116
/**
138117
* Returns true if the configuration key specified in this ConfigRecord is sensitive, or if
139118
* we don't know whether it is sensitive.
@@ -187,6 +166,24 @@ public Map<String, ConfigEntry> resolveEffectiveTopicConfigs(
187166
return effectiveConfigs;
188167
}
189168

169+
public Map<String, ConfigEntry> resolveEffectiveBrokerConfigs(
170+
Map<String, ?> staticNodeConfig,
171+
Map<String, ?> dynamicClusterConfigs,
172+
Map<String, ?> dynamicNodeConfigs) {
173+
ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.BROKER, EMPTY_CONFIG_DEF);
174+
HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>();
175+
for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
176+
// This config is internal; if the user hasn't set it explicitly, it should not be returned.
177+
if (configKey.internalConfig) {
178+
continue;
179+
}
180+
ConfigEntry entry = resolveEffectiveBrokerConfig(configKey, staticNodeConfig,
181+
dynamicClusterConfigs, dynamicNodeConfigs);
182+
effectiveConfigs.put(entry.name(), entry);
183+
}
184+
return effectiveConfigs;
185+
}
186+
190187
public ConfigEntry resolveEffectiveTopicConfig(
191188
String keyName,
192189
Map<String, ?> staticNodeConfig,
@@ -215,27 +212,49 @@ public ConfigEntry resolveEffectiveTopicConfig(
215212
dynamicTopicConfigs.get(configKey.name),
216213
ConfigSource.DYNAMIC_TOPIC_CONFIG, Function.identity());
217214
}
215+
return resolveEffectiveBrokerConfig(configKey, staticNodeConfig, dynamicClusterConfigs, dynamicNodeConfigs);
216+
}
217+
218+
public ConfigEntry resolveEffectiveBrokerConfig(
219+
ConfigDef.ConfigKey configKey,
220+
Map<String, ?> staticNodeConfig,
221+
Map<String, ?> dynamicClusterConfigs,
222+
Map<String, ?> dynamicNodeConfigs
223+
) {
218224
List<ConfigSynonym> synonyms = logConfigSynonyms.getOrDefault(configKey.name, List.of());
225+
if (dynamicNodeConfigs.containsKey(configKey.name)) {
226+
return toConfigEntry(configKey, dynamicNodeConfigs.get(configKey.name),
227+
ConfigSource.DYNAMIC_BROKER_CONFIG, Function.identity());
228+
}
219229
for (ConfigSynonym synonym : synonyms) {
220230
if (dynamicNodeConfigs.containsKey(synonym.name())) {
221231
return toConfigEntry(configKey, dynamicNodeConfigs.get(synonym.name()),
222-
ConfigSource.DYNAMIC_BROKER_CONFIG, synonym.converter());
232+
ConfigSource.DYNAMIC_BROKER_CONFIG, synonym.converter());
223233
}
224234
}
235+
236+
if (dynamicClusterConfigs.containsKey(configKey.name)) {
237+
return toConfigEntry(configKey, dynamicClusterConfigs.get(configKey.name),
238+
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, Function.identity());
239+
}
225240
for (ConfigSynonym synonym : synonyms) {
226241
if (dynamicClusterConfigs.containsKey(synonym.name())) {
227242
return toConfigEntry(configKey, dynamicClusterConfigs.get(synonym.name()),
228-
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, synonym.converter());
243+
ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG, synonym.converter());
229244
}
230245
}
246+
if (staticNodeConfig.containsKey(configKey.name)) {
247+
return toConfigEntry(configKey, staticNodeConfig.get(configKey.name),
248+
ConfigSource.STATIC_BROKER_CONFIG, Function.identity());
249+
}
231250
for (ConfigSynonym synonym : synonyms) {
232251
if (staticNodeConfig.containsKey(synonym.name())) {
233252
return toConfigEntry(configKey, staticNodeConfig.get(synonym.name()),
234-
ConfigSource.STATIC_BROKER_CONFIG, synonym.converter());
253+
ConfigSource.STATIC_BROKER_CONFIG, synonym.converter());
235254
}
236255
}
237256
return toConfigEntry(configKey, configKey.hasDefault() ? configKey.defaultValue : null,
238-
ConfigSource.DEFAULT_CONFIG, Function.identity());
257+
ConfigSource.DEFAULT_CONFIG, Function.identity());
239258
}
240259

241260
public String getStaticOrDefaultConfig(

0 commit comments

Comments
 (0)