Skip to content

Commit 7e303b9

Browse files
committed
add bootstrap records in formatter
1 parent 026da2b commit 7e303b9

10 files changed

Lines changed: 24 additions & 24 deletions

File tree

core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,6 @@ class DumpLogSegmentsTest {
593593
)
594594

595595
val lastContainedLogTimestamp = 10000
596-
val emptyOptional: Optional[java.util.List[ApiMessageAndVersion]] = Optional.empty()
597596

598597
Using.resource(
599598
new RecordsSnapshotWriter.Builder()
@@ -602,7 +601,7 @@ class DumpLogSegmentsTest {
602601
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
603602
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
604603
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))))
605-
.build(MetadataRecordSerde.INSTANCE, emptyOptional)
604+
.build(MetadataRecordSerde.INSTANCE)
606605
) { snapshotWriter =>
607606
snapshotWriter.append(metadataRecords)
608607
snapshotWriter.freeze()

metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
439439
Files.createDirectories(Paths.get(writeLogDir));
440440
writeBoostrapSnapshot(writeLogDir,
441441
bootstrapMetadata,
442-
initialControllers.get(),
442+
initialControllers,
443443
featureLevels.get(KRaftVersion.FEATURE_NAME),
444444
controllerListenerName);
445445
});
@@ -492,24 +492,27 @@ static DirectoryType calculate(
492492
static void writeBoostrapSnapshot(
493493
String writeLogDir,
494494
BootstrapMetadata bootstrapMetadata,
495-
DynamicVoters initialControllers,
495+
Optional<DynamicVoters> initialControllers,
496496
short kraftVersion,
497497
String controllerListenerName
498498
) {
499499
File parentDir = new File(writeLogDir);
500500
File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d",
501501
CLUSTER_METADATA_TOPIC_PARTITION.topic(),
502502
CLUSTER_METADATA_TOPIC_PARTITION.partition()));
503-
VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
504503
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
505504
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
506505
setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
507506
setRawSnapshotWriter(FileRawSnapshotWriter.create(
508507
clusterMetadataDirectory.toPath(),
509508
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
510-
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
511-
setVoterSet(Optional.of(voterSet));
512-
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
509+
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion));
510+
if (initialControllers.isPresent()) {
511+
VoterSet voterSet = initialControllers.get().toVoterSet(controllerListenerName);
512+
builder.setVoterSet(Optional.of(voterSet));
513+
}
514+
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
515+
writer.append(bootstrapMetadata.records());
513516
writer.freeze();
514517
}
515518
}

metadata/src/test/java/org/apache/kafka/controller/MockRaftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -729,7 +729,7 @@ public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
729729
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
730730
.setTime(new MockTime())
731731
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
732-
.build(new MetadataRecordSerde(), Optional.empty())
732+
.build(new MetadataRecordSerde())
733733
);
734734
}
735735

raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3726,7 +3726,7 @@ public Optional<SnapshotWriter<T>> createSnapshot(
37263726
.setRawSnapshotWriter(wrappedWriter)
37273727
.setKraftVersion(partitionState.kraftVersionAtOffset(lastContainedLogOffset))
37283728
.setVoterSet(partitionState.voterSetAtOffset(lastContainedLogOffset))
3729-
.build(serde, Optional.empty());
3729+
.build(serde);
37303730
});
37313731
}
37323732

raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public Builder setVoterSet(Optional<VoterSet> voterSet) {
191191
return this;
192192
}
193193

194-
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde, Optional<List<T>> bootstrapRecords) {
194+
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
195195
if (rawSnapshotWriter.isEmpty()) {
196196
throw new IllegalStateException("Builder::build called without a RawSnapshotWriter");
197197
} else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
@@ -213,8 +213,6 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde, Optional<List<T>
213213
serde
214214
);
215215

216-
bootstrapRecords.ifPresent(writer::append);
217-
218216
writer.accumulator.appendControlMessages((baseOffset, epoch, compression, buffer) -> {
219217
long now = time.milliseconds();
220218
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2224,7 +2224,7 @@ private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext conte
22242224
return new RecordsSnapshotWriter.Builder()
22252225
.setTime(context.time)
22262226
.setRawSnapshotWriter(snapshot)
2227-
.build(new StringSerde(), Optional.empty());
2227+
.build(new StringSerde());
22282228
}
22292229

22302230
private static final class MemorySnapshotWriter implements RawSnapshotWriter {

raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
363363
.setKraftVersion(kraftVersion)
364364
.setVoterSet(voters);
365365

366-
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE, Optional.empty())) {
366+
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE)) {
367367
writer.freeze();
368368
}
369369
} else {

raft/src/test/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachineTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ void testUpdateWithEmptySnapshot() {
172172
// Create a snapshot that doesn't have any kraft.version or voter set control records
173173
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
174174
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get());
175-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
175+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
176176
writer.freeze();
177177
}
178178
log.truncateToLatestSnapshot();
@@ -234,7 +234,7 @@ void testUpdateWithSnapshot() {
234234
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get())
235235
.setKraftVersion(kraftVersion)
236236
.setVoterSet(Optional.of(voterSet));
237-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
237+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
238238
writer.freeze();
239239
}
240240
log.truncateToLatestSnapshot();
@@ -272,7 +272,7 @@ void testUpdateWithSnapshotAndLogOverride() {
272272
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
273273
.setKraftVersion(kraftVersion)
274274
.setVoterSet(Optional.of(snapshotVoterSet));
275-
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
275+
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
276276
writer.freeze();
277277
}
278278
log.truncateToLatestSnapshot();

raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ public void testControlRecordIterationWithKraftVersion0() {
171171
.setRawSnapshotWriter(
172172
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
173173
);
174-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
174+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
175175
snapshot.append(List.of("a", "b", "c"));
176176
snapshot.append(List.of("d", "e", "f"));
177177
snapshot.append(List.of("g", "h", "i"));
@@ -221,7 +221,7 @@ public void testControlRecordIterationWithKraftVersion1() {
221221
.setRawSnapshotWriter(
222222
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
223223
);
224-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
224+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
225225
snapshot.append(List.of("a", "b", "c"));
226226
snapshot.append(List.of("d", "e", "f"));
227227
snapshot.append(List.of("g", "h", "i"));

raft/src/test/java/org/apache/kafka/snapshot/RecordsSnapshotWriterTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void testBuilderKRaftVersion0() {
6060
.setRawSnapshotWriter(
6161
new MockRawSnapshotWriter(snapshotId, buffer::set)
6262
);
63-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
63+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
6464
snapshot.freeze();
6565
}
6666

@@ -114,7 +114,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
114114
new MockRawSnapshotWriter(snapshotId, buffer::set)
115115
);
116116

117-
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE, Optional.empty()));
117+
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE));
118118
}
119119

120120
@Test
@@ -133,7 +133,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
133133
.setRawSnapshotWriter(
134134
new MockRawSnapshotWriter(snapshotId, buffer::set)
135135
);
136-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
136+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
137137
snapshot.freeze();
138138
}
139139

@@ -191,7 +191,7 @@ void testBuilderKRaftVersion1WithoutVoterSet() {
191191
.setRawSnapshotWriter(
192192
new MockRawSnapshotWriter(snapshotId, buffer::set)
193193
);
194-
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
194+
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
195195
snapshot.freeze();
196196
}
197197

0 commit comments

Comments
 (0)