Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ Cruise Control for Apache Kafka
`2.6` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.11+`), `2.7` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.36+`),
`2.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.66+`), `3.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`),
`3.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.85+`), `3.8` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.142+`),
`3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), and `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`)
`3.9` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.143+`), `4.0` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.144+`),
and `4.1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.5.146+`)
* The `migrate_to_kafka_2_4` branch of Cruise Control is compatible with Apache Kafka `2.4` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.4.*`).
* The `kafka_2_0_to_2_3` branch (deprecated) of Cruise Control is compatible with Apache Kafka `2.0`, `2.1`, `2.2`, and `2.3` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `2.0.*`).
* The `kafka_0_11_and_1_0` branch (deprecated) of Cruise Control is compatible with Apache Kafka `0.11.0.0`, `1.0`, and `1.1` (i.e. [Releases](https://github.com/linkedin/cruise-control/releases) with `0.1.*`).
Expand Down
6 changes: 0 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,6 @@ project(':cruise-control') {
api 'org.json:json:20231013'
api 'org.xerial.snappy:snappy-java:1.1.10.5'

constraints {
implementation("commons-beanutils:commons-beanutils:1.11.0") {
because("version 1.9.4 pulled from kafa 3.9.1 has CVE-2025-48734 in it, which is fixed in 1.11.0")
}
}

testImplementation project(path: ':cruise-control-metrics-reporter', configuration: 'testOutput')
testImplementation project(path: ':cruise-control-core', configuration: 'testOutput')
testImplementation "org.scala-lang:scala-library:$scalaVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.CommonClientConfigs;
Expand Down Expand Up @@ -50,7 +49,7 @@
import static org.junit.Assert.assertTrue;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
private static final int NUM_OF_BROKERS = 2;
private static final int NUM_OF_BROKERS = 3;
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
protected static final String HOST = "127.0.0.1";
protected CCContainerizedKraftCluster _cluster;
Expand Down Expand Up @@ -115,6 +114,11 @@ public Properties overridingProps() {
return props;
}

@Override
protected int clusterSize() {
return NUM_OF_BROKERS;
}

@Test
public void testReportingMetrics() {
Properties props = new Properties();
Expand Down Expand Up @@ -201,7 +205,7 @@ public void testReportingMetrics() {
}

@Test
public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
public void testUpdatingMetricsTopicConfig() {
TopicDescription topicDescription = _cluster.waitForTopicMetadata(TOPIC, Duration.ofSeconds(30), td -> true);
assertEquals(1, topicDescription.partitions().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
* </ul>
*/
public class CCContainerizedKraftCluster implements Startable {
private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:3.9.1");
private static final String KAFKA_IMAGE = System.getenv().getOrDefault("KAFKA_IMAGE", "apache/kafka:4.1.1");
/**
* Determines the hostname used by containers to connect to services running on the host machine.
* Required for CI environments like CircleCI, where the Docker executor relies on a specific hostname
Expand Down Expand Up @@ -156,6 +156,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo) {
.withNetwork(NETWORK)
.withNetworkAliases(networkAlias)
.withExposedPorts(CONTAINER_EXTERNAL_LISTENER_PORT)
.withCreateContainerCmdModifier(cmd -> cmd.withUser("root"))
.withEnv("CLUSTER_ID", clusterId)
// Uncomment the following line when debugging Kafka cluster problems.
//.withLogConsumer(outputFrame -> System.out.print(networkAlias + " | " + outputFrame.getUtf8String()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG;

public class CCEmbeddedBroker implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
Expand Down Expand Up @@ -60,7 +61,7 @@ public CCEmbeddedBroker(Map<Object, Object> config) {
private void parseConfigs(Map<Object, Object> config) {
readLogDirs(config);
_id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
_metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
_metadataLogDir = new File((String) config.get(METADATA_LOG_DIR_CONFIG));

// Bind addresses
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.storage.internals.log.CleanerConfig;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG;


public class CCEmbeddedBrokerBuilder {
Expand All @@ -41,7 +42,6 @@ public class CCEmbeddedBrokerBuilder {
//feature control
private boolean _enableControlledShutdown;
private boolean _enableDeleteTopic;
private boolean _enableLogCleaner;
//resource management
// 2MB
private long _logCleanerDedupBufferSize = 2097152;
Expand Down Expand Up @@ -210,17 +210,6 @@ public CCEmbeddedBrokerBuilder enableDeleteTopic(boolean enableDeleteTopic) {
return this;
}

/**
* Enable log cleaner.
*
* @param enableLogCleaner {@code true} to enable log cleaner, {@code false} otherwise.
* @return This.
*/
public CCEmbeddedBrokerBuilder enableLogCleaner(boolean enableLogCleaner) {
_enableLogCleaner = enableLogCleaner;
return this;
}

/**
* Set log cleaner dedup buffer size.
* @param logCleanerDedupBufferSize log cleaner dedup buffer size.
Expand Down Expand Up @@ -285,13 +274,12 @@ public Map<Object, Object> buildConfig() {
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL");
props.put(SocketServerConfigs.LISTENERS_CONFIG, csvJoiner.toString());
props.put(ServerLogConfigs.LOG_DIR_CONFIG, _logDirectory.getAbsolutePath());
props.put(KRaftConfigs.METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
props.put(METADATA_LOG_DIR_CONFIG, _metadataLogDirectory.getAbsolutePath());
props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, Long.toString(_socketTimeoutMs));
props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(_enableControlledShutdown));
props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(_enableDeleteTopic));
props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, Long.toString(_logCleanerDedupBufferSize));
props.put(CleanerConfig.LOG_CLEANER_ENABLE_PROP, Boolean.toString(_enableLogCleaner));
props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
if (_rack != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;

import kafka.log.UnifiedLog;
import kafka.metrics.KafkaMetricsReporter;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
Expand All @@ -17,6 +16,7 @@
import kafka.utils.Mx4jLoader;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.internals.Topic;
Expand Down Expand Up @@ -44,7 +44,6 @@
import org.apache.kafka.storage.internals.log.LogConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.jdk.javaapi.CollectionConverters;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
Expand All @@ -61,6 +60,8 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs.METADATA_LOG_DIR_CONFIG;

/**
* This class implements the KRaft (Kafka Raft) mode server which relies on a KRaft quorum for maintaining cluster
* metadata. It is responsible for constructing the controller and/or broker based on the `process.roles` configuration
Expand Down Expand Up @@ -165,7 +166,7 @@ public int boundControllerPort() throws ExecutionException, InterruptedException
private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix) {
Loader loader = new Loader();
loader.addMetadataLogDir(config.metadataLogDir());
loader.addLogDirs(CollectionConverters.asJava(config.logDirs()));
loader.addLogDirs(config.logDirs());

// Load the MetaPropertiesEnsemble
MetaPropertiesEnsemble initialMetaPropsEnsemble;
Expand All @@ -181,7 +182,8 @@ private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix)
);
initialMetaPropsEnsemble.logDirProps().keySet().forEach(logDir -> {
if (!logDir.equals(config.metadataLogDir())) {
File clusterMetadataTopic = new File(logDir, UnifiedLog.logDirName(Topic.CLUSTER_METADATA_TOPIC_PARTITION));
TopicPartition tp = Topic.CLUSTER_METADATA_TOPIC_PARTITION;
File clusterMetadataTopic = new File(logDir, tp.topic() + "-" + tp.partition());
if (clusterMetadataTopic.exists()) {
throw new KafkaException("Unexpected metadata directory (" + config.metadataLogDir() + ") found for " + clusterMetadataTopic);
}
Expand Down Expand Up @@ -222,7 +224,7 @@ private static MetaTuple initializeLogDirs(KafkaConfig config, String logPrefix)

private void initializeMetaData(KafkaConfig config) {
Set<File> allLogDirs = new HashSet<>(readLogDirs(config));
allLogDirs.add(new File(config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG)));
allLogDirs.add(new File(config.getString(METADATA_LOG_DIR_CONFIG)));
for (File logDir : allLogDirs) {
File metaPropsFile = new File(logDir, "meta.properties");
if (!metaPropsFile.exists()) {
Expand All @@ -240,7 +242,7 @@ private void initializeMetaData(KafkaConfig config) {
properties.setProperty(CLUSTER_ID_CONFIG, _clusterId);
properties.setProperty(KRaftConfigs.NODE_ID_CONFIG, config.get(KRaftConfigs.NODE_ID_CONFIG).toString());
properties.setProperty(ServerLogConfigs.LOG_DIR_CONFIG, config.getString(ServerLogConfigs.LOG_DIR_CONFIG));
properties.setProperty(KRaftConfigs.METADATA_LOG_DIR_CONFIG, config.getString(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
properties.setProperty(METADATA_LOG_DIR_CONFIG, config.getString(METADATA_LOG_DIR_CONFIG));

try (FileOutputStream out = new FileOutputStream(metaPropsFile)) {
properties.store(out, "Meta Properties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public final class KafkaServerConfigs {
public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor";
public static final String DEFAULT_REPLICATION_FACTOR_CONFIG = "default.replication.factor";
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
public static final String METADATA_LOG_DIR_CONFIG = "metadata.log.dir";

private KafkaServerConfigs() { }
}
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ org.gradle.daemon=false
org.gradle.parallel=false
org.gradle.jvmargs=-Xms512m -Xmx512m
scalaVersion=2.13.13
kafkaVersion=4.0.0
kafkaVersion=4.1.1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, thanks!

nettyVersion=4.1.118.Final
jettyVersion=9.4.56.v20240826
vertxVersion=4.5.8
Expand Down
Loading