-
Notifications
You must be signed in to change notification settings - Fork 15.2k
KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint #20707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 30 commits
a9c79a3
2a6f663
752a79e
026da2b
7e303b9
584fe3d
9852262
9cb5318
ff3c050
f5763d0
f3a3e41
0ab8e02
1f76fa9
4de55ce
56435b3
29affeb
533033b
f35e992
1c4528e
625acfb
cb2ad18
3c809c1
1908b2c
7226af0
029aa6b
6dbef44
cc3f0b4
7ad1342
135c126
383045a
b474770
721fb5c
cb4803d
6a46cf7
adb1548
a839631
96bc7f3
4823d30
082d469
dae3bd9
8ebfdaf
5ba0c9b
a7450e0
dcd1b0b
67ad6c4
0a78d0d
de4da73
eb4ba90
1d1e7ba
dc27d7c
8469809
2e0c8f0
a182b12
0da0919
fd3adf6
3d589e6
56dae1b
b3feaa6
03eafae
7e8d2e6
7a9404f
9f82078
ff60272
e05dcf2
f0cbc8b
031c0fc
f0b35af
56422a5
3614162
5698e93
d4e3b84
9380605
0e3c626
df3108b
f9ad128
76146ad
2ad3c53
473ac38
da05d3f
7d078b8
b27e7fc
b82a873
711a347
02c9b79
de8bb36
da46fb7
4993aaa
a7edcd0
6601e69
3b4aeb6
7896f12
207ca7c
9ee9f3d
9b668cb
645a08a
8d44f43
3a57241
4c7d5f6
ce5faf6
b83d676
f8df406
d1a8ea7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -120,9 +120,13 @@ object StorageTool extends Logging { | |||||
| throw new TerseFailure("The kafka configuration file appears to be for " + | ||||||
| "a legacy cluster. Formatting is only supported for clusters in KRaft mode.") | ||||||
| } | ||||||
| val writeBootstrapSnapshot = config.processRoles.contains(ProcessRole.ControllerRole) || | ||||||
| !config.processRoles.contains(ProcessRole.BrokerRole) | ||||||
|
|
||||||
| val formatter = new Formatter(). | ||||||
| setPrintStream(printStream). | ||||||
| setNodeId(config.nodeId). | ||||||
| setWriteBootstrapSnapshot(writeBootstrapSnapshot). | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| setClusterId(namespace.getString("cluster_id")). | ||||||
| setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled). | ||||||
| setIgnoreFormatted(namespace.getBoolean("ignore_formatted")). | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1405,7 +1405,12 @@ class KRaftClusterTest { | |
| build() | ||
| try { | ||
| cluster.format() | ||
| val controllerServer = cluster.controllers().values().iterator().next() | ||
| val dynamicConfigProps = new Properties() | ||
| dynamicConfigProps.setProperty(ServerConfigs.NUM_IO_THREADS_CONFIG, "9") | ||
| controllerServer.config.dynamicConfig.initialize(None) | ||
| cluster.startup() | ||
| controllerServer.config.dynamicConfig.updateDefaultConfig(dynamicConfigProps) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please remove these changes. There are several other unrelated tests where you changed the test code to pass. After your changes to |
||
| val controller = cluster.controllers().values().iterator().next() | ||
| TestUtils.retry(60000) { | ||
| assertNotNull(controller.controllerApisHandlerPool) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,7 +30,7 @@ import org.apache.kafka.common.test.ClusterInstance | |
| import org.apache.kafka.common.utils.Utils | ||
| import org.apache.kafka.coordinator.group.GroupCoordinatorConfig | ||
| import org.apache.kafka.security.authorizer.AclEntry | ||
| import org.apache.kafka.server.common.Feature | ||
| import org.apache.kafka.server.common.{Feature, MetadataVersion} | ||
| import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse} | ||
|
|
||
| import java.lang.{Byte => JByte} | ||
|
|
@@ -48,8 +48,15 @@ import scala.jdk.CollectionConverters._ | |
| class ConsumerGroupDescribeRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) { | ||
|
|
||
| @ClusterTest( | ||
| metadataVersion = MetadataVersion.IBP_4_1_IV1, | ||
| features = Array( | ||
| new ClusterFeature(feature = Feature.GROUP_VERSION, version = 0) | ||
| ), | ||
| serverProperties = Array( | ||
| new ClusterConfigProperty( | ||
| key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, | ||
| value = "classic" | ||
| ) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See my other comment regarding test code changes for unrelated tests. |
||
| ) | ||
| ) | ||
| def testConsumerGroupDescribeWhenFeatureFlagNotEnabled(): Unit = { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| #!/bin/bash | ||
| # Licensed to the Apache Software Foundation (ASF) under one or more | ||
| # contributor license agreements. See the NOTICE file distributed with | ||
| # this work for additional information regarding copyright ownership. | ||
| # The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| # (the "License"); you may not use this file except in compliance with | ||
| # the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, software | ||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| # Gradle build script for Cursor IDE | ||
|
|
||
| # Source the shell environment to get jenv | ||
| source ~/.zshrc | ||
|
|
||
| # Change to the kafka directory | ||
| cd "$(dirname "$0")" | ||
|
|
||
| # Run gradle with the provided arguments | ||
| ./gradlew "$@" | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this since we only read it once. Also we can remove
!config.processRoles.contains(ProcessRole.BrokerRole)from the predicate.