KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint#20707
Conversation
kevin-wu24
left a comment
There was a problem hiding this comment.
Thanks for the changes @mannoopj. Some high level comments:
There was a problem hiding this comment.
There is a clearer way to make these changes. This method is what writes the 0-0.checkpoint currently. We should pass the bootstrap metadata object here and append the metadata records using writer.append.append(bootstrapMetadata.records()) before calling writer.freeze().
| if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) { | ||
| writeDynamicQuorumSnapshot(writeLogDir, | ||
| writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(), |
There was a problem hiding this comment.
We should rename writeDynamicQuorumSnapshot to writeZeroSnapshot, and write it when formatting metadata directories. The semantics that change here are that we should not write the KRaft control records (KRaft version and voter set) when !isDynamicMetadataDirectory().
| File createdBoostrapCheckpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME); | ||
| File created000Checkpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_CHECKPOINT_FILENAME); | ||
| Files.write( | ||
| createdBoostrapCheckpoint.toPath(), | ||
| Files.readAllBytes(created000Checkpoint.toPath()), | ||
| StandardOpenOption.APPEND); | ||
| try { | ||
| created000Checkpoint.delete(); | ||
| createdBoostrapCheckpoint.renameTo(created000Checkpoint); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException("Failed operation to combine metadata and kraft records: ", ex); | ||
| } | ||
| } else { | ||
| File createdBoostrapCheckpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME); | ||
| File created000Checkpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_CHECKPOINT_FILENAME); | ||
| try { | ||
| createdBoostrapCheckpoint.renameTo(created000Checkpoint); | ||
| } catch (Exception ex) { | ||
| throw new RuntimeException("Failed to rename file: ", ex); | ||
| } |
There was a problem hiding this comment.
This code is confusing. Instead of doing this renaming and deleting. We should instead remove the call to write the bootstrap metadata to disk on Line 447, since we're no longer writing bootstrap.checkpoint anymore, and follow the other comments for writing metadata records to 0-0.checkpoint.
We can check if an old bootstrap.checkpoint exists and delete it, since IIRC that was part of the KIP.
42e105a to
a9c79a3
Compare
kevin-wu24
left a comment
There was a problem hiding this comment.
Thanks for the changes @mannoopj. Left some more comments
| try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) { | ||
| writer.freeze(); |
There was a problem hiding this comment.
| try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) { | |
| writer.freeze(); | |
| try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde()))) { | |
| writer.append(bootstrapMetadata.records()); | |
| writer.freeze(); |
There was a problem hiding this comment.
Wouldn't this write the bootstrap records after the control records, since in RecordsSnapshotWriter.build() is where we append the kraft records and we would be calling that ahead of writer.append in this scenario? we want the bootstrap records ahead correct?
There was a problem hiding this comment.
It doesn't matter what order we write them in because KRaft only reads the control records, and the metadata module will only read the "data" records. When we read the 0-0.checkpoint back into memory, we only deal with either its control records or its data records, not both in the same code.
| .setKraftVersion(KRaftVersion.KRAFT_VERSION_1) | ||
| .setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true)))) | ||
| .build(MetadataRecordSerde.INSTANCE) | ||
| .build(MetadataRecordSerde.INSTANCE, emptyOptional) |
There was a problem hiding this comment.
We should be able to remove this. This applies to other instances where we build the RecordsSnapshotWriter.
| } | ||
| writeBoostrapSnapshot(writeLogDir, | ||
| bootstrapMetadata, | ||
| initialControllers.get(), |
There was a problem hiding this comment.
We need to pass the optional for initialControllers here. We can only do a .get() if initialControllers.isPresent().
There was a problem hiding this comment.
We should only construct the VoterSet object if initialControllers.isPresent().
| // For bootstrap snapshots, extract feature levels from all data records | ||
| if (batch.controlRecords().isEmpty()) { | ||
| bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap"); | ||
| } |
There was a problem hiding this comment.
The correct logic here is:
If the batch has records, read them into bootstrapMetadata (this means 0-0.checkpoint has bootstrap metadata records).
If the batch doesn't have records, try to read the bootstrapMetadata from bootstrap.checkpoint.
kevin-wu24
left a comment
There was a problem hiding this comment.
Thanks for the changes @mannoopj. Review of the metadata layer implementation:
| if (level > 0) { | ||
| records.add(new ApiMessageAndVersion(new FeatureLevelRecord(). | ||
| setName(featureName). | ||
| setFeatureLevel(level), (short) 0)); | ||
| } | ||
| // Include all feature levels, including level 0 which may disable features | ||
| records.add(new ApiMessageAndVersion(new FeatureLevelRecord(). | ||
| setName(featureName). | ||
| setFeatureLevel(level), (short) 0)); |
There was a problem hiding this comment.
This should not change. The default level of features is 0, and that is why we don't add a record for them when the level is 0.
| CLUSTER_METADATA_TOPIC_PARTITION.partition()), | ||
| BINARY_BOOTSTRAP_CHECKPOINT_FILENAME); | ||
| if (!Files.exists(binaryBootstrapPath)) { | ||
| return readFromConfiguration(); |
There was a problem hiding this comment.
If we are at L76, that means we do 0-0.checkpoint doesn't exist. This is where we should read from bootstrap.checkpoint. If that doesn't exist too, then we call readFromConfiguration().
There was a problem hiding this comment.
Actually, if you look at how we're reading stuff in, we probably don't even need to change this file. We don't need to call BootstrapDirectory#read for the 0-0.checkpoint because we are using handleLoadSnapshot, which already puts the checkpoint in memory for us.
| (metaPropsEnsemble, bootstrapMetadata) | ||
| // val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir) | ||
| // val bootstrapMetadata = bootstrapDirectory.read() | ||
| (metaPropsEnsemble, null) |
There was a problem hiding this comment.
We should remove bootstrapMetadata here and in ControllerServer since it is just being passed down to QuorumController eventually. We initialize it in QuorumController.
|
|
||
| // Copy feature levels from TestKitNodes bootstrap metadata to ensure test annotations are respected | ||
| for (var record : nodes.bootstrapMetadata().records()) { | ||
| if (record.message() instanceof FeatureLevelRecord featureLevelRecord) { | ||
| String featureName = featureLevelRecord.name(); | ||
| short level = featureLevelRecord.featureLevel(); | ||
| // Don't override MetadataVersion as it's handled by setReleaseVersion() | ||
| if (!featureName.equals("metadata.version")) { | ||
| formatter.setFeatureLevel(featureName, level); | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Some background on how the KafkaClusterTestKit works:
Basically, tests that use this class are "integration tests" in the sense that we're trying to replicate a real cluster, just all within the same JVM. That means multiple brokerServers and controllerServers. This file shouldn't change outside of removing nodes.bootstrapMetadata() from the ControllerServer constructor.
462e072 to
9852262
Compare
kevin-wu24
left a comment
There was a problem hiding this comment.
Left a high level comment @mannoopj
| // val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir) | ||
| // val bootstrapMetadata = bootstrapDirectory.read() | ||
| (metaPropsEnsemble, null) |
There was a problem hiding this comment.
To make this change more compatible for the existing test framework, we should instead pass down a BootstrapCheckpointFactory/Builder or something like that. Then have two separate implementations:
One for tests that specifies a BootstrapMetadata object all in-memory based on the factory.
In the actual implementation, we can point that factory to the actual files on disk we would be reading.
Either way, in QuorumController#handleLoadSnapshot, that is when we actually "resolve" this bootstrap metadata stuff by calling a method on the factory/builder object.\
EDIT: after looking at QuorumTestHarness and KafkaClusterTestKit, we shouldn't need to do this.
kevin-wu24
left a comment
There was a problem hiding this comment.
Left some more comments regarding the metadata layer implementation @mannoopj
| // Write bootstrap records to the log so brokers can read them, but only if not handling a partial transaction | ||
| // Brokers can't read snapshots, only log entries | ||
| boolean shouldWriteBootstrapRecords = (transactionStartOffset == -1L); | ||
| if (shouldWriteBootstrapRecords) { | ||
| logMessageBuilder | ||
| .append("Writing bootstrap records to log for broker consumption. ") | ||
| .append("Appending ") | ||
| .append(bootstrapMetadata.records().size()) | ||
| .append(" bootstrap record(s) "); | ||
|
|
||
| if (curMetadataVersion.isMetadataTransactionSupported()) { | ||
| records.add(new ApiMessageAndVersion( | ||
| new BeginTransactionRecord().setName("Bootstrap records"), (short) 0)); | ||
| logMessageBuilder.append("in metadata transaction "); | ||
| } | ||
| logMessageBuilder | ||
| .append("at metadata.version ") | ||
| .append(curMetadataVersion) | ||
| .append(" from bootstrap source '") | ||
| .append(bootstrapMetadata.source()) | ||
| .append("'. "); | ||
|
|
||
| // Add bootstrap records | ||
| records.addAll(bootstrapMetadata.records()); | ||
|
|
||
| // If ELR is enabled, we need to set a cluster-level min.insync.replicas. | ||
| if (bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) > 0) { | ||
| records.add(new ApiMessageAndVersion(new ConfigRecord(). | ||
| setResourceType(BROKER.id()). | ||
| setResourceName(""). | ||
| setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG). | ||
| setValue(Integer.toString(defaultMinInSyncReplicas)), (short) 0)); | ||
| } | ||
|
|
||
| if (curMetadataVersion.isMetadataTransactionSupported()) { | ||
| records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0)); | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Why are we changing this code?
We shouldn't change this code though, because when the log is non-empty, it means the bootstrap metadata records have already been written in the log before.
| if (batch.controlRecords().isEmpty()) { | ||
| System.out.println("DEBUG: Extracting bootstrap metadata from " + messages.size() + " records"); | ||
| bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap"); | ||
| System.out.println("DEBUG: Bootstrap metadata extracted: " + bootstrapMetadata); | ||
| } |
There was a problem hiding this comment.
This is the wrong if condition. We should check !batch.records.isEmpty.
If the 0-0.checkpoint does not have metadata records, AND bootstrapMetadata == null at this point, we should throw an IllegalStateException, because we cannot construct bootstrapMetadata.
| } else { | ||
| Map<String, Short> featureVersions = new HashMap<>(); | ||
| MetadataVersion metadataVersion = MetadataVersion.latestProduction(); | ||
| featureVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel()); | ||
| featureVersions.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel()); | ||
| bootstrapMetadata = BootstrapMetadata.fromVersions(metadataVersion, featureVersions, "generated default"); |
There was a problem hiding this comment.
If we're not reading the 0-0.checkpoint, bootstrapMetadata is either:
- read from
bootstrap.checkpointand passed down here, so it is non-null. - null, because it should have already been written to the log as part of the 0-0.checkpoint.
| if (bootstrapMetadata == null) { | ||
| throw new IllegalStateException("Bootstrap metadata not available during activation. " + | ||
| "This should not happen if a bootstrap snapshot was processed."); | ||
| } |
There was a problem hiding this comment.
We should allow bootstrapMetadata to be null here because we can be in the case where we bootstrapped using 0-0.checkpoint, but that file no longer exists because it was cleaned up by KRaft. However, bootstrapMetadata cannot be null when we call recordsForEmptyLog. It can be null when we call recordsForNonEmptyLog
| List<ApiMessageAndVersion> messages = batch.records(); | ||
|
|
||
| if (bootstrapMetadata == null) { | ||
| if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) { |
There was a problem hiding this comment.
When we are reading in the 0-0.checkpoint, the ONLY thing we should be doing if !messages.isEmpty() in this method is using messages to construct a bootstrapMetadata object. It should not append an event even I think...
0-0.checkpoint is special because its records are uncommitted, unlike all other checkpoints this method handles, and need to be written to the log when a leader is determined.
This changed because previously, 0-0.checkpoint did not contain any metadata records, just KRaft control records potentially.
Was working under the assumptions that the raft io thread was calling |
| case KRAFT_VERSION: { | ||
| KRaftVersionRecord message = new KRaftVersionRecord(); | ||
| message.read(new ByteBufferAccessor(record.value()), (short) 0); | ||
| messages.add(new ApiMessageAndVersion(message, (short) 0)); | ||
| break; | ||
| } | ||
| case KRAFT_VOTERS: | ||
| VotersRecord message = new VotersRecord(); | ||
| message.read(new ByteBufferAccessor(record.value()), (short) 0); | ||
| messages.add(new ApiMessageAndVersion(message, (short) 0)); | ||
| break; |
There was a problem hiding this comment.
Did you file an issue for this? If so can you link it here?
|
@mannoopj there are test failures please take a look. |
The integration tests failure can be mapped out this way:
On this branch:
This seems like an existing bug that never manifested. The problem now is |
Yes, looks like the issue is with how the test is constructed. Can we avoid using BootstrapMetadata for bootstrapping the the cluster nodes? For example, can the test be configured using the Formatter directly? |
|
Hi @mannoopj, To me, the issue seems to be in step 4. In the current code, I think this test infra logic only differs when a builder of I think this difference is a consequence of the integration tests needing to generate |
|
I went with @kevin-wu24's suggestion here as it was the most straight forward solution. Restructuring the Test infra to avoid using BootstrapMetadata seems to me to be out of scope of this PR. |
| Files.createDirectories(Paths.get(writeLogDir)); | ||
| BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir); | ||
| bootstrapDirectory.writeBinaryFile(bootstrapMetadata); | ||
| if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) { |
There was a problem hiding this comment.
Should we keep the isDynamicMetadataDirectory() condition here? I noticed that the E2E tests fail in combined mode with multiple log directories because the pure data folders end up containing the unexpected __cluster_metadata-0 directory
There was a problem hiding this comment.
Maybe we should change this check to isMetadataDirectory. We should still write a 0-0.checkpoint for static quorums. Basically:
boolean isMetadataDirectory() {
return this != LOG_DIRECTORY;
}
There was a problem hiding this comment.
@kevin-wu24 Yes, your approach is much better and more precise. We will file a patch tomorrow if there is no objection.
There was a problem hiding this comment.
Thanks @chia7712 and @kevin-wu24 . Let's make sure we have a test that covers this case.
There was a problem hiding this comment.
Thanks for catching this and for the discussion. I opened a PR to address this issue: #22418
Previously, bootstrap metadata was stored in a separate
bootstrap.checkpoint file, while the zero checkpoint contained only
KRaft control records. This change unifies them by having the Formatter
append bootstrap metadata records into the zero checkpoint alongside the
existing KRaft control records, integrating with KRaft's bootstrapping
checkpoint mechanisms like RaftClient.Listener#handleLoadBootstrap and
KIP-630 snapshot lifecycle management.
QuorumController's handleLoadBootstrap now extracts bootstrap records
from the zero checkpoint and stores them as BootstrapMetadata, which is
later committed by ActivationRecordsGenerator when the controller
activates on an empty metadata log.
The BootstrapDirectory class is removed and its functionality
consolidated into static methods on BootstrapMetadata#fromDirectory
reads from the legacy bootstrap.checkpoint (falling back to defaults),
and fromCheckpointFile reads from a specific checkpoint path.
StorageTool now only writes the bootstrap snapshot when the node has the
Controller role. KafkaClusterTestKit is updated to pass non-feature
versions, non-SCRAM bootstrap records to the Formatter as additional
bootstrap records.
Reviewers: José Armando García Sancio jsancio@apache.org, Kevin Wu
kevin.wu2412@gmail.com