diff --git a/README.md b/README.md index a226118d8..ec06bb36b 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,8 @@ Cruise Control for Apache Kafka `2.6` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.11+`), `2.7` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.36+`), `2.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.66+`), `3.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`), `3.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`), `3.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.142+`), - `3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), and `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`) + `3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`), + and `4.1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.146+`) * The `migrate_to_kafka_2_4` branch of Cruise Control is compatible with Apache Kafka `2.4` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.4.*`). * The `kafka_2_0_to_2_3` branch (deprecated) of Cruise Control is compatible with Apache Kafka `2.0`, `2.1`, `2.2`, and `2.3` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.0.*`). * The `kafka_0_11_and_1_0` branch (deprecated) of Cruise Control is compatible with Apache Kafka `0.11.0.0`, `1.0`, and `1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `0.1.*`). diff --git a/build.gradle b/build.gradle index 93dcbb6d0..1d3d6eb1f 100644 --- a/build.gradle +++ b/build.gradle @@ -319,12 +319,6 @@ project(':cruise-control') { api 'org.json:json:20231013' api 'org.xerial.snappy:snappy-java:1.1.10.5' - constraints { - implementation("commons-beanutils:commons-beanutils:1.11.0") { - because("version 1.9.4 pulled from kafa 3.9.1 has CVE-2025-48734 in it, which is fixed in 1.11.0") - } - } - testImplementation project(path: ':cruise-control-metrics-reporter', configuration: 'testOutput') testImplementation project(path: ':cruise-control-core', configuration: 'testOutput') testImplementation "org.scala-lang:scala-library:$scalaVersion" diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java index 4cc364e71..0fd0d135c 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java @@ -17,7 +17,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import org.apache.kafka.clients.CommonClientConfigs; @@ -50,7 +49,7 @@ import static org.junit.Assert.assertTrue; public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness { - private static final int NUM_OF_BROKERS = 2; + private static final int NUM_OF_BROKERS = 3; protected static final String TOPIC = "CruiseControlMetricsReporterTest"; protected static final String HOST = "127.0.0.1"; protected CCContainerizedKraftCluster _cluster; @@ -115,6 +114,11 @@ public Properties overridingProps() { return props; } + @Override + protected int clusterSize() { + return NUM_OF_BROKERS; + } + @Test public void testReportingMetrics() { Properties props = new Properties(); @@ -201,7 +205,7 @@ public void testReportingMetrics() { } @Test - public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException { + public void testUpdatingMetricsTopicConfig() { TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true); assertEquals(1, topicDescription.partitions().size()); diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java index 707415dd9..641ce36c3 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCContainerizedKraftCluster.java @@ -55,7 +55,7 @@ * */ public class CCContainerizedKraftCluster implements Startable { - private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:3.9.1"); + private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:4.1.1"); /** * Determines the hostname used by containers to connect to services running on the host machine. * Required for CI environments like CircleCI, where the Docker executor relies on a specific hostname @@ -156,6 +156,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) { .withNetwork(NETWORK) .withNetworkAliases(networkAlias) .withExposedPorts(CONTAINER_EXTERNAL_LISTENER_PORT) + .withCreateContainerCmdModifier(cmd -> cmd.withUser("root")) .withEnv("CLUSTER_ID", clusterId) // Uncomment the following line when debugging Kafka cluster problems. //.withLogConsumer(outputFrame -> System.out.print(networkAlias + " | " + outputFrame.getUtf8String())) diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java index a065f291b..21fdea8e9 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG; public class CCEmbeddedBroker implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class); @@ -60,7 +61,7 @@ public CCEmbeddedBroker(Map config) { private void parseConfigs(Map config) { readLogDirs(config); _id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG)); - _metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG)); + _metadataLogDir = new File((String) config.get(METADATA_LOG_DIR_CONFIG)); // Bind addresses String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG); diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java index 323324f58..7257cad03 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBrokerBuilder.java @@ -21,6 +21,7 @@ import org.apache.kafka.storage.internals.log.CleanerConfig; import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG; public class CCEmbeddedBrokerBuilder { @@ -41,7 +42,6 @@ public class CCEmbeddedBrokerBuilder { //feature control private boolean _enableControlledShutdown; private boolean _enableDeleteTopic; - private boolean _enableLogCleaner; //resource management // 2MB private long _logCleanerDedupBufferSize = 2097152; @@ -210,17 +210,6 @@ public CCEmbeddedBrokerBuilder enableDeleteTopic(boolean enableDeleteTopic) { return this; } - /** - * Enable log cleaner. - * - * @param enableLogCleaner {@code true} to enable log cleaner, {@code false} otherwise. - * @return This. - */ - public CCEmbeddedBrokerBuilder enableLogCleaner(boolean enableLogCleaner) { - _enableLogCleaner = enableLogCleaner; - return this; - } - /** * Set log cleaner dedup buffer size. * @param logCleanerDedupBufferSize log cleaner dedup buffer size. @@ -285,13 +274,12 @@ public Map buildConfig() { props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL"); props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString()); props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath()); - props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath()); + props.put(METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath()); props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs)); props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs)); props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown)); props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic)); props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize)); - props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner)); props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); if (_rack != null) { diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaRaftServer.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaRaftServer.java index b4e90bbb9..6b25919e5 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaRaftServer.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaRaftServer.java @@ -4,7 +4,6 @@ package com.linkedin.kafka.cruisecontrol.metricsreporter.utils; -import kafka.log.UnifiedLog; import kafka.metrics.KafkaMetricsReporter; import kafka.server.BrokerServer; import kafka.server.ControllerServer; @@ -17,6 +16,7 @@ import kafka.utils.Mx4jLoader; import kafka.utils.VerifiableProperties; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigResource.Type; import org.apache.kafka.common.internals.Topic; @@ -44,7 +44,6 @@ import org.apache.kafka.storage.internals.log.LogConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.jdk.javaapi.CollectionConverters; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -61,6 +60,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG; + /** * This class implements the KRaft (Kafka Raft) mode server which relies on a KRaft quorum for maintaining cluster * metadata. It is responsible for constructing the controller and/or broker based on the `process.roles` configuration @@ -165,7 +166,7 @@ public int boundControllerPort() throws ExecutionException, InterruptedException private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix) { Loader loader = new Loader(); loader.addMetadataLogDir(config.metadataLogDir()); - loader.addLogDirs(CollectionConverters.asJava(config.logDirs())); + loader.addLogDirs(config.logDirs()); // Load the MetaPropertiesEnsemble MetaPropertiesEnsemble initialMetaPropsEnsemble; @@ -181,7 +182,8 @@ private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix) ); initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir -> { if (!logDir.equals(config.metadataLogDir())) { - File clusterMetadataTopic = new File(logDir, UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION)); + TopicPartition tp = Topic.CLUSTER_METADATA_TOPIC_PARTITION; + File clusterMetadataTopic = new File(logDir, tp.topic() + "-" + tp.partition()); if (clusterMetadataTopic.exists()) { throw new KafkaException("Unexpected metadata directory (" + config.metadataLogDir() + ") found for " + clusterMetadataTopic); } @@ -222,7 +224,7 @@ private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix) private void initializeMetaData(KafkaConfig config) { Set allLogDirs = new HashSet<>(readLogDirs(config)); - allLogDirs.add(new File(config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG))); + allLogDirs.add(new File(config.getString(METADATA_LOG_DIR_CONFIG))); for (File logDir : allLogDirs) { File metaPropsFile = new File(logDir, "meta.properties"); if (!metaPropsFile.exists()) { @@ -240,7 +242,7 @@ private void initializeMetaData(KafkaConfig config) { properties.setProperty(CLUSTER_ID_CONFIG, _clusterId); properties.setProperty(KRaftConfigs.NODE_ID_CONFIG, config.get(KRaftConfigs.NODE_ID_CONFIG).toString()); properties.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, config.getString(ServerLogConfigs.LOG_DIR_CONFIG)); - properties.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)); + properties.setProperty(METADATA_LOG_DIR_CONFIG, config.getString(METADATA_LOG_DIR_CONFIG)); try (FileOutputStream out = new FileOutputStream(metaPropsFile)) { properties.store(out, "Meta Properties"); diff --git a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java index d8889de0e..d50187062 100644 --- a/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java +++ b/cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/KafkaServerConfigs.java @@ -12,6 +12,7 @@ public final class KafkaServerConfigs { public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor"; public static final String DEFAULT_REPLICATION_FACTOR_CONFIG = "default.replication.factor"; public static final String NUM_PARTITIONS_CONFIG = "num.partitions"; + public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir"; private KafkaServerConfigs() { } } diff --git a/gradle.properties b/gradle.properties index c86748212..7902f58ab 100644 --- a/gradle.properties +++ b/gradle.properties @@ -2,7 +2,7 @@ org.gradle.daemon=false org.gradle.parallel=false org.gradle.jvmargs=-Xms512m -Xmx512m scalaVersion=2.13.13 -kafkaVersion=4.0.0 +kafkaVersion=4.1.1 nettyVersion=4.1.118.Final jettyVersion=9.4.56.v20240826 vertxVersion=4.5.8