Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
102 commits
Select commit Hold shift + click to select a range
a9c79a3
redo implementation to use RecordsSnapshotWriter
mannoopj Oct 20, 2025
2a6f663
NIT fix spacing
mannoopj Oct 20, 2025
752a79e
NIT remove unused import
mannoopj Oct 20, 2025
026da2b
NIT remove extra line
mannoopj Oct 20, 2025
7e303b9
add bootstrap records in formatter
mannoopj Oct 20, 2025
584fe3d
revert RaftClientTestContext
mannoopj Oct 20, 2025
9852262
wip changes
mannoopj Nov 3, 2025
9cb5318
wip remove system.out
mannoopj Nov 3, 2025
ff3c050
wip remove more system.out
mannoopj Nov 3, 2025
f5763d0
remove system out in quorum controller
mannoopj Nov 3, 2025
f3a3e41
change check for emptyLog
mannoopj Nov 4, 2025
0ab8e02
remove prints
mannoopj Nov 4, 2025
1f76fa9
remove default
mannoopj Nov 4, 2025
4de55ce
remove bootstrapRecordsAppended
mannoopj Nov 5, 2025
56435b3
dont format checkpoint for broker only, split bootstrapDirectory read
mannoopj Nov 5, 2025
29affeb
WIP, stop MetadataLoader reading 000.checkpoint
mannoopj Nov 7, 2025
533033b
remove null check for bootstrapMetadata
mannoopj Nov 7, 2025
f35e992
stop replaying metadata records
mannoopj Nov 7, 2025
1c4528e
address comments
mannoopj Nov 18, 2025
625acfb
handleLoadSnapshot changes
mannoopj Nov 18, 2025
cb2ad18
import fixes
mannoopj Nov 18, 2025
3c809c1
temp build changes
mannoopj Nov 19, 2025
1908b2c
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
7226af0
more tests fixes
mannoopj Nov 19, 2025
029aa6b
Revert "BootstrapDirectoryTest fix"
mannoopj Nov 19, 2025
6dbef44
BootstrapDirectoryTest fix
mannoopj Nov 19, 2025
cc3f0b4
nit: fix indentation
mannoopj Nov 19, 2025
7ad1342
ReconfigurableQuorumIntegrationTest fix
mannoopj Nov 20, 2025
135c126
StorageToolTest fix
mannoopj Nov 20, 2025
383045a
fix KafkaClusterTestKit
mannoopj Nov 24, 2025
b474770
remove writeBootstrapSnapshot var in StorageTool
mannoopj Nov 25, 2025
721fb5c
remove test specific fixes
mannoopj Nov 25, 2025
cb4803d
removed unused import
mannoopj Nov 25, 2025
6a46cf7
testStartupWithNonDefaultKControllerDynamicConfiguration fix
mannoopj Nov 25, 2025
adb1548
add WARN message
mannoopj Dec 1, 2025
a839631
add FileNotFoundException
mannoopj Dec 1, 2025
96bc7f3
add checkpoint file exist checks
mannoopj Dec 1, 2025
4823d30
remove assertCheckpointExists
mannoopj Dec 1, 2025
082d469
move up copier.setWriteErrorHandler
mannoopj Dec 1, 2025
dae3bd9
move testNonDefaultKControllerDynamicConfiguration to DynamicBrokerRe…
mannoopj Dec 1, 2025
8ebfdaf
remove additionalBootstrapRecords
mannoopj Dec 1, 2025
5ba0c9b
convert BootstrapDirectory to interface
mannoopj Dec 1, 2025
a7450e0
fix testFormatterFailsOnUnwritableDirectory
mannoopj Dec 1, 2025
dcd1b0b
move up readFromBinaryFile
mannoopj Dec 2, 2025
67ad6c4
remove writeBinaryFile
mannoopj Dec 2, 2025
0a78d0d
revert gradle changes
mannoopj Dec 2, 2025
de4da73
revert build.gradle
mannoopj Dec 2, 2025
eb4ba90
more build.gradle reverts
mannoopj Dec 2, 2025
1d1e7ba
more build.gradle fixes
mannoopj Dec 2, 2025
dc27d7c
import-control.xml reverts
mannoopj Dec 2, 2025
8469809
KafkaStreamsTelemetryIntegrationTest reverts
mannoopj Dec 2, 2025
2e0c8f0
KafkaStreamsTelemetryIntegrationTest more reverts
mannoopj Dec 2, 2025
a182b12
fix spacing
mannoopj Dec 2, 2025
0da0919
FormatterTest fixes
mannoopj Dec 2, 2025
fd3adf6
move TestBootstrapDirectory
mannoopj Dec 2, 2025
3d589e6
remove extra line
mannoopj Dec 2, 2025
56dae1b
remove SuppressWarnings for testReadFromEmptyConfiguration
mannoopj Dec 2, 2025
b3feaa6
move BootstrapDirectory filenames
mannoopj Dec 4, 2025
03eafae
nit BootstrapDirectory fixes
mannoopj Dec 4, 2025
7e8d2e6
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Dec 4, 2025
7a9404f
fix incorrect merge
mannoopj Dec 4, 2025
9f82078
remove generic exceptions
mannoopj Jan 22, 2026
ff60272
change BootstrapMetadata readFromBinaryFile to static
mannoopj Jan 22, 2026
e05dcf2
make BootstrapMetadata readFromConfiguration() private
mannoopj Jan 22, 2026
f0cbc8b
add documentation for BootstrapDirectory constructors
mannoopj Jan 22, 2026
031c0fc
add IllegalStateException
mannoopj Jan 22, 2026
f0b35af
print the entire path for FormatterException
mannoopj Jan 22, 2026
56422a5
add isCommittedSnapshot property to snapshot reader
mannoopj Jan 26, 2026
3614162
stop leaking implementation details to controller
mannoopj Jan 26, 2026
5698e93
volatile BootstrapMetadata
mannoopj Jan 26, 2026
d4e3b84
more exception handling
mannoopj Jan 26, 2026
9380605
move readFromBinaryFile to BootstrapFileUtils
mannoopj Feb 2, 2026
0e3c626
move everything back into BootstrapDirectory
mannoopj Feb 2, 2026
df3108b
add handleLoadBootstrap
mannoopj Feb 2, 2026
f9ad128
add documentation QuorumController:bootstrapMetadata being marked as …
mannoopj Feb 2, 2026
76146ad
parametrized file name
mannoopj Feb 2, 2026
2ad3c53
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Feb 2, 2026
473ac38
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Mar 19, 2026
da05d3f
remove NonDefaultKControllerDynamicConfiguration tests
mannoopj Mar 19, 2026
7d078b8
fix imports
mannoopj Mar 19, 2026
b27e7fc
remove isCommittedSnapshot
mannoopj Mar 19, 2026
b82a873
git add BootstrapTestUtils.java
mannoopj Mar 19, 2026
711a347
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj Mar 19, 2026
02c9b79
delete core/src/test/scala/integration/kafka/server/KRaftClusterTest.…
mannoopj Mar 19, 2026
de8bb36
remove redudnant code
mannoopj Mar 19, 2026
da46fb7
FormatterTest fixes
mannoopj Mar 19, 2026
4993aaa
handleLoadBootstrap implementation for QuorumController
mannoopj Mar 19, 2026
a7edcd0
revert empty line deletion
mannoopj Mar 19, 2026
6601e69
cleanup fixes
mannoopj Mar 25, 2026
3b4aeb6
fix testStartupWithNonDefaultKControllerDynamicConfiguration
mannoopj Mar 25, 2026
7896f12
more clean up fixes
mannoopj Mar 27, 2026
207ca7c
add reads from zero checkpoint in fromDirectory
mannoopj Mar 27, 2026
9ee9f3d
bootstrapMetadata and bootstrapMetadataTest fixes
mannoopj Mar 27, 2026
9b668cb
BootstrapMetadata fixes
mannoopj Apr 27, 2026
645a08a
synchronized, IllegalStateException, and nit checkstyle fixes
mannoopj May 14, 2026
8d44f43
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
mannoopj May 14, 2026
3a57241
Move BootstrapTestUtils to testFixtures for cross-module visibility
mannoopj May 14, 2026
4c7d5f6
add back BootstrapTestUtils.java
mannoopj May 14, 2026
ce5faf6
Merge remote-tracking branch 'upstream/trunk' into kip-1170-format
jsancio May 18, 2026
b83d676
rework testKRaftUpdateAt3_3_IV3
mannoopj May 18, 2026
f8df406
Merge remote-tracking branch 'origin/kip-1170-format' into kip-1170-f…
mannoopj May 19, 2026
d1a8ea7
add disabledFeatures for intergration tests
mannoopj May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 37 additions & 57 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,22 @@ buildscript {
}

plugins {
id 'com.github.ben-manes.versions' version '0.53.0'
id 'com.github.ben-manes.versions' version '0.48.0'
id 'idea'
id 'jacoco'
id 'java-library'
id 'org.owasp.dependencycheck' version '12.1.3'
id 'org.owasp.dependencycheck' version '8.2.1'
id 'org.nosphere.apache.rat' version "0.8.1"
id "io.swagger.core.v3.swagger-gradle-plugin" version "${swaggerVersion}"

id "com.github.spotbugs" version '6.2.5' apply false
id "com.github.spotbugs" version '6.2.3' apply false
id 'org.scoverage' version '8.0.3' apply false
id 'com.gradleup.shadow' version '8.3.9' apply false
id 'com.diffplug.spotless' version "7.2.1"
id 'com.github.johnrengelman.shadow' version '8.0.0' apply false
id 'com.diffplug.spotless' version "6.25.0"
}

ext {
gradleVersion = versions.gradle
minClientJavaVersion = 11
minNonClientJavaVersion = 17
modulesNeedingJava11 = [":clients", ":generator", ":streams", ":streams:test-utils", ":streams:examples", ":streams-scala", ":test-common:test-common-util"]
Expand Down Expand Up @@ -71,13 +72,6 @@ ext {
"--add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED"
)

if (JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_25)) {
// Spotbugs is not compatible with Java 25+ so Gradle related tasks are disabled
// until version can be upgraded: https://github.com/spotbugs/spotbugs/issues/3564
project.gradle.startParameter.excludedTaskNames.add("spotbugsMain")
project.gradle.startParameter.excludedTaskNames.add("spotbugsTest")
}

maxTestForks = project.hasProperty('maxParallelForks') ? maxParallelForks.toInteger() : Runtime.runtime.availableProcessors()
maxScalacThreads = project.hasProperty('maxScalacThreads') ? maxScalacThreads.toInteger() :
Math.min(Runtime.runtime.availableProcessors(), 8)
Expand Down Expand Up @@ -303,7 +297,7 @@ if (repo != null) {
} else {
rat.enabled = false
}
println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $versions.gradle, Java ${JavaVersion.current()} and Scala ${versions.scala}")
println("Starting build with version $version (commit id ${commitId == null ? "null" : commitId.take(8)}) using Gradle $gradleVersion, Java ${JavaVersion.current()} and Scala ${versions.scala}")
println("Build properties: ignoreFailures=$userIgnoreFailures, maxParallelForks=$maxTestForks, maxScalacThreads=$maxScalacThreads, maxTestRetries=$userMaxTestRetries")

subprojects {
Expand All @@ -322,7 +316,19 @@ subprojects {
// We use the shadow plugin for the jmh-benchmarks module and the `-all` jar can get pretty large, so
// don't publish it
def shouldPublish = !project.name.equals('jmh-benchmarks')
def shouldPublishWithShadow = (['clients'].contains(project.name))
def shouldPublishWithShadow = false // Temporarily disabled due to ASM compatibility issue
// def shouldPublishWithShadow = (['clients'].contains(project.name))

if (shouldPublishWithShadow) {
apply plugin: 'com.github.johnrengelman.shadow'
}

// Fix for upgrade system tests that may not have actual test files
if (project.name.startsWith('upgrade-system-tests')) {
tasks.withType(Test) {
failOnNoDiscoveredTests = false
}
}

if (shouldPublish) {
apply plugin: 'maven-publish'
Expand All @@ -334,16 +340,6 @@ subprojects {
tasks.register('uploadArchives').configure { dependsOn(publish) }
}

tasks.withType(AbstractArchiveTask).configureEach {
reproducibleFileOrder = false
preserveFileTimestamps = true
useFileSystemPermissions()
}

tasks.withType(AbstractTestTask).configureEach {
failOnNoDiscoveredTests = false
}

// apply the eclipse plugin only to subprojects that hold code. 'connect' is just a folder.
if (!project.name.equals('connect')) {
apply plugin: 'eclipse'
Expand Down Expand Up @@ -379,8 +375,8 @@ subprojects {
if (!shouldPublishWithShadow) {
from components.java
} else {
apply plugin: 'com.gradleup.shadow'
from components.shadow
// Temporarily use java components instead of shadow due to ASM compatibility issue
from components.java

// Fix for avoiding inclusion of runtime dependencies marked as 'shadow' in MANIFEST Class-Path.
// https://github.com/GradleUp/shadow/issues/324
Expand Down Expand Up @@ -1055,8 +1051,6 @@ project(':core') {
implementation project(':transaction-coordinator')
implementation project(':metadata')
implementation project(':storage:storage-api')
// tools-api is automatically included in releaseTarGz via core's runtimeClasspath.
// If removed from here, remember to explicitly add it back in the releaseTarGz task.
implementation project(':tools:tools-api')
implementation project(':raft')
implementation project(':storage')
Expand Down Expand Up @@ -1279,6 +1273,8 @@ project(':core') {
from(project(':streams:test-utils').configurations.runtimeClasspath) { into("libs/") }
from(project(':streams:examples').jar) { into("libs/") }
from(project(':streams:examples').configurations.runtimeClasspath) { into("libs/") }
from(project(':tools:tools-api').jar) { into("libs/") }
from(project(':tools:tools-api').configurations.runtimeClasspath) { into("libs/") }
duplicatesStrategy 'exclude'
}

Expand Down Expand Up @@ -1911,41 +1907,22 @@ project(':clients') {
}
}

shadowJar {
dependsOn createVersionFile
// archiveClassifier defines the classifier for the shadow jar, the default is 'all'.
// We don't want to use the default classifier because it will cause the shadow jar to
// overwrite the original jar. We also don't want to use the 'shadow' classifier because
// it will cause the shadow jar to be named kafka-clients-shadow.jar. We want to use the
// same name as the original jar, kafka-clients.jar.
archiveClassifier = null
// KIP-714: move shaded dependencies to a shaded location
relocate('io.opentelemetry.proto', 'org.apache.kafka.shaded.io.opentelemetry.proto')
relocate('com.google.protobuf', 'org.apache.kafka.shaded.com.google.protobuf')

// dependencies excluded from the final jar, since they are declared as runtime dependencies
dependencies {
project.configurations.shadowed.allDependencies.each {
exclude(dependency(it))
}
// exclude proto files from the jar
exclude "**/opentelemetry/proto/**/*.proto"
exclude "**/google/protobuf/*.proto"
}
// Temporarily disabled shadowJar configuration due to ASM compatibility issue
// Use regular jar instead

jar {
enabled true
dependsOn createVersionFile

// Add the version file and licenses to regular jar
from("${layout.buildDirectory.get().asFile.path}") {
include "kafka/$buildVersionFileName"
}

from "$rootDir/LICENSE"
from "$rootDir/NOTICE"
}

jar {
enabled false
dependsOn 'shadowJar'
}

clean.doFirst {
delete "${layout.buildDirectory.get().asFile.path}/kafka/"
}
Expand Down Expand Up @@ -2673,7 +2650,7 @@ project(':streams') {
}

dependencies {
api project(path: ':clients', configuration: 'shadow')
api project(path: ':clients') // Changed from 'shadow' configuration due to ASM compatibility issue
// `org.rocksdb.Options` is part of Kafka Streams public api via `RocksDBConfigSetter`
api libs.rocksDBJni

Expand Down Expand Up @@ -3381,11 +3358,14 @@ project(':streams:upgrade-system-tests-41') {

project(':jmh-benchmarks') {

apply plugin: 'com.gradleup.shadow'
// Temporarily disabled due to ASM compatibility issue
// apply plugin: 'com.github.johnrengelman.shadow'

/*
shadowJar {
archiveBaseName = 'kafka-jmh-benchmarks'
}
*/

dependencies {
implementation(project(':core')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ private void testTransactionAfterProducerIdExpires(ClusterInstance clusterInstan
// The epoch should be at least oldProducerEpoch + 2 for the first commit and the restarted producer.
assertTrue(oldProducerEpoch + 2 <= newProducerEpoch);
} else {
assertEquals(oldProducerEpoch + 1, newProducerEpoch);
assertEquals(oldProducerEpoch + 3, newProducerEpoch);
}

assertConsumeRecords(clusterInstance, List.of(TOPIC1), 2);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaRaftServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.utils.{AppInfoParser, Time}
import org.apache.kafka.common.{KafkaException, Uuid}
import org.apache.kafka.metadata.KafkaConfigSchema
import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata}
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble}
import org.apache.kafka.raft.QuorumConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ static ControllerResult<Void> recordsForEmptyLog(
static ControllerResult<Void> recordsForNonEmptyLog(
Consumer<String> activationMessageConsumer,
long transactionStartOffset,
MetadataVersion curMetadataVersion
BootstrapMetadata bootstrapMetadata,
MetadataVersion curMetadataVersion,
int defaultMinInSyncReplicas
) {
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");

Expand All @@ -138,8 +140,54 @@ static ControllerResult<Void> recordsForNonEmptyLog(
}
}

// Write bootstrap records to the log so brokers can read them, but only if not handling a partial transaction
// Brokers can't read snapshots, only log entries
boolean shouldWriteBootstrapRecords = (transactionStartOffset == -1L);
if (shouldWriteBootstrapRecords) {
logMessageBuilder
.append("Writing bootstrap records to log for broker consumption. ")
.append("Appending ")
.append(bootstrapMetadata.records().size())
.append(" bootstrap record(s) ");

if (curMetadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(
new BeginTransactionRecord().setName("Bootstrap records"), (short) 0));
logMessageBuilder.append("in metadata transaction ");
}
logMessageBuilder
.append("at metadata.version ")
.append(curMetadataVersion)
.append(" from bootstrap source '")
.append(bootstrapMetadata.source())
.append("'. ");

// Add bootstrap records
records.addAll(bootstrapMetadata.records());

// If ELR is enabled, we need to set a cluster-level min.insync.replicas.
if (bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) > 0) {
records.add(new ApiMessageAndVersion(new ConfigRecord().
setResourceType(BROKER.id()).
setResourceName("").
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
setValue(Integer.toString(defaultMinInSyncReplicas)), (short) 0));
}

if (curMetadataVersion.isMetadataTransactionSupported()) {
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we changing this code?

We shouldn't change this code though, because when the log is non-empty, it means the bootstrap metadata records have already been written in the log before.

activationMessageConsumer.accept(logMessageBuilder.toString().trim());
return ControllerResult.atomicOf(records, null);

// If we wrote bootstrap records and transactions are supported, use non-atomic result
// If we only aborted a transaction or don't support transactions, use atomic result
if (shouldWriteBootstrapRecords && curMetadataVersion.isMetadataTransactionSupported()) {
return ControllerResult.of(records, null);
} else {
return ControllerResult.atomicOf(records, null);
}
}

/**
Expand Down Expand Up @@ -167,7 +215,9 @@ static ControllerResult<Void> generate(
} else {
return recordsForNonEmptyLog(activationMessageConsumer,
transactionStartOffset,
curMetadataVersion.get());
bootstrapMetadata,
curMetadataVersion.get(),
defaultMinInSyncReplicas);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.fault.FaultHandler;
import org.apache.kafka.server.fault.FaultHandlerException;
Expand Down Expand Up @@ -384,8 +385,6 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long uncleanLeaderElectio
public QuorumController build() throws Exception {
if (raftClient == null) {
throw new IllegalStateException("You must set a raft client.");
} else if (bootstrapMetadata == null) {
throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
Comment on lines -394 to -395
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can keep this assertion because bootstrapMetadata is never null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea here is that for when we try to read bootstrap.checkpoint from KafkaRaftServer, in the scenarios where this file doesn't exist we would send a null value. Currently BootstrapDirectory.read() defaults to a non null value. I was thinking we change this to null as a way of determining if the bootstrap.checkpoint exists for WARN messages. Unless we do want to keep the behavior of defaulting to non null records?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but also we could just check if both files exist. actually yeah I'll just revert this

Copy link
Copy Markdown
Contributor

@kevin-wu24 kevin-wu24 Nov 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless we do want to keep the behavior of defaulting to non null records?

I think we should keep this behavior unless there is a good reason to change it. This behavior address the case where the bootstrap.checkpoint is accidentally deleted before bringing up a cluster.

I was thinking we change this to null as a way of determining if the bootstrap.checkpoint exists for WARN messages

If we preserve the existing behavior, there's no way to print the WARN message at the QuorumController level, but that case should be impossible from a formatting perspective. For now, let's keep the existing behavior around the default value of bootstrapMetadata, and we can revisit this discussion later.

} else if (quorumFeatures == null) {
throw new IllegalStateException("You must specify the quorum features");
} else if (nonFatalFaultHandler == null) {
Expand Down Expand Up @@ -1022,7 +1021,20 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
Batch<ApiMessageAndVersion> batch = reader.next();
long offset = batch.lastOffset();
List<ApiMessageAndVersion> messages = batch.records();

if (bootstrapMetadata == null) {
if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) {
Copy link
Copy Markdown
Contributor

@kevin-wu24 kevin-wu24 Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we are reading in the 0-0.checkpoint, the ONLY thing we should be doing if !messages.isEmpty() in this method is using messages to construct a bootstrapMetadata object. It should not append an event even I think...

0-0.checkpoint is special because its records are uncommitted, unlike all other checkpoints this method handles, and need to be written to the log when a leader is determined.

This changed because previously, 0-0.checkpoint did not contain any metadata records, just KRaft control records potentially.

// For bootstrap snapshots, extract feature levels from all data records
if (batch.controlRecords().isEmpty()) {
bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap");
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The correct logic here is:
If the batch has records, read them into bootstrapMetadata (this means 0-0.checkpoint has bootstrap metadata records).
If the batch doesn't have records, try to read the bootstrapMetadata from bootstrap.checkpoint.

} else {
Map<String, Short> featureVersions = new HashMap<>();
MetadataVersion metadataVersion = MetadataVersion.latestProduction();
featureVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel());
featureVersions.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel());
bootstrapMetadata = BootstrapMetadata.fromVersions(metadataVersion, featureVersions, "generated default");
Copy link
Copy Markdown
Contributor

@kevin-wu24 kevin-wu24 Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're not reading the 0-0.checkpoint, bootstrapMetadata is either:

  1. read from bootstrap.checkpoint and passed down here, so it is non-null.
  2. null, because it should have already been written to the log as part of the 0-0.checkpoint.

}
}
log.debug("Replaying snapshot {} batch with last offset of {}",
snapshotName, offset);

Expand Down Expand Up @@ -1139,6 +1151,10 @@ class CompleteActivationEvent implements ControllerWriteOperation<Void> {
@Override
public ControllerResult<Void> generateRecordsAndResult() {
try {
if (bootstrapMetadata == null) {
throw new IllegalStateException("Bootstrap metadata not available during activation. " +
"This should not happen if a bootstrap snapshot was processed.");
}
Copy link
Copy Markdown
Contributor

@kevin-wu24 kevin-wu24 Nov 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should allow bootstrapMetadata to be null here because we can be in the case where we bootstrapped using 0-0.checkpoint, but that file no longer exists because it was cleaned up by KRaft. However, bootstrapMetadata cannot be null when we call recordsForEmptyLog. It can be null when we call recordsForNonEmptyLog

return ActivationRecordsGenerator.generate(
log::warn,
offsetControl.transactionStartOffset(),
Expand Down Expand Up @@ -1436,7 +1452,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon
/**
* The bootstrap metadata to use for initialization if needed.
*/
private final BootstrapMetadata bootstrapMetadata;
private BootstrapMetadata bootstrapMetadata;
Comment thread
jsancio marked this conversation as resolved.

/**
* The maximum number of records per batch to allow.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;

/**
* A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the
Expand All @@ -42,6 +43,8 @@
public class BootstrapDirectory {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not need to materially change this file at all (at least for now), since this file only deals with the old bootstrap.checkpoint file that is deprecated going forward.

It looks like it handles the bootstrap.checkpoint file not existing by defaulting to a bootstrap record set of just the latest MV. This means bootstrapMetadata is never null.

Maybe we want to rename the file to LegacyBootstrapDirectory, mark this as deprecated, and log a message when we successfully read bootstrap.checkpoint.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes were mainly made to address failing tests. (Only used in two test files). I agree that we can probably deprecate this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, looked at StorageToolTest and FormatterTest again, and I am okay with your solution of having two separate methods. In that case, we should deprecate maybeReadLegacyBootstrapCheckpoint() and BINARY_BOOTSTRAP_FILENAME.

public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";

public static final String BINARY_BOOTSTRAP_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint";

private final String directoryPath;

/**
Expand All @@ -65,9 +68,17 @@ public BootstrapMetadata read() throws Exception {
throw new RuntimeException("No such directory as " + directoryPath);
}
}
Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
Path binaryBootstrapPath = Paths.get(directoryPath, String.format("%s-%d",
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
CLUSTER_METADATA_TOPIC_PARTITION.partition()),
BINARY_BOOTSTRAP_CHECKPOINT_FILENAME);
if (!Files.exists(binaryBootstrapPath)) {
return readFromConfiguration();
Path oldBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
if (!Files.exists(oldBootstrapPath)) {
return readFromConfiguration();
} else {
return readFromBinaryFile(oldBootstrapPath.toString());
}
} else {
return readFromBinaryFile(binaryBootstrapPath.toString());
}
Expand Down
Loading