Skip to content

Commit 78a66fa

Browse files
authored
KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint (#20707)
Previously, bootstrap metadata was stored in a separate bootstrap.checkpoint file, while the zero checkpoint contained only KRaft control records. This change unifies them by having the Formatter append bootstrap metadata records into the zero checkpoint alongside the existing KRaft control records, integrating with KRaft's bootstrapping checkpoint mechanisms like RaftClient.Listener#handleLoadBootstrap and KIP-630 snapshot lifecycle management. QuorumController's handleLoadBootstrap now extracts bootstrap records from the zero checkpoint and stores them as BootstrapMetadata, which is later committed by ActivationRecordsGenerator when the controller activates on an empty metadata log. The BootstrapDirectory class is removed and its functionality is consolidated into two static methods on BootstrapMetadata: fromDirectory reads from the legacy bootstrap.checkpoint (falling back to defaults), and fromCheckpointFile reads from a specific checkpoint path. StorageTool now only writes the bootstrap snapshot when the node has the Controller role. KafkaClusterTestKit is updated to pass the bootstrap records that are neither feature-version records nor SCRAM records to the Formatter as additional bootstrap records. Reviewers: José Armando García Sancio <jsancio@apache.org>, Kevin Wu <kevin.wu2412@gmail.com>
1 parent c0041e9 commit 78a66fa

18 files changed

Lines changed: 395 additions & 282 deletions

File tree

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,6 +1432,7 @@ project(':metadata') {
14321432

14331433
testImplementation testLog4j2Libs
14341434
testFixturesImplementation testFixtures(project(':server-common'))
1435+
testFixturesImplementation project(':raft')
14351436
testFixturesImplementation libs.junitJupiter
14361437

14371438
testImplementation libs.junitJupiter

core/src/main/scala/kafka/server/KafkaRaftServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package kafka.server
1818

1919
import java.io.File
20+
import java.nio.file.Path
2021
import java.util.concurrent.CompletableFuture
2122
import kafka.utils.{Logging, Mx4jLoader}
2223
import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
@@ -25,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils}
2526
import org.apache.kafka.common.utils.internals.AppInfoParser
2627
import org.apache.kafka.common.{KafkaException, Uuid}
2728
import org.apache.kafka.metadata.KafkaConfigSchema
28-
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
29+
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
2930
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
3031
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
3132
import org.apache.kafka.raft.QuorumConfig
@@ -182,8 +183,7 @@ object KafkaRaftServer {
182183
}
183184

184185
// Load the BootstrapMetadata.
185-
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
186-
val bootstrapMetadata = bootstrapDirectory.read()
186+
val bootstrapMetadata = BootstrapMetadata.fromDirectory(Path.of(config.metadataLogDir))
187187
(metaPropsEnsemble, bootstrapMetadata)
188188
}
189189

core/src/main/scala/kafka/tools/StorageTool.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ object StorageTool extends Logging {
125125
val formatter = new Formatter().
126126
setPrintStream(printStream).
127127
setNodeId(config.nodeId).
128+
setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
128129
setClusterId(namespace.getString("cluster_id")).
129130
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
130131
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).

core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,13 @@ import java.nio.file.Files
2121
import java.util.{Optional, Properties}
2222
import org.apache.kafka.common.{KafkaException, Uuid}
2323
import org.apache.kafka.common.utils.Utils
24-
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
24+
import org.apache.kafka.metadata.bootstrap.{BootstrapMetadata, BootstrapTestUtils}
2525
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
26-
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
26+
import org.apache.kafka.metadata.storage.Formatter
2727
import org.apache.kafka.network.SocketServerConfigs
28-
import org.apache.kafka.server.config.ServerLogConfigs
28+
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
2929
import org.apache.kafka.server.common.MetadataVersion
30+
import org.apache.kafka.server.config.ServerLogConfigs
3031
import org.apache.kafka.storage.internals.log.UnifiedLog
3132
import org.apache.kafka.test.TestUtils
3233
import org.junit.jupiter.api.Assertions._
@@ -109,8 +110,13 @@ class KafkaRaftServerTest {
109110
}
110111

111112
private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = {
112-
val bootstrapDirectory = new BootstrapDirectory(logDir.toString)
113-
bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion, "test"))
113+
Formatter.writeBoostrapSnapshot(
114+
logDir.toString,
115+
BootstrapMetadata.fromVersion(metadataVersion, "test"),
116+
Optional.empty(),
117+
0.toShort,
118+
"CONTROLLER"
119+
)
114120
}
115121

116122
@Test
@@ -272,20 +278,32 @@ class KafkaRaftServerTest {
272278
setDirectoryId(Uuid.fromString("4jm0e-YRYeB6CCKBvwoS8w")).
273279
build()
274280

275-
val configProperties = new Properties
276-
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
277-
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
278-
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
279-
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
280-
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
281+
val logDir = TestUtils.tempDirectory()
282+
try {
283+
writeMetaProperties(logDir, metaProperties)
284+
writeBootstrapMetadata(logDir, MetadataVersion.IBP_3_3_IV3)
285+
286+
val configProperties = new Properties
287+
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
288+
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
289+
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
290+
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
291+
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
292+
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, logDir.getAbsolutePath)
293+
val config = KafkaConfig.fromProps(configProperties)
281294

282-
val (metaPropertiesEnsemble, bootstrapMetadata) =
283-
invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV3))
295+
val (metaPropertiesEnsemble, _) =
296+
KafkaRaftServer.initializeLogDirs(config, MetaPropertiesEnsemble.LOG, "")
284297

285-
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
286-
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
287-
assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
288-
assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_3_IV3)
298+
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
299+
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
300+
assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
301+
302+
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(logDir.getAbsolutePath)
303+
assertEquals(MetadataVersion.IBP_3_3_IV3, bootstrapMetadata.metadataVersion())
304+
} finally {
305+
Utils.delete(logDir)
306+
}
289307
}
290308

291309
@Test

core/src/test/scala/unit/kafka/tools/StorageToolTest.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,12 @@ import kafka.utils.TestUtils
2727
import net.sourceforge.argparse4j.inf.ArgumentParserException
2828
import org.apache.kafka.common.metadata.UserScramCredentialRecord
2929
import org.apache.kafka.common.utils.Utils
30-
import org.apache.kafka.server.common.{Feature, MetadataVersion}
31-
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
30+
import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils
3231
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
3332
import org.apache.kafka.metadata.storage.FormatterException
3433
import org.apache.kafka.network.SocketServerConfigs
3534
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
35+
import org.apache.kafka.server.common.{Feature, MetadataVersion}
3636
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
3737
import org.apache.kafka.server.util.TerseFailure
3838
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
@@ -390,15 +390,15 @@ Found problem:
390390
def testFormatWithReleaseVersionAndFeatureOverride(): Unit = {
391391
val availableDirs = Seq(TestUtils.tempDir())
392392
val properties = new Properties()
393-
properties.putAll(defaultStaticQuorumProperties)
393+
properties.putAll(defaultDynamicQuorumProperties)
394394
properties.setProperty("log.dirs", availableDirs.mkString(","))
395395
val stream = new ByteArrayOutputStream()
396396
assertEquals(0, runFormatCommand(stream, properties, Seq(
397-
"--release-version", "3.7-IV0",
397+
"--release-version", "3.7-IV0", "--standalone",
398398
"--feature", "share.version=1")))
399399

400400
// Verify that the feature override is applied by checking the bootstrap metadata
401-
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
401+
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
402402

403403
// Verify that the share.version feature is set to 1 as specified
404404
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -409,25 +409,25 @@ Found problem:
409409
"Failed to find release version in output: " + stream.toString())
410410

411411
// Verify that the format command completed successfully with features
412-
assertTrue(stream.toString().contains("Formatting metadata directory"),
412+
assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"),
413413
"Failed to find formatting message in output: " + stream.toString())
414414
}
415415

416416
@Test
417417
def testFormatWithMultipleFeatures(): Unit = {
418418
val availableDirs = Seq(TestUtils.tempDir())
419419
val properties = new Properties()
420-
properties.putAll(defaultStaticQuorumProperties)
420+
properties.putAll(defaultDynamicQuorumProperties)
421421
properties.setProperty("log.dirs", availableDirs.mkString(","))
422422
val stream = new ByteArrayOutputStream()
423423
assertEquals(0, runFormatCommand(stream, properties, Seq(
424-
"--release-version", "3.8-IV0",
424+
"--release-version", "3.8-IV0", "--standalone",
425425
"--feature", "share.version=1",
426426
"--feature", "transaction.version=2",
427427
"--feature", "group.version=1")))
428428

429429
// Verify that all features are properly bootstrapped by checking the bootstrap metadata
430-
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
430+
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
431431

432432
// Verify that all specified features are set correctly
433433
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
@@ -442,7 +442,7 @@ Found problem:
442442
"Failed to find release version in output: " + stream.toString())
443443

444444
// Verify that the format command completed successfully with multiple features
445-
assertTrue(stream.toString().contains("Formatting metadata directory"),
445+
assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"),
446446
"Failed to find formatting message in output: " + stream.toString())
447447
}
448448

@@ -852,11 +852,11 @@ Found problem:
852852
def testBootstrapScramRecords(): Unit = {
853853
val availableDirs = Seq(TestUtils.tempDir())
854854
val properties = new Properties()
855-
properties.putAll(defaultStaticQuorumProperties)
855+
properties.putAll(defaultDynamicQuorumProperties)
856856
properties.setProperty("log.dirs", availableDirs.mkString(","))
857857
val stream = new ByteArrayOutputStream()
858858
val arguments = ListBuffer[String](
859-
"--release-version", "3.9-IV0",
859+
"--release-version", "3.9-IV0", "--standalone",
860860
"--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
861861
"--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
862862
)
@@ -865,7 +865,7 @@ Found problem:
865865

866866
// Not doing full SCRAM record validation since that's covered elsewhere.
867867
// Just checking that we generate the correct number of records
868-
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
868+
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
869869
val scramRecords = bootstrapMetadata.records().asScala
870870
.filter(apiMessageAndVersion => apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
871871
.map(apiMessageAndVersion => apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])

metadata/src/main/java/org/apache/kafka/controller/QuorumController.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@
128128

129129
import org.slf4j.Logger;
130130

131+
import java.util.ArrayList;
131132
import java.util.Collection;
132133
import java.util.EnumSet;
133134
import java.util.HashMap;
@@ -1075,7 +1076,31 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
10751076

10761077
@Override
10771078
public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> reader) {
1078-
reader.close();
1079+
appendRaftEvent(String.format("handleLoadBootstrap[snapshotId=%s]", reader.snapshotId()), () -> {
1080+
try {
1081+
String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
1082+
if (isActiveController()) {
1083+
throw fatalFaultHandler.handleFault("Asked to load bootstrap snapshot " + snapshotName +
1084+
", but we are the active controller at epoch " + curClaimEpoch);
1085+
}
1086+
List<ApiMessageAndVersion> records = new ArrayList<>();
1087+
while (reader.hasNext()) {
1088+
Batch<ApiMessageAndVersion> batch = reader.next();
1089+
records.addAll(batch.records());
1090+
}
1091+
if (!records.isEmpty()) {
1092+
log.debug("Loaded {} bootstrap records from {}", records.size(), snapshotName);
1093+
bootstrapMetadata = BootstrapMetadata.fromRecords(records, "bootstrap");
1094+
}
1095+
} catch (FaultHandlerException e) {
1096+
throw e;
1097+
} catch (Throwable e) {
1098+
throw fatalFaultHandler.handleFault("Error while loading bootstrap snapshot " +
1099+
reader.snapshotId(), e);
1100+
} finally {
1101+
reader.close();
1102+
}
1103+
});
10791104
}
10801105

10811106
@Override
@@ -1460,7 +1485,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
14601485
/**
14611486
* The bootstrap metadata to use for initialization if needed.
14621487
*/
1463-
private final BootstrapMetadata bootstrapMetadata;
1488+
private BootstrapMetadata bootstrapMetadata;
14641489

14651490
/**
14661491
* The maximum number of records per batch to allow.

metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java

Lines changed: 0 additions & 117 deletions
This file was deleted.

0 commit comments

Comments
 (0)