Skip to content

Commit 42e105a

Browse files
committed
change formatter to combine bootstrap.checkpoint into 000.checkpoint
1 parent 25de320 commit 42e105a

3 files changed

Lines changed: 45 additions & 13 deletions

File tree

metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
*/
4242
public class BootstrapDirectory {
4343
public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint";
44+
public static final String BINARY_CHECKPOINT_FILENAME = "00000000000000000000-0000000000.checkpoint";
4445

4546
private final String directoryPath;
4647

@@ -65,7 +66,7 @@ public BootstrapMetadata read() throws Exception {
6566
throw new RuntimeException("No such directory as " + directoryPath);
6667
}
6768
}
68-
Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME);
69+
Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_CHECKPOINT_FILENAME);
6970
if (!Files.exists(binaryBootstrapPath)) {
7071
return readFromConfiguration();
7172
} else {

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.io.PrintStream;
4242
import java.nio.file.Files;
4343
import java.nio.file.Paths;
44+
import java.nio.file.StandardOpenOption;
4445
import java.util.ArrayList;
4546
import java.util.Collection;
4647
import java.util.EnumSet;
@@ -437,14 +438,40 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
437438
directoryTypes.get(writeLogDir).description(), writeLogDir,
438439
MetadataVersion.FEATURE_NAME, releaseVersion);
439440
Files.createDirectories(Paths.get(writeLogDir));
440-
BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir);
441+
File parentDir = new File(writeLogDir);
442+
File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
443+
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
444+
CLUSTER_METADATA_TOPIC_PARTITION.partition()));
445+
Files.createDirectories(clusterMetadataDirectory.toPath());
446+
BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(clusterMetadataDirectory.getPath());
441447
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
442448
if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
443-
writeDynamicQuorumSnapshot(writeLogDir,
449+
writeDynamicQuorumSnapshot(clusterMetadataDirectory.getPath(),
444450
initialControllers.get(),
445451
featureLevels.get(KRaftVersion.FEATURE_NAME),
446452
controllerListenerName);
453+
File createdBoostrapCheckpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
454+
File created000Checkpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
455+
Files.write(
456+
createdBoostrapCheckpoint.toPath(),
457+
Files.readAllBytes(created000Checkpoint.toPath()),
458+
StandardOpenOption.APPEND);
459+
try {
460+
created000Checkpoint.delete();
461+
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
462+
} catch (Exception ex) {
463+
throw new RuntimeException("Failed operation to combine metadata and kraft records: ", ex);
464+
}
465+
} else {
466+
File createdBoostrapCheckpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME);
467+
File created000Checkpoint = new File(clusterMetadataDirectory.getPath() + "/" + BootstrapDirectory.BINARY_CHECKPOINT_FILENAME);
468+
try {
469+
createdBoostrapCheckpoint.renameTo(created000Checkpoint);
470+
} catch (Exception ex) {
471+
throw new RuntimeException("Failed to rename file: ", ex);
472+
}
447473
}
474+
448475
});
449476
copier.setWriteErrorHandler((errorLogDir, e) -> {
450477
throw new FormatterException("Error while writing meta.properties file " +
@@ -498,16 +525,12 @@ static void writeDynamicQuorumSnapshot(
498525
short kraftVersion,
499526
String controllerListenerName
500527
) {
501-
File parentDir = new File(writeLogDir);
502-
File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
503-
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
504-
CLUSTER_METADATA_TOPIC_PARTITION.partition()));
505528
VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
506529
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
507530
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
508531
setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
509532
setRawSnapshotWriter(FileRawSnapshotWriter.create(
510-
clusterMetadataDirectory.toPath(),
533+
Paths.get(writeLogDir),
511534
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
512535
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
513536
setVoterSet(Optional.of(voterSet));

metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import java.util.OptionalInt;
5959
import java.util.stream.Stream;
6060

61+
import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION;
6162
import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
6263
import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
6364
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -265,7 +266,9 @@ public void testFormatWithOlderReleaseVersion() throws Exception {
265266
" with metadata.version " + MetadataVersion.IBP_3_5_IV0 + ".",
266267
formatter1.output().trim());
267268
BootstrapMetadata bootstrapMetadata =
268-
new BootstrapDirectory(testEnv.directory(0)).read();
269+
new BootstrapDirectory(testEnv.directory(0) + String.format("/%s-%d",
270+
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
271+
CLUSTER_METADATA_TOPIC_PARTITION.partition())).read();
269272
assertEquals(MetadataVersion.IBP_3_5_IV0, bootstrapMetadata.metadataVersion());
270273
assertEquals(1, bootstrapMetadata.records().size());
271274
}
@@ -293,11 +296,12 @@ public void testFormatWithUnstableReleaseVersion() throws Exception {
293296
" with metadata.version " + MetadataVersion.latestTesting() + ".",
294297
formatter1.output().trim());
295298
BootstrapMetadata bootstrapMetadata =
296-
new BootstrapDirectory(testEnv.directory(0)).read();
299+
new BootstrapDirectory(testEnv.directory(0) + String.format("/%s-%d",
300+
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
301+
CLUSTER_METADATA_TOPIC_PARTITION.partition())).read();
297302
assertEquals(MetadataVersion.latestTesting(), bootstrapMetadata.metadataVersion());
298303
}
299304
}
300-
301305
@Test
302306
public void testFormattingCreatesLogDirId() throws Exception {
303307
try (TestEnv testEnv = new TestEnv(1)) {
@@ -344,7 +348,9 @@ public void testFormatWithScram() throws Exception {
344348
" with metadata.version " + MetadataVersion.IBP_3_8_IV0 + ".",
345349
formatter1.output().trim());
346350
BootstrapMetadata bootstrapMetadata =
347-
new BootstrapDirectory(testEnv.directory(0)).read();
351+
new BootstrapDirectory(testEnv.directory(0) + String.format("/%s-%d",
352+
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
353+
CLUSTER_METADATA_TOPIC_PARTITION.partition())).read();
348354
assertEquals(MetadataVersion.IBP_3_8_IV0, bootstrapMetadata.metadataVersion());
349355
List<ApiMessageAndVersion> scramRecords = bootstrapMetadata.records().stream().
350356
filter(r -> r.message() instanceof UserScramCredentialRecord).
@@ -379,7 +385,9 @@ public void testFeatureFlag(short version) throws Exception {
379385
formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version);
380386
formatter1.formatter.run();
381387
BootstrapMetadata bootstrapMetadata =
382-
new BootstrapDirectory(testEnv.directory(0)).read();
388+
new BootstrapDirectory(testEnv.directory(0) + String.format("/%s-%d",
389+
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
390+
CLUSTER_METADATA_TOPIC_PARTITION.partition())).read();
383391
List<ApiMessageAndVersion> expected = new ArrayList<>();
384392
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
385393
setName(MetadataVersion.FEATURE_NAME).

0 commit comments

Comments
 (0)