diff --git a/build.gradle b/build.gradle index f302df7aea443..7bdb74961a542 100644 --- a/build.gradle +++ b/build.gradle @@ -1432,6 +1432,7 @@ project(':metadata') { testImplementation testLog4j2Libs testFixturesImplementation testFixtures(project(':server-common')) + testFixturesImplementation project(':raft') testFixturesImplementation libs.junitJupiter testImplementation libs.junitJupiter diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala index facd495713311..5d559dd20eacf 100644 --- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala +++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala @@ -17,6 +17,7 @@ package kafka.server import java.io.File +import java.nio.file.Path import java.util.concurrent.CompletableFuture import kafka.utils.{Logging, Mx4jLoader} import org.apache.kafka.common.config.{ConfigDef, ConfigResource} @@ -25,7 +26,7 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.utils.internals.AppInfoParser import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.metadata.KafkaConfigSchema -import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata} +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble.VerificationFlag.{REQUIRE_AT_LEAST_ONE_VALID, REQUIRE_METADATA_LOG_DIR} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble} import org.apache.kafka.raft.QuorumConfig @@ -182,8 +183,7 @@ object KafkaRaftServer { } // Load the BootstrapMetadata. - val bootstrapDirectory = new BootstrapDirectory(config.metadataLogDir) - val bootstrapMetadata = bootstrapDirectory.read() + val bootstrapMetadata = BootstrapMetadata.fromDirectory(Path.of(config.metadataLogDir)) (metaPropsEnsemble, bootstrapMetadata) } diff --git a/core/src/main/scala/kafka/tools/StorageTool.scala b/core/src/main/scala/kafka/tools/StorageTool.scala index 456425959740e..cba3b3647066f 100644 --- a/core/src/main/scala/kafka/tools/StorageTool.scala +++ b/core/src/main/scala/kafka/tools/StorageTool.scala @@ -125,6 +125,7 @@ object StorageTool extends Logging { val formatter = new Formatter(). setPrintStream(printStream). setNodeId(config.nodeId). + setWriteBootstrapSnapshot(config.processRoles.contains(ProcessRole.ControllerRole)). setClusterId(namespace.getString("cluster_id")). setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled). setIgnoreFormatted(namespace.getBoolean("ignore_formatted")). diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala index a2783cb5015a4..cac002ddfd1b5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala @@ -21,12 +21,13 @@ import java.nio.file.Files import java.util.{Optional, Properties} import org.apache.kafka.common.{KafkaException, Uuid} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.metadata.bootstrap.{BootstrapDirectory, BootstrapMetadata} +import org.apache.kafka.metadata.bootstrap.{BootstrapMetadata, BootstrapTestUtils} import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils} -import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} +import org.apache.kafka.metadata.storage.Formatter import org.apache.kafka.network.SocketServerConfigs -import org.apache.kafka.server.config.ServerLogConfigs +import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} import org.apache.kafka.server.common.MetadataVersion +import org.apache.kafka.server.config.ServerLogConfigs import org.apache.kafka.storage.internals.log.UnifiedLog import org.apache.kafka.test.TestUtils import org.junit.jupiter.api.Assertions._ @@ -109,8 +110,13 @@ class KafkaRaftServerTest { } private def writeBootstrapMetadata(logDir: File, metadataVersion: MetadataVersion): Unit = { - val bootstrapDirectory = new BootstrapDirectory(logDir.toString) - bootstrapDirectory.writeBinaryFile(BootstrapMetadata.fromVersion(metadataVersion, "test")) + Formatter.writeBoostrapSnapshot( + logDir.toString, + BootstrapMetadata.fromVersion(metadataVersion, "test"), + Optional.empty(), + 0.toShort, + "CONTROLLER" + ) } @Test @@ -272,20 +278,32 @@ class KafkaRaftServerTest { setDirectoryId(Uuid.fromString("4jm0e-YRYeB6CCKBvwoS8w")). build() - val configProperties = new Properties - configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller") - configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString) - configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093") - configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093") - configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL") + val logDir = TestUtils.tempDirectory() + try { + writeMetaProperties(logDir, metaProperties) + writeBootstrapMetadata(logDir, MetadataVersion.IBP_3_3_IV3) + + val configProperties = new Properties + configProperties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller") + configProperties.put(KRaftConfigs.NODE_ID_CONFIG, nodeId.toString) + configProperties.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9093") + configProperties.put(QuorumConfig.QUORUM_VOTERS_CONFIG, s"$nodeId@localhost:9093") + configProperties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL") + configProperties.put(ServerLogConfigs.LOG_DIR_CONFIG, logDir.getAbsolutePath) + val config = KafkaConfig.fromProps(configProperties) - val (metaPropertiesEnsemble, bootstrapMetadata) = - invokeLoadMetaProperties(metaProperties, configProperties, Some(MetadataVersion.IBP_3_3_IV3)) + val (metaPropertiesEnsemble, _) = + KafkaRaftServer.initializeLogDirs(config, MetaPropertiesEnsemble.LOG, "") - assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next()) - assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty) - assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty) - assertEquals(bootstrapMetadata.metadataVersion(), MetadataVersion.IBP_3_3_IV3) + assertEquals(metaProperties, metaPropertiesEnsemble.logDirProps().values().iterator().next()) + assertTrue(metaPropertiesEnsemble.errorLogDirs().isEmpty) + assertTrue(metaPropertiesEnsemble.emptyLogDirs().isEmpty) + + val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(logDir.getAbsolutePath) + assertEquals(MetadataVersion.IBP_3_3_IV3, bootstrapMetadata.metadataVersion()) + } finally { + Utils.delete(logDir) + } } @Test diff --git a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala index 4ae84c03685ee..f0b6f8f8e028e 100644 --- a/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala +++ b/core/src/test/scala/unit/kafka/tools/StorageToolTest.scala @@ -27,12 +27,12 @@ import kafka.utils.TestUtils import net.sourceforge.argparse4j.inf.ArgumentParserException import org.apache.kafka.common.metadata.UserScramCredentialRecord import org.apache.kafka.common.utils.Utils -import org.apache.kafka.server.common.{Feature, MetadataVersion} -import org.apache.kafka.metadata.bootstrap.BootstrapDirectory +import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils import org.apache.kafka.metadata.properties.{MetaPropertiesEnsemble, PropertiesUtils} import org.apache.kafka.metadata.storage.FormatterException import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.raft.{KRaftConfigs, MetadataLogConfig, QuorumConfig} +import org.apache.kafka.server.common.{Feature, MetadataVersion} import org.apache.kafka.server.config.{ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.util.TerseFailure import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue} @@ -390,15 +390,15 @@ Found problem: def testFormatWithReleaseVersionAndFeatureOverride(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() assertEquals(0, runFormatCommand(stream, properties, Seq( - "--release-version", "3.7-IV0", + "--release-version", "3.7-IV0", "--standalone", "--feature", "share.version=1"))) // Verify that the feature override is applied by checking the bootstrap metadata - val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read + val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString) // Verify that the share.version feature is set to 1 as specified assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"), @@ -409,7 +409,7 @@ Found problem: "Failed to find release version in output: " + stream.toString()) // Verify that the format command completed successfully with features - assertTrue(stream.toString().contains("Formatting metadata directory"), + assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"), "Failed to find formatting message in output: " + stream.toString()) } @@ -417,17 +417,17 @@ Found problem: def testFormatWithMultipleFeatures(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() assertEquals(0, runFormatCommand(stream, properties, Seq( - "--release-version", "3.8-IV0", + "--release-version", "3.8-IV0", "--standalone", "--feature", "share.version=1", "--feature", "transaction.version=2", "--feature", "group.version=1"))) // Verify that all features are properly bootstrapped by checking the bootstrap metadata - val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read + val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString) // Verify that all specified features are set correctly assertEquals(1.toShort, bootstrapMetadata.featureLevel("share.version"), @@ -442,7 +442,7 @@ Found problem: "Failed to find release version in output: " + stream.toString()) // Verify that the format command completed successfully with multiple features - assertTrue(stream.toString().contains("Formatting metadata directory"), + assertTrue(stream.toString().contains("Formatting dynamic metadata voter directory"), "Failed to find formatting message in output: " + stream.toString()) } @@ -852,11 +852,11 @@ Found problem: def testBootstrapScramRecords(): Unit = { val availableDirs = Seq(TestUtils.tempDir()) val properties = new Properties() - properties.putAll(defaultStaticQuorumProperties) + properties.putAll(defaultDynamicQuorumProperties) properties.setProperty("log.dirs", availableDirs.mkString(",")) val stream = new ByteArrayOutputStream() val arguments = ListBuffer[String]( - "--release-version", "3.9-IV0", + "--release-version", "3.9-IV0", "--standalone", "--add-scram", "SCRAM-SHA-512=[name=alice,password=changeit]", "--add-scram", "SCRAM-SHA-512=[name=bob,password=changeit]" ) @@ -865,7 +865,7 @@ Found problem: // Not doing full SCRAM record validation since that's covered elsewhere. // Just checking that we generate the correct number of records - val bootstrapMetadata = new BootstrapDirectory(availableDirs.head.toString).read + val bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(availableDirs.head.toString) val scramRecords = bootstrapMetadata.records().asScala .filter(apiMessageAndVersion => apiMessageAndVersion.message().isInstanceOf[UserScramCredentialRecord]) .map(apiMessageAndVersion => apiMessageAndVersion.message().asInstanceOf[UserScramCredentialRecord]) diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 35ad95356c11f..5da6eb2e3aa0b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -128,6 +128,7 @@ import org.slf4j.Logger; +import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; @@ -1075,7 +1076,31 @@ public void handleLoadSnapshot(SnapshotReader reader) { @Override public void handleLoadBootstrap(SnapshotReader reader) { - reader.close(); + appendRaftEvent(String.format("handleLoadBootstrap[snapshotId=%s]", reader.snapshotId()), () -> { + try { + String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId()); + if (isActiveController()) { + throw fatalFaultHandler.handleFault("Asked to load bootstrap snapshot " + snapshotName + + ", but we are the active controller at epoch " + curClaimEpoch); + } + List records = new ArrayList<>(); + while (reader.hasNext()) { + Batch batch = reader.next(); + records.addAll(batch.records()); + } + if (!records.isEmpty()) { + log.debug("Loaded {} bootstrap records from {}", records.size(), snapshotName); + bootstrapMetadata = BootstrapMetadata.fromRecords(records, "bootstrap"); + } + } catch (FaultHandlerException e) { + throw e; + } catch (Throwable e) { + throw fatalFaultHandler.handleFault("Error while loading bootstrap snapshot " + + reader.snapshotId(), e); + } finally { + reader.close(); + } + }); } @Override @@ -1460,7 +1485,7 @@ private void replay(ApiMessage message, Optional snapshotId, lon /** * The bootstrap metadata to use for initialization if needed. */ - private final BootstrapMetadata bootstrapMetadata; + private BootstrapMetadata bootstrapMetadata; /** * The maximum number of records per batch to allow. diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java deleted file mode 100644 index dbeaeaa652428..0000000000000 --- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectory.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metadata.bootstrap; - -import org.apache.kafka.metadata.util.BatchFileReader; -import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; -import org.apache.kafka.metadata.util.BatchFileWriter; -import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.MetadataVersion; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Objects; - -import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; -import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; - -/** - * A read-only class that holds the controller bootstrap metadata. A file named "bootstrap.checkpoint" is used and the - * format is the same as a KRaft snapshot. - */ -public class BootstrapDirectory { - public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint"; - - private final String directoryPath; - - /** - * Create a new BootstrapDirectory object. - * - * @param directoryPath The path to the directory with the bootstrap file. - */ - public BootstrapDirectory( - String directoryPath - ) { - this.directoryPath = Objects.requireNonNull(directoryPath); - } - - public BootstrapMetadata read() throws Exception { - Path path = Paths.get(directoryPath); - if (!Files.isDirectory(path)) { - if (Files.exists(path)) { - throw new RuntimeException("Path " + directoryPath + " exists, but is not " + - "a directory."); - } else { - throw new RuntimeException("No such directory as " + directoryPath); - } - } - Path binaryBootstrapPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME); - if (!Files.exists(binaryBootstrapPath)) { - return readFromConfiguration(); - } else { - return readFromBinaryFile(binaryBootstrapPath.toString()); - } - } - - BootstrapMetadata readFromConfiguration() { - return BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), "the default bootstrap"); - } - - BootstrapMetadata readFromBinaryFile(String binaryPath) throws Exception { - List records = new ArrayList<>(); - try (BatchFileReader reader = new BatchFileReader.Builder(). - setPath(binaryPath).build()) { - while (reader.hasNext()) { - BatchAndType batchAndType = reader.next(); - if (!batchAndType.isControl()) { - records.addAll(batchAndType.batch().records()); - } - } - } - return BootstrapMetadata.fromRecords(Collections.unmodifiableList(records), - "the binary bootstrap metadata file: " + binaryPath); - } - - public void writeBinaryFile(BootstrapMetadata bootstrapMetadata) throws IOException { - if (!Files.isDirectory(Paths.get(directoryPath))) { - throw new RuntimeException("No such directory as " + directoryPath); - } - Path tempPath = Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME + ".tmp"); - Files.deleteIfExists(tempPath); - try { - try (BatchFileWriter writer = BatchFileWriter.open(tempPath)) { - for (ApiMessageAndVersion message : bootstrapMetadata.records()) { - writer.append(message); - } - } - - Files.move( - tempPath, - Paths.get(directoryPath, BINARY_BOOTSTRAP_FILENAME), - ATOMIC_MOVE, REPLACE_EXISTING - ); - } finally { - Files.deleteIfExists(tempPath); - } - } -} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java index 9438a76693efd..82f607f56894f 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadata.java @@ -19,11 +19,19 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.metadata.util.BatchFileReader; +import org.apache.kafka.metadata.util.BatchFileReader.BatchAndType; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.server.common.MetadataVersion; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; @@ -35,10 +43,61 @@ * use the version specified here. */ public class BootstrapMetadata { + public static final String BINARY_BOOTSTRAP_FILENAME = "bootstrap.checkpoint"; + private final List records; private final short metadataVersionLevel; private final String source; + /** + * Reads bootstrap metadata from the given directory. Checks the legacy bootstrap.checkpoint + * first and falls back to defaults if it does not exist. + */ + public static BootstrapMetadata fromDirectory(Path directory) { + if (!Files.isDirectory(directory)) { + if (Files.exists(directory)) { + throw new IllegalStateException("Path " + directory + " exists, but is not " + + "a directory."); + } else { + throw new IllegalStateException("No such directory as " + directory); + } + } + Path binaryBootstrapPath = directory.resolve(BINARY_BOOTSTRAP_FILENAME); + if (Files.exists(binaryBootstrapPath)) { + return fromCheckpointFile(binaryBootstrapPath); + } + return fromVersion(MetadataVersion.latestProduction(), "the default bootstrap"); + } + + /** + * Reads bootstrap metadata from the given checkpoint file. + * Throws if the file does not exist. + */ + public static BootstrapMetadata fromCheckpointFile(Path file) { + if (!Files.exists(file)) { + String path = file.toString(); + throw new UncheckedIOException(path, new FileNotFoundException(path)); + } + return readFromBinaryFile(file); + } + + private static BootstrapMetadata readFromBinaryFile(Path binaryPath) { + List records = new ArrayList<>(); + try (BatchFileReader reader = new BatchFileReader.Builder(). + setPath(binaryPath.toString()).build()) { + while (reader.hasNext()) { + BatchAndType batchAndType = reader.next(); + if (!batchAndType.isControl()) { + records.addAll(batchAndType.batch().records()); + } + } + } catch (IOException e) { + throw new UncheckedIOException("Unable to read bootstrap metadata from " + binaryPath, e); + } + return fromRecords(Collections.unmodifiableList(records), + "the binary bootstrap metadata file: " + binaryPath); + } + public static BootstrapMetadata fromVersions( MetadataVersion metadataVersion, Map featureVersions, @@ -92,7 +151,7 @@ public static BootstrapMetadata fromRecords(List records, return new BootstrapMetadata(records, metadataVersionLevel.get(), source); } - public static Optional recordToMetadataVersionLevel(ApiMessage record) { + private static Optional recordToMetadataVersionLevel(ApiMessage record) { if (record instanceof FeatureLevelRecord featureLevel) { if (featureLevel.name().equals(MetadataVersion.FEATURE_NAME)) { return Optional.of(featureLevel.featureLevel()); diff --git a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java index 1ccf0c3d5c05a..8c163cd470008 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/storage/Formatter.java @@ -20,14 +20,12 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.utils.Time; import org.apache.kafka.metadata.MetadataRecordSerde; -import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.metadata.properties.MetaPropertiesVersion; import org.apache.kafka.raft.DynamicVoters; import org.apache.kafka.raft.KafkaRaftClient; -import org.apache.kafka.raft.VoterSet; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.Feature; import org.apache.kafka.server.common.FeatureVersion; @@ -102,6 +100,11 @@ public class Formatter { */ private BootstrapMetadata bootstrapMetadata; + /** + * Additional bootstrap records to include beyond feature levels and SCRAM. + */ + private List additionalBootstrapRecords = List.of(); + /** * True if we should enable unstable feature versions. */ @@ -132,6 +135,7 @@ public class Formatter { */ private Optional initialControllers = Optional.empty(); private boolean hasDynamicQuorum = false; + private boolean writeBootstrapSnapshot = true; public Formatter setPrintStream(PrintStream printStream) { this.printStream = printStream; @@ -182,6 +186,11 @@ public Formatter setFeatureLevel(String featureName, Short level) { return this; } + public Formatter setAdditionalBootstrapRecords(List additionalBootstrapRecords) { + this.additionalBootstrapRecords = additionalBootstrapRecords; + return this; + } + public Formatter setUnstableFeatureVersionsEnabled(boolean unstableFeatureVersionsEnabled) { this.unstableFeatureVersionsEnabled = unstableFeatureVersionsEnabled; return this; @@ -217,6 +226,11 @@ public Formatter setHasDynamicQuorum(boolean hasDynamicQuorum) { return this; } + public Formatter setWriteBootstrapSnapshot(boolean writeBootstrapSnapshot) { + this.writeBootstrapSnapshot = writeBootstrapSnapshot; + return this; + } + public Optional initialVoters() { return initialControllers; } @@ -378,6 +392,9 @@ BootstrapMetadata calculateBootstrapMetadata() throws Exception { } bootstrapRecords.addAll(ScramParser.parse(scramArguments)); } + if (!additionalBootstrapRecords.isEmpty()) { + bootstrapRecords.addAll(additionalBootstrapRecords); + } return BootstrapMetadata.fromRecords(bootstrapRecords, "format command"); } @@ -432,11 +449,10 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception { directoryTypes.get(writeLogDir).description(), writeLogDir, MetadataVersion.FEATURE_NAME, releaseVersion); Files.createDirectories(Paths.get(writeLogDir)); - BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir); - bootstrapDirectory.writeBinaryFile(bootstrapMetadata); - if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) { - writeDynamicQuorumSnapshot(writeLogDir, - initialControllers.get(), + if (writeBootstrapSnapshot) { + writeBoostrapSnapshot(writeLogDir, + bootstrapMetadata, + initialControllers, featureLevels.get(KRaftVersion.FEATURE_NAME), controllerListenerName); } @@ -487,9 +503,10 @@ static DirectoryType calculate( } } - static void writeDynamicQuorumSnapshot( + public static void writeBoostrapSnapshot( String writeLogDir, - DynamicVoters initialControllers, + BootstrapMetadata bootstrapMetadata, + Optional initialControllers, short kraftVersion, String controllerListenerName ) { @@ -497,7 +514,6 @@ static void writeDynamicQuorumSnapshot( File clusterMetadataDirectory = new File(parentDir, String.format("%s-%d", CLUSTER_METADATA_TOPIC_PARTITION.topic(), CLUSTER_METADATA_TOPIC_PARTITION.partition())); - VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName); RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder(). setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()). setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES). @@ -505,8 +521,10 @@ static void writeDynamicQuorumSnapshot( clusterMetadataDirectory.toPath(), Snapshots.BOOTSTRAP_SNAPSHOT_ID)). setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)). - setVoterSet(Optional.of(voterSet)); + setVoterSet(initialControllers.map(controllers -> controllers.toVoterSet(controllerListenerName))); + try (RecordsSnapshotWriter writer = builder.build(MetadataRecordSerde.INSTANCE)) { + writer.append(bootstrapMetadata.records()); writer.freeze(); } } diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java index 71957dce6cf0f..70e5b8b8920db 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileReader.java @@ -17,9 +17,11 @@ package org.apache.kafka.metadata.util; +import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.SnapshotFooterRecord; import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.record.internal.ControlRecordType; import org.apache.kafka.common.record.internal.FileLogInputStream.FileChannelRecordBatch; @@ -34,6 +36,7 @@ import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; @@ -55,7 +58,7 @@ public Builder setPath(String path) { return this; } - public BatchFileReader build() throws Exception { + public BatchFileReader build() throws IOException { if (path == null) { throw new RuntimeException("You must specify a path."); } @@ -119,6 +122,17 @@ private BatchAndType nextControlBatch(FileChannelRecordBatch input) { messages.add(new ApiMessageAndVersion(message, (short) 0)); break; } + case KRAFT_VERSION: { + KRaftVersionRecord message = new KRaftVersionRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; + } + case KRAFT_VOTERS: + VotersRecord message = new VotersRecord(); + message.read(new ByteBufferAccessor(record.value()), (short) 0); + messages.add(new ApiMessageAndVersion(message, (short) 0)); + break; default: throw new RuntimeException("Unsupported control record type " + type + " at offset " + record.offset()); diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java deleted file mode 100644 index 97240b227bb24..0000000000000 --- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapDirectoryTest.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.metadata.bootstrap; - -import org.apache.kafka.common.metadata.FeatureLevelRecord; -import org.apache.kafka.common.metadata.NoOpRecord; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.server.common.ApiMessageAndVersion; -import org.apache.kafka.server.common.MetadataVersion; -import org.apache.kafka.test.TestUtils; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.Timeout; - -import java.io.File; -import java.util.List; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - - -@Timeout(40) -public class BootstrapDirectoryTest { - static final List SAMPLE_RECORDS1 = List.of( - new ApiMessageAndVersion(new FeatureLevelRecord(). - setName(MetadataVersion.FEATURE_NAME). - setFeatureLevel((short) 7), (short) 0), - new ApiMessageAndVersion(new NoOpRecord(), (short) 0), - new ApiMessageAndVersion(new NoOpRecord(), (short) 0)); - - static class BootstrapTestDirectory implements AutoCloseable { - File directory = null; - - synchronized BootstrapTestDirectory createDirectory() { - directory = TestUtils.tempDirectory("BootstrapTestDirectory"); - return this; - } - - synchronized String path() { - return directory.getAbsolutePath(); - } - - synchronized String binaryBootstrapPath() { - return new File(directory, BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME).getAbsolutePath(); - } - - @Override - public synchronized void close() throws Exception { - if (directory != null) { - Utils.delete(directory); - } - directory = null; - } - } - - @Test - public void testReadFromEmptyConfiguration() throws Exception { - try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), - "the default bootstrap"), - new BootstrapDirectory(testDirectory.path()).read()); - } - } - - @Test - public void testMissingDirectory() { - assertEquals("No such directory as ./non/existent/directory", - assertThrows(RuntimeException.class, () -> - new BootstrapDirectory("./non/existent/directory").read()).getMessage()); - } - - @Test - public void testReadFromConfigurationFile() throws Exception { - try (BootstrapTestDirectory testDirectory = new BootstrapTestDirectory().createDirectory()) { - BootstrapDirectory directory = new BootstrapDirectory(testDirectory.path()); - BootstrapMetadata metadata = BootstrapMetadata.fromRecords(SAMPLE_RECORDS1, - "the binary bootstrap metadata file: " + testDirectory.binaryBootstrapPath()); - directory.writeBinaryFile(metadata); - assertEquals(metadata, directory.read()); - } - } -} diff --git a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java index 0ad45185eda57..23fbada300787 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/bootstrap/BootstrapMetadataTest.java @@ -19,14 +19,21 @@ import org.apache.kafka.common.metadata.FeatureLevelRecord; import org.apache.kafka.common.metadata.NoOpRecord; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.util.BatchFileWriter; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.common.MetadataVersionTestUtils; +import org.apache.kafka.test.TestUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.io.UncheckedIOException; +import java.nio.file.Path; import java.util.List; +import java.util.Map; +import java.util.TreeMap; import static org.apache.kafka.server.common.MetadataVersion.FEATURE_NAME; import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV3; @@ -45,6 +52,31 @@ public class BootstrapMetadataTest { setName(FEATURE_NAME). setFeatureLevel((short) 7), (short) 0)); + static class BootstrapTestDirectory implements AutoCloseable { + private final Path directory; + + static BootstrapTestDirectory createDirectory() { + return new BootstrapTestDirectory(); + } + + private BootstrapTestDirectory() { + this.directory = TestUtils.tempDirectory().toPath(); + } + + Path path() { + return directory; + } + + Path binaryBootstrapPath() { + return directory.resolve(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME); + } + + @Override + public void close() throws Exception { + Utils.delete(directory.toFile()); + } + } + @Test public void testFromVersion() { assertEquals(new BootstrapMetadata(List.of( @@ -133,4 +165,73 @@ public void testFromRecordsListWithOldMetadataVersion() { + " to " + MetadataVersion.latestTesting().featureLevel() + ".", assertThrows(RuntimeException.class, bootstrapMetadata::metadataVersion).getMessage()); } + + @Test + public void testReadFromEmptyDirectory() throws Exception { + try (BootstrapTestDirectory testDirectory = BootstrapTestDirectory.createDirectory()) { + assertEquals(BootstrapMetadata.fromVersion(MetadataVersion.latestProduction(), + "the default bootstrap"), + BootstrapMetadata.fromDirectory(testDirectory.path())); + } + } + + @Test + public void testMissingDirectory() { + assertEquals("No such directory as ./non/existent/directory", + assertThrows(IllegalStateException.class, () -> + BootstrapMetadata.fromDirectory(Path.of("./non/existent/directory"))).getMessage()); + } + + @Test + public void testFromDirectoryWithLegacyBootstrapCheckpoint() throws Exception { + try (BootstrapTestDirectory testDirectory = BootstrapTestDirectory.createDirectory()) { + Path checkpointPath = testDirectory.binaryBootstrapPath(); + BootstrapMetadata expected = BootstrapMetadata.fromVersion(IBP_3_3_IV3, "test"); + try (BatchFileWriter writer = BatchFileWriter.open(checkpointPath)) { + writer.append(expected.records()); + } + BootstrapMetadata result = BootstrapMetadata.fromDirectory(testDirectory.path()); + assertEquals(expected.records(), result.records()); + } + } + + @Test + public void testFromCheckpointFileNotFound() { + Path nonExistent = Path.of("/tmp/does_not_exist_bootstrap.checkpoint"); + assertThrows(UncheckedIOException.class, + () -> BootstrapMetadata.fromCheckpointFile(nonExistent)); + } + + @Test + public void testFromVersions() { + Map features = new TreeMap<>(); + features.put("foo", (short) 2); + features.put("bar", (short) 1); + BootstrapMetadata bm = BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features, "test"); + assertEquals(List.of( + new ApiMessageAndVersion(new FeatureLevelRecord() + .setName(FEATURE_NAME) + .setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0), + new ApiMessageAndVersion(new FeatureLevelRecord() + .setName("bar") + .setFeatureLevel((short) 1), (short) 0), + new ApiMessageAndVersion(new FeatureLevelRecord() + .setName("foo") + .setFeatureLevel((short) 2), (short) 0)), + bm.records() + ); + } + + @Test + public void testFromVersionsExcludesZeroLevelFeatures() { + Map features = Map.of("foo", (short) 0); + BootstrapMetadata bm = BootstrapMetadata.fromVersions(MetadataVersion.latestProduction(), features, "test"); + assertEquals(List.of( + new ApiMessageAndVersion(new FeatureLevelRecord() + .setName(FEATURE_NAME) + .setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0)), + bm.records() + ); + } + } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index dcfba4328aeb7..975c4db0e4851 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -23,8 +23,8 @@ import org.apache.kafka.common.security.scram.internals.ScramFormatter; import org.apache.kafka.common.security.scram.internals.ScramMechanism; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; +import org.apache.kafka.metadata.bootstrap.BootstrapTestUtils; import org.apache.kafka.metadata.properties.MetaProperties; import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble; import org.apache.kafka.raft.DynamicVoters; @@ -60,6 +60,7 @@ import java.util.OptionalInt; import java.util.stream.Stream; +import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION; import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT; import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @@ -148,12 +149,24 @@ public void testDirectories(int numDirs) throws Exception { assertEquals(OptionalInt.of(DEFAULT_NODE_ID), ensemble.nodeId()); assertEquals(Optional.of(DEFAULT_CLUSTER_ID.toString()), ensemble.clusterId()); assertEquals(new HashSet<>(testEnv.directories), ensemble.logDirProps().keySet()); - BootstrapMetadata bootstrapMetadata = - new BootstrapDirectory(testEnv.directory(0)).read(); + BootstrapMetadata bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0)); assertEquals(MetadataVersion.latestProduction(), bootstrapMetadata.metadataVersion()); } } + @Test + public void testSkipsBootstrapSnapshotWhenDisabled() throws Exception { + try (TestEnv testEnv = new TestEnv(1)) { + FormatterContext context = testEnv.newFormatter(); + context.formatter.setWriteBootstrapSnapshot(false); + context.formatter.run(); + File clusterMetadataDir = new File(testEnv.directory(0), String.format("%s-%d", + CLUSTER_METADATA_TOPIC_PARTITION.topic(), + CLUSTER_METADATA_TOPIC_PARTITION.partition())); + assertFalse(clusterMetadataDir.exists()); + } + } + @Test public void testFormatterFailsOnAlreadyFormatted() throws Exception { try (TestEnv testEnv = new TestEnv(1)) { @@ -170,11 +183,7 @@ public void testFormatterFailsOnUnwritableDirectory() throws Exception { try (TestEnv testEnv = new TestEnv(1)) { new File(testEnv.directory(0)).setReadOnly(); FormatterContext formatter1 = testEnv.newFormatter(); - String expectedPrefix = "Error while writing meta.properties file"; - assertEquals(expectedPrefix, - assertThrows(FormatterException.class, - formatter1.formatter::run). - getMessage().substring(0, expectedPrefix.length())); + assertThrows(Exception.class, formatter1.formatter::run); } } @@ -266,8 +275,7 @@ public void testFormatWithOlderReleaseVersion() throws Exception { "\nFormatting metadata directory " + testEnv.directory(0) + " with metadata.version " + MetadataVersion.IBP_3_5_IV0 + ".", formatter1.output().trim()); - BootstrapMetadata bootstrapMetadata = - new BootstrapDirectory(testEnv.directory(0)).read(); + BootstrapMetadata bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0)); assertEquals(MetadataVersion.IBP_3_5_IV0, bootstrapMetadata.metadataVersion()); assertEquals(1, bootstrapMetadata.records().size()); } @@ -294,8 +302,7 @@ public void testFormatWithUnstableReleaseVersion() throws Exception { "\nFormatting metadata directory " + testEnv.directory(0) + " with metadata.version " + MetadataVersion.latestTesting() + ".", formatter1.output().trim()); - BootstrapMetadata bootstrapMetadata = - new BootstrapDirectory(testEnv.directory(0)).read(); + BootstrapMetadata bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0)); assertEquals(MetadataVersion.latestTesting(), bootstrapMetadata.metadataVersion()); } } @@ -345,8 +352,7 @@ public void testFormatWithScram() throws Exception { "\nFormatting metadata directory " + testEnv.directory(0) + " with metadata.version " + MetadataVersion.IBP_3_8_IV0 + ".", formatter1.output().trim()); - BootstrapMetadata bootstrapMetadata = - new BootstrapDirectory(testEnv.directory(0)).read(); + BootstrapMetadata bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0)); assertEquals(MetadataVersion.IBP_3_8_IV0, bootstrapMetadata.metadataVersion()); List scramRecords = bootstrapMetadata.records().stream(). filter(r -> r.message() instanceof UserScramCredentialRecord). @@ -380,8 +386,7 @@ public void testFeatureFlag(short version) throws Exception { formatter1.formatter.setSupportedFeatures(Feature.TEST_AND_PRODUCTION_FEATURES); formatter1.formatter.setFeatureLevel(TestFeatureVersion.FEATURE_NAME, version); formatter1.formatter.run(); - BootstrapMetadata bootstrapMetadata = - new BootstrapDirectory(testEnv.directory(0)).read(); + BootstrapMetadata bootstrapMetadata = BootstrapTestUtils.readBootstrapMetadata(testEnv.directory(0)); List expected = new ArrayList<>(); expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). setName(MetadataVersion.FEATURE_NAME). diff --git a/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java new file mode 100644 index 0000000000000..8ad7e64148d01 --- /dev/null +++ b/metadata/src/testFixtures/java/org/apache/kafka/metadata/bootstrap/BootstrapTestUtils.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.bootstrap; + +import java.nio.file.Path; + +import static org.apache.kafka.common.internals.Topic.CLUSTER_METADATA_TOPIC_PARTITION; +import static org.apache.kafka.snapshot.Snapshots.BOOTSTRAP_SNAPSHOT_ID; +import static org.apache.kafka.snapshot.Snapshots.snapshotPath; + +/** + * Utilities for testing classes that deal with bootstrap metadata. + */ +public class BootstrapTestUtils { + /** + * Reads bootstrap metadata from the cluster metadata bootstrap checkpoint file of the given metadata directory. + * + * @param directoryPath the metadata log directory + * @return the bootstrap metadata + */ + public static BootstrapMetadata readBootstrapMetadata(String directoryPath) { + Path metadataPartitionDir = Path.of(directoryPath, + CLUSTER_METADATA_TOPIC_PARTITION.topic() + "-" + CLUSTER_METADATA_TOPIC_PARTITION.partition()); + Path checkpointPath = snapshotPath(metadataPartitionDir, BOOTSTRAP_SNAPSHOT_ID); + return BootstrapMetadata.fromCheckpointFile(checkpointPath); + } +} diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java index fa498e190ea6f..9fd6404c89380 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.Uuid; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.internals.BrokerSecurityConfigs; +import org.apache.kafka.common.metadata.FeatureLevelRecord; +import org.apache.kafka.common.metadata.UserScramCredentialRecord; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -48,6 +50,7 @@ import org.apache.kafka.raft.MetadataLogConfig; import org.apache.kafka.raft.QuorumConfig; import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ServerConfigs; import org.apache.kafka.server.fault.FaultHandler; import org.apache.kafka.storage.internals.log.CleanerConfig; @@ -496,6 +499,25 @@ private void formatNode( formatter.setIgnoreFormatted(false); formatter.setControllerListenerName(controllerListenerName); formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get()); + + List additionalRecords = new ArrayList<>(); + for (ApiMessageAndVersion record : nodes.bootstrapMetadata().records()) { + if (record.message() instanceof FeatureLevelRecord featureRecord) { + if (!featureRecord.name().equals(MetadataVersion.FEATURE_NAME)) { + formatter.setFeatureLevel(featureRecord.name(), featureRecord.featureLevel()); + } + } else if (!(record.message() instanceof UserScramCredentialRecord)) { + additionalRecords.add(record); + } else { + throw new IllegalStateException("UserScramCredentialRecord is not supported in " + + "bootstrap metadata. Use Formatter.setScramArguments() instead."); + } + } + for (String disabledFeature : nodes.disabledFeatures()) { + formatter.setFeatureLevel(disabledFeature, (short) 0); + } + formatter.setAdditionalBootstrapRecords(additionalRecords); + StringBuilder dynamicVotersBuilder = new StringBuilder(); String prefix = ""; if (standalone) { diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java index 0de47d9b77130..b1c7fc00e882d 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.stream.Collectors; @@ -54,6 +55,7 @@ public static class Builder { private int numDisksPerBroker = 1; private Map> perServerProperties = Map.of(); private BootstrapMetadata bootstrapMetadata; + private Set disabledFeatures = Set.of(); public Builder() { this(BootstrapMetadata.fromVersions( @@ -93,6 +95,11 @@ public Builder setBootstrapMetadata(BootstrapMetadata bootstrapMetadata) { return this; } + public Builder setDisabledFeatures(Set disabledFeatures) { + this.disabledFeatures = Collections.unmodifiableSet(disabledFeatures); + return this; + } + public Builder setCombined(boolean combined) { this.combined = combined; return this; @@ -215,14 +222,15 @@ public TestKitNodes build() { brokerNodes.put(id, brokerNode); } - return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, controllerNodes, brokerNodes, - brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol); + return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), clusterId, bootstrapMetadata, disabledFeatures, controllerNodes, + brokerNodes, brokerListenerName, brokerSecurityProtocol, controllerListenerName, controllerSecurityProtocol); } } private final String baseDirectory; private final String clusterId; private final BootstrapMetadata bootstrapMetadata; + private final Set disabledFeatures; private final SortedMap controllerNodes; private final SortedMap brokerNodes; private final ListenerName brokerListenerName; @@ -234,6 +242,7 @@ private TestKitNodes( String baseDirectory, String clusterId, BootstrapMetadata bootstrapMetadata, + Set disabledFeatures, SortedMap controllerNodes, SortedMap brokerNodes, ListenerName brokerListenerName, @@ -244,6 +253,7 @@ private TestKitNodes( this.baseDirectory = Objects.requireNonNull(baseDirectory); this.clusterId = Objects.requireNonNull(clusterId); this.bootstrapMetadata = Objects.requireNonNull(bootstrapMetadata); + this.disabledFeatures = Objects.requireNonNull(disabledFeatures); this.controllerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(controllerNodes))); this.brokerNodes = Collections.unmodifiableSortedMap(new TreeMap<>(Objects.requireNonNull(brokerNodes))); this.brokerListenerName = Objects.requireNonNull(brokerListenerName); @@ -272,6 +282,10 @@ public BootstrapMetadata bootstrapMetadata() { return bootstrapMetadata; } + public Set disabledFeatures() { + return disabledFeatures; + } + public SortedMap brokerNodes() { return brokerNodes; } diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java index f1ad8080cb44d..ba9da8c77bff4 100644 --- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java +++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java @@ -336,8 +336,15 @@ public void format() throws Exception { } } + Set disabledFeatures = newFeatureLevels.entrySet().stream() + .filter(featureEntry -> featureEntry.getValue() == 0) + .filter(featureEntry -> !featureEntry.getKey().equals(MetadataVersion.FEATURE_NAME)) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + TestKitNodes nodes = new TestKitNodes.Builder() .setBootstrapMetadata(BootstrapMetadata.fromVersions(clusterConfig.metadataVersion(), newFeatureLevels, "testkit")) + .setDisabledFeatures(disabledFeatures) .setCombined(isCombined) .setNumBrokerNodes(clusterConfig.numBrokers()) .setNumDisksPerBroker(clusterConfig.numDisksPerBroker()) diff --git a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java index 0ac0b74b6bcdb..8e6f3d8d09607 100644 --- a/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java +++ b/tools/src/main/java/org/apache/kafka/tools/DumpLogSegments.java @@ -54,7 +54,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde; import org.apache.kafka.coordinator.transaction.TransactionCoordinatorRecordSerde; import org.apache.kafka.metadata.MetadataRecordSerde; -import org.apache.kafka.metadata.bootstrap.BootstrapDirectory; +import org.apache.kafka.metadata.bootstrap.BootstrapMetadata; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde; import org.apache.kafka.server.util.CommandDefaultOptions; @@ -365,7 +365,7 @@ private static void dumpLog(File file, long startOffset = Long.parseLong(file.getName().split("\\.")[0]); System.out.println("Log starting offset: " + startOffset); } else if (file.getName().endsWith(Snapshots.SUFFIX)) { - if (file.getName().equals(BootstrapDirectory.BINARY_BOOTSTRAP_FILENAME)) { + if (file.getName().equals(BootstrapMetadata.BINARY_BOOTSTRAP_FILENAME)) { System.out.println("KRaft bootstrap snapshot"); } else { Optional pathOpt = Snapshots.parse(file.toPath());