Skip to content
Merged
Show file tree
Hide file tree
Changes from 90 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
a9c79a3
redo implementation to use RecordsSnapshotWriter
mannoopj Oct 20, 2025
2a6f663
NIT fix spacing
mannoopj Oct 20, 2025
752a79e
NIT remove unused import
mannoopj Oct 20, 2025
026da2b
NIT remove extra line
mannoopj Oct 20, 2025
7e303b9
add bootstrap records in formatter
mannoopj Oct 20, 2025
584fe3d
revert RaftClientTestContext
mannoopj Oct 20, 2025
9852262
wip changes
mannoopj Nov 3, 2025
9cb5318
wip remove system.out
mannoopj Nov 3, 2025
ff3c050
wip remove more system.out
mannoopj Nov 3, 2025
f5763d0
remove system out in quorum controller
mannoopj Nov 3, 2025
f3a3e41
change check for emptyLog
mannoopj Nov 4, 2025
0ab8e02
remove prints
mannoopj Nov 4, 2025
1f76fa9
remove default
mannoopj Nov 4, 2025
4de55ce
remove bootstrapRecordsAppended
mannoopj Nov 5, 2025
56435b3
dont format checkpoint for broker only, split bootstrapDirectory read
mannoopj Nov 5, 2025
29affeb
WIP, stop MetadataLoader reading 000.checkpoint
mannoopj Nov 7, 2025
533033b
remove null check for bootstrapMetadata
mannoopj Nov 7, 2025
f35e992
stop replaying metadata records
mannoopj Nov 7, 2025
1c4528e
address comments
mannoopj Nov 18, 2025
625acfb
handleLoadSnapshot changes
mannoopj Nov 18, 2025
cb2ad18
import fixes
mannoopj Nov 18, 2025
3c809c1
temp build changes
mannoopj Nov 19, 2025
1908b2c
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
7226af0
more tests fixes
mannoopj Nov 19, 2025
029aa6b
Revert "BootstrapDirectoryTest fix"
mannoopj Nov 19, 2025
6dbef44
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
cc3f0b4
nit: fix indentation
mannoopj Nov 19, 2025
7ad1342
ReconfigurableQuorumIntegrationTest fix
mannoopj Nov 20, 2025
135c126
StorageToolTest fix
mannoopj Nov 20, 2025
383045a
fix KafkaClusterTestKit
mannoopj Nov 24, 2025
b474770
remove writeBootstrapSnapshot var in StorageTool
mannoopj Nov 25, 2025
721fb5c
remove test specific fixes
mannoopj Nov 25, 2025
cb4803d
removed unused import
mannoopj Nov 25, 2025
6a46cf7
testStartupWithNonDefaultKControllerDynamicConfiguration fix
mannoopj Nov 25, 2025
adb1548
add WARN message
mannoopj Dec 1, 2025
a839631
add FileNotFoundException
mannoopj Dec 1, 2025
96bc7f3
add checkpoint file exist checks
mannoopj Dec 1, 2025
4823d30
remove assertCheckpointExists
mannoopj Dec 1, 2025
082d469
move up copier.setWriteErrorHandler
mannoopj Dec 1, 2025
dae3bd9
move testNonDefaultKControllerDynamicConfiguration to DynamicBrokerRe…
mannoopj Dec 1, 2025
8ebfdaf
remove additionalBootstrapRecords
mannoopj Dec 1, 2025
5ba0c9b
convert BootstrapDirectory to interface
mannoopj Dec 1, 2025
a7450e0
fix testFormatterFailsOnUnwritableDirectory
mannoopj Dec 1, 2025
dcd1b0b
move up readFromBinaryFile
mannoopj Dec 2, 2025
67ad6c4
remove writeBinaryFile
mannoopj Dec 2, 2025
0a78d0d
revert gradle changes
mannoopj Dec 2, 2025
de4da73
revert build.gradle
mannoopj Dec 2, 2025
eb4ba90
more build.gradle reverts
mannoopj Dec 2, 2025
1d1e7ba
more build.gradle fixes
mannoopj Dec 2, 2025
dc27d7c
import-control.xml reverts
mannoopj Dec 2, 2025
8469809
KafkaStreamsTelemetryIntegrationTest reverts
mannoopj Dec 2, 2025
2e0c8f0
KafkaStreamsTelemetryIntegrationTest more reverts
mannoopj Dec 2, 2025
a182b12
fix spacing
mannoopj Dec 2, 2025
0da0919
FormatterTest fixes
mannoopj Dec 2, 2025
fd3adf6
move TestBootstrapDirectory
mannoopj Dec 2, 2025
3d589e6
remove extra line
mannoopj Dec 2, 2025
56dae1b
remove SuppressWarnings for testReadFromEmptyConfiguration
mannoopj Dec 2, 2025
b3feaa6
move BootstrapDirectory filenames
mannoopj Dec 4, 2025
03eafae
nit BootstrapDirectory fixes
mannoopj Dec 4, 2025
7e8d2e6
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Dec 4, 2025
7a9404f
fix incorrect merge
mannoopj Dec 4, 2025
9f82078
remove generic exceptions
mannoopj Jan 22, 2026
ff60272
change BootstrapMetadata readFromBinaryFile to static
mannoopj Jan 22, 2026
e05dcf2
make BootstrapMetadata readFromConfiguration() private
mannoopj Jan 22, 2026
f0cbc8b
add documentation for BootstrapDirectory constructors
mannoopj Jan 22, 2026
031c0fc
add IllegalStateException
mannoopj Jan 22, 2026
f0b35af
print the entire path for FormatterException
mannoopj Jan 22, 2026
56422a5
add isCommittedSnapshot property to snapshot reader
mannoopj Jan 26, 2026
3614162
stop leaking implementation details to controller
mannoopj Jan 26, 2026
5698e93
volatile BootstrapMetadata
mannoopj Jan 26, 2026
d4e3b84
more exception handling
mannoopj Jan 26, 2026
9380605
move readFromBinaryFile to BootstrapFileUtils
mannoopj Feb 2, 2026
0e3c626
move everything back into BootstrapDirectory
mannoopj Feb 2, 2026
df3108b
add handleLoadBootstrap
mannoopj Feb 2, 2026
f9ad128
add documentation QuorumController:bootstrapMetadata being marked as …
mannoopj Feb 2, 2026
76146ad
parametrized file name
mannoopj Feb 2, 2026
2ad3c53
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Feb 2, 2026
473ac38
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Mar 19, 2026
da05d3f
remove NonDefaultKControllerDynamicConfiguration tests
mannoopj Mar 19, 2026
7d078b8
fix imports
mannoopj Mar 19, 2026
b27e7fc
remove isCommittedSnapshot
mannoopj Mar 19, 2026
b82a873
git add BootstrapTestUtils.java
mannoopj Mar 19, 2026
711a347
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Mar 19, 2026
02c9b79
delete core/src/test/scala/integration/kafka/server/KRaftClusterTest.…
mannoopj Mar 19, 2026
de8bb36
remove redudnant code
mannoopj Mar 19, 2026
da46fb7
FormatterTest fixes
mannoopj Mar 19, 2026
4993aaa
handleLoadBootstrap implementation for QuorumController
mannoopj Mar 19, 2026
a7edcd0
revert empty line deletion
mannoopj Mar 19, 2026
6601e69
cleanup fixes
mannoopj Mar 25, 2026
3b4aeb6
fix testStartupWithNonDefaultKControllerDynamicConfiguration
mannoopj Mar 25, 2026
7896f12
more clean up fixes
mannoopj Mar 27, 2026
207ca7c
add reads from zero checkpoint in fromDirectory
mannoopj Mar 27, 2026
9ee9f3d
bootstrapMetadata and bootstrapMetadataTest fixes
mannoopj Mar 27, 2026
9b668cb
BootstrapMetadata fixes
mannoopj Apr 27, 2026
645a08a
synchronized, IllegalStateException, and nit checkstyle fixes
mannoopj May 14, 2026
8d44f43
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj May 14, 2026
3a57241
Move BootstrapTestUtils to testFixtures for cross-module visibility
mannoopj May 14, 2026
4c7d5f6
add back BootstrapTestUtils.java
mannoopj May 14, 2026
ce5faf6
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
jsancio May 18, 2026
b83d676
rework testKRaftUpdateAt3_3_IV3
mannoopj May 18, 2026
f8df406
Merge remote-tracking branch 'origin/kip-1170-format' into kip-1170-f…
mannoopj May 19, 2026
d1a8ea7
add disabledFeatures for intergration tests
mannoopj May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time, Utils}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
Expand Down Expand Up @@ -181,8 +181,7 @@ object KafkaRaftServer {
}

// Load the BootstrapMetadata.
val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir)
val bootstrapMetadata = bootstrapDirectory.read()
val bootstrapMetadata = BootstrapMetadata.fromDirectory(java.nio.file.Path.of(config.metadataLogDir))
Comment thread
jsancio marked this conversation as resolved.
Outdated
(metaPropsEnsemble, bootstrapMetadata)
}

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ object StorageTool extends Logging {
val formatter = new Formatter().
setPrintStream(printStream).
setNodeId(config.nodeId).
setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)).
Comment thread
jsancio marked this conversation as resolved.
setClusterId(namespace.getString("cluster_id")).
setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled).
setIgnoreFormatted(namespace.getBoolean("ignore_formatted")).
Expand Down
52 changes: 35 additions & 17 deletions core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ import java.nio.file.Files
import java.util.{Optional, Properties}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.bootstrap.{BootstrapMetadata, BootstrapTestUtils}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.ServerLogConfigs
import org.apache.kafka.storage.internals.log.UnifiedLog
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
Expand Down Expand Up @@ -109,8 +110,13 @@ class KafkaRaftServerTest {
}

private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = {
val bootstrapDirectory = new BootstrapDirectory(logDir.toString)
bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion, "test"))
Formatter.writeBoostrapSnapshot(
logDir.toString,
BootstrapMetadata.fromVersion(metadataVersion, "test"),
Optional.empty(),
0.toShort,
"CONTROLLER"
)
}

@Test
Expand Down Expand Up @@ -272,20 +278,32 @@ class KafkaRaftServerTest {
setDirectoryId(Uuid.fromString("4jm0e-YRYeB6CCKBvwoS8w")).
build()

val configProperties = new Properties
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
val logDir = TestUtils.tempDirectory()
try {
writeMetaProperties(logDir, metaProperties)
writeBootstrapMetadata(logDir, MetadataVersion.IBP_3_3_IV3)

val configProperties = new Properties
configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString)
configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093")
configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093")
configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, logDir.getAbsolutePath)
val config = KafkaConfig.fromProps(configProperties)

val (metaPropertiesEnsemble, bootstrapMetadata) =
invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV3))
val (metaPropertiesEnsemble, _) =
KafkaRaftServer.initializeLogDirs(config, MetaPropertiesEnsemble.LOG, "")
Comment thread
jsancio marked this conversation as resolved.

assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)
assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_3_IV3)
assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next())
assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty)
assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty)

val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(logDir.getAbsolutePath)
assertEquals(MetadataVersion.IBP_3_3_IV3, bootstrapMetadata.metadataVersion())
} finally {
Utils.delete(logDir)
}
}

@Test
Expand Down
26 changes: 13 additions & 13 deletions core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import kafka.utils.TestUtils
import net.sourceforge.argparse4j.inf.ArgumentParserException
import org.apache.kafka.common.metadata.UserScramCredentialRecord
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.metadata.bootstrap.BootstrapDirectory
import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils
import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils}
import org.apache.kafka.metadata.storage.FormatterException
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.common.{Feature, MetadataVersion}
import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
import org.junit.jupiter.api.{Test, Timeout}
Expand Down Expand Up @@ -389,15 +389,15 @@ Found problem:
def testFormatWithReleaseVersionAndFeatureOverride(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq(
"--release-version", "3.7-IV0",
"--release-version", "3.7-IV0", "--standalone",
"--feature", "share.version=1")))

// Verify that the feature override is applied by checking the bootstrap metadata
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)

// Verify that the share.version feature is set to 1 as specified
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
Expand All @@ -408,25 +408,25 @@ Found problem:
"Failed to find release version in output: " + stream.toString())

// Verify that the format command completed successfully with features
assertTrue(stream.toString().contains("Formatting metadata directory"),
assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"),
"Failed to find formatting message in output: " + stream.toString())
}

@Test
def testFormatWithMultipleFeatures(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
assertEquals(0, runFormatCommand(stream, properties, Seq(
"--release-version", "3.8-IV0",
"--release-version", "3.8-IV0", "--standalone",
"--feature", "share.version=1",
"--feature", "transaction.version=2",
"--feature", "group.version=1")))

// Verify that all features are properly bootstrapped by checking the bootstrap metadata
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)

// Verify that all specified features are set correctly
assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"),
Expand All @@ -441,7 +441,7 @@ Found problem:
"Failed to find release version in output: " + stream.toString())

// Verify that the format command completed successfully with multiple features
assertTrue(stream.toString().contains("Formatting metadata directory"),
assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"),
"Failed to find formatting message in output: " + stream.toString())
}

Expand Down Expand Up @@ -851,11 +851,11 @@ Found problem:
def testBootstrapScramRecords(): Unit = {
val availableDirs = Seq(TestUtils.tempDir())
val properties = new Properties()
properties.putAll(defaultStaticQuorumProperties)
properties.putAll(defaultDynamicQuorumProperties)
properties.setProperty("log.dirs", availableDirs.mkString(","))
val stream = new ByteArrayOutputStream()
val arguments = ListBuffer[String](
"--release-version", "3.9-IV0",
"--release-version", "3.9-IV0", "--standalone",
"--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]",
"--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]"
)
Expand All @@ -864,7 +864,7 @@ Found problem:

// Not doing full SCRAM record validation since that's covered elsewhere.
// Just checking that we generate the correct number of records
val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read
val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString)
val scramRecords = bootstrapMetadata.records().asScala
.filter(apiMessageAndVersion => apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord])
.map(apiMessageAndVersion => apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
Expand Down Expand Up @@ -1067,7 +1068,31 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {

@Override
public void handleLoadBootstrap(SnapshotReader<ApiMessageAndVersion> reader) {
reader.close();
appendRaftEvent(String.format("handleLoadBootstrap[snapshotId=%s]", reader.snapshotId()), () -> {
try {
String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
if (isActiveController()) {
throw fatalFaultHandler.handleFault("Asked to load bootstrap snapshot " + snapshotName +
", but we are the active controller at epoch " + curClaimEpoch);
}
List<ApiMessageAndVersion> records = new ArrayList<>();
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
records.addAll(batch.records());
}
if (!records.isEmpty()) {
log.debug("Loaded {} bootstrap records from {}", records.size(), snapshotName);
bootstrapMetadata = BootstrapMetadata.fromRecords(records, "bootstrap");
}
} catch (FaultHandlerException e) {
throw e;
} catch (Throwable e) {
throw fatalFaultHandler.handleFault("Error while loading bootstrap snapshot " +
reader.snapshotId(), e);
} finally {
reader.close();
}
});
}

@Override
Expand Down Expand Up @@ -1450,9 +1475,11 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
private volatile int curClaimEpoch;

/**
* The bootstrap metadata to use for initialization if needed.
* The bootstrap mtryinetadata to use for initialization if needed.
* Written by handleLoadBootstrap and read during CompleteActivationEvent,
* both on the controller event thread.
Comment thread
jsancio marked this conversation as resolved.
Outdated
*/
private final BootstrapMetadata bootstrapMetadata;
private BootstrapMetadata bootstrapMetadata;
Comment thread
jsancio marked this conversation as resolved.

/**
* The maximum number of records per batch to allow.
Expand Down

This file was deleted.

Loading