-
Notifications
You must be signed in to change notification settings - Fork 15.2k
KAFKA-19648; Cluster metadata bootstrapping with kraft checkpoint #20707
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
a9c79a3
2a6f663
752a79e
026da2b
7e303b9
584fe3d
9852262
9cb5318
ff3c050
f5763d0
f3a3e41
0ab8e02
1f76fa9
4de55ce
56435b3
29affeb
533033b
f35e992
1c4528e
625acfb
cb2ad18
3c809c1
1908b2c
7226af0
029aa6b
6dbef44
cc3f0b4
7ad1342
135c126
383045a
b474770
721fb5c
cb4803d
6a46cf7
adb1548
a839631
96bc7f3
4823d30
082d469
dae3bd9
8ebfdaf
5ba0c9b
a7450e0
dcd1b0b
67ad6c4
0a78d0d
de4da73
eb4ba90
1d1e7ba
dc27d7c
8469809
2e0c8f0
a182b12
0da0919
fd3adf6
3d589e6
56dae1b
b3feaa6
03eafae
7e8d2e6
7a9404f
9f82078
ff60272
e05dcf2
f0cbc8b
031c0fc
f0b35af
56422a5
3614162
5698e93
d4e3b84
9380605
0e3c626
df3108b
f9ad128
76146ad
2ad3c53
473ac38
da05d3f
7d078b8
b27e7fc
b82a873
711a347
02c9b79
de8bb36
da46fb7
4993aaa
a7edcd0
6601e69
3b4aeb6
7896f12
207ca7c
9ee9f3d
9b668cb
645a08a
8d44f43
3a57241
4c7d5f6
ce5faf6
b83d676
f8df406
d1a8ea7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -116,6 +116,7 @@ | |
| import org.apache.kafka.server.authorizer.AclDeleteResult; | ||
| import org.apache.kafka.server.common.ApiMessageAndVersion; | ||
| import org.apache.kafka.server.common.KRaftVersion; | ||
| import org.apache.kafka.server.common.MetadataVersion; | ||
| import org.apache.kafka.server.common.OffsetAndEpoch; | ||
| import org.apache.kafka.server.fault.FaultHandler; | ||
| import org.apache.kafka.server.fault.FaultHandlerException; | ||
|
|
@@ -384,8 +385,6 @@ public Builder setUncleanLeaderElectionCheckIntervalMs(long uncleanLeaderElectio | |
| public QuorumController build() throws Exception { | ||
| if (raftClient == null) { | ||
| throw new IllegalStateException("You must set a raft client."); | ||
| } else if (bootstrapMetadata == null) { | ||
| throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool."); | ||
|
Comment on lines
-394
to
-395
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can keep this assertion because
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The idea here is that for when we try to read
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but also we could just check if both files exist. actually yeah I'll just revert this
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think we should keep this behavior unless there is a good reason to change it. This behavior address the case where the
If we preserve the existing behavior, there's no way to print the |
||
| } else if (quorumFeatures == null) { | ||
| throw new IllegalStateException("You must specify the quorum features"); | ||
| } else if (nonFatalFaultHandler == null) { | ||
|
|
@@ -1022,7 +1021,22 @@ public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) { | |
| Batch<ApiMessageAndVersion> batch = reader.next(); | ||
| long offset = batch.lastOffset(); | ||
| List<ApiMessageAndVersion> messages = batch.records(); | ||
|
|
||
| if (bootstrapMetadata == null) { | ||
| if (reader.snapshotId().equals(Snapshots.BOOTSTRAP_SNAPSHOT_ID)) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we are reading in the 0-0.checkpoint is special because its records are uncommitted, unlike all other checkpoints this method handles, and need to be written to the log when a leader is determined. This changed because previously, |
||
| // For bootstrap snapshots, extract feature levels from all data records | ||
| if (batch.controlRecords().isEmpty()) { | ||
| System.out.println("DEBUG: Extracting bootstrap metadata from " + messages.size() + " records"); | ||
| bootstrapMetadata = BootstrapMetadata.fromRecords(messages, "bootstrap"); | ||
| System.out.println("DEBUG: Bootstrap metadata extracted: " + bootstrapMetadata); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the wrong if condition. We should check If the 0-0.checkpoint does not have metadata records, AND |
||
| } else { | ||
| Map<String, Short> featureVersions = new HashMap<>(); | ||
| MetadataVersion metadataVersion = MetadataVersion.latestProduction(); | ||
| featureVersions.put(MetadataVersion.FEATURE_NAME, metadataVersion.featureLevel()); | ||
| featureVersions.put(KRaftVersion.FEATURE_NAME, raftClient.kraftVersion().featureLevel()); | ||
| bootstrapMetadata = BootstrapMetadata.fromVersions(metadataVersion, featureVersions, "generated default"); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we're not reading the 0-0.checkpoint,
|
||
| } | ||
| } | ||
| log.debug("Replaying snapshot {} batch with last offset of {}", | ||
| snapshotName, offset); | ||
|
|
||
|
|
@@ -1139,6 +1153,10 @@ class CompleteActivationEvent implements ControllerWriteOperation<Void> { | |
| @Override | ||
| public ControllerResult<Void> generateRecordsAndResult() { | ||
| try { | ||
| if (bootstrapMetadata == null) { | ||
| throw new IllegalStateException("Bootstrap metadata not available during activation. " + | ||
| "This should not happen if a bootstrap snapshot was processed."); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should allow bootstrapMetadata to be null here because we can be in the case where we bootstrapped using |
||
| return ActivationRecordsGenerator.generate( | ||
| log::warn, | ||
| offsetControl.transactionStartOffset(), | ||
|
|
@@ -1436,7 +1454,7 @@ private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, lon | |
| /** | ||
| * The bootstrap metadata to use for initialization if needed. | ||
| */ | ||
| private final BootstrapMetadata bootstrapMetadata; | ||
| private BootstrapMetadata bootstrapMetadata; | ||
|
jsancio marked this conversation as resolved.
|
||
|
|
||
| /** | ||
| * The maximum number of records per batch to allow. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,6 +34,7 @@ | |
|
|
||
| import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; | ||
| import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; | ||
| import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION; | ||
|
|
||
| /** | ||
| * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the | ||
|
|
@@ -42,6 +43,8 @@ | |
| public class BootstrapDirectory { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should not need to materially change this file at all (at least for now), since this file only deals with the old It looks like it handles the Maybe we want to rename the file to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These changes were mainly made to address failing tests. (Only used in two test files). I agree that we can probably deprecate this.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, looked at |
||
| public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint"; | ||
|
|
||
| public static final String BINARY_BOOTSTRAP_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint"; | ||
|
|
||
| private final String directoryPath; | ||
|
|
||
| /** | ||
|
|
@@ -65,9 +68,17 @@ public BootstrapMetadata read() throws Exception { | |
| throw new RuntimeException("No such directory as " + directoryPath); | ||
| } | ||
| } | ||
| Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME); | ||
| Path binaryBootstrapPath = Paths.get(directoryPath, String.format("%s-%d", | ||
| CLUSTER_METADATA_TOPIC_PARTITION.topic(), | ||
| CLUSTER_METADATA_TOPIC_PARTITION.partition()), | ||
| BINARY_BOOTSTRAP_CHECKPOINT_FILENAME); | ||
| if (!Files.exists(binaryBootstrapPath)) { | ||
| return readFromConfiguration(); | ||
| Path oldBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME); | ||
| if (!Files.exists(oldBootstrapPath)) { | ||
| return readFromConfiguration(); | ||
| } else { | ||
| return readFromBinaryFile(oldBootstrapPath.toString()); | ||
| } | ||
| } else { | ||
| return readFromBinaryFile(binaryBootstrapPath.toString()); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we changing this code?
We shouldn't change this code though, because when the log is non-empty, it means the bootstrap metadata records have already been written in the log before.