Skip to content

Commit 27c0bf4

Browse files
Remove ZooKeeper (linkedin#2244)
* Remove ZooKeeper from Cruise Control production Authored-by: Tamas Barnabas Egyed <egyed.t@cloudera.com> Change-Id: Ic9a7e255193cfbbc6c537e5138c86725363a74b9 * Use KRaft in Cruise Control tests Co-authored-by: Patrik Marton <pmarton@cloudera.com> Co-authored-by: Tamas Barnabas Egyed <egyed.t@cloudera.com> --------- Co-authored-by: Patrik Marton <pmarton@cloudera.com>
1 parent 86e3127 commit 27c0bf4

47 files changed

Lines changed: 695 additions & 1100 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ Control). The metrics reporter periodically samples the Kafka raw metrics on the
9191
* If the default broker cleanup policy is `compact`, make sure that the topic to which Cruise Control metrics
9292
reporter should send messages is created with the `delete` cleanup policy -- the default metrics reporter topic
9393
is `__CruiseControlMetrics`.
94-
2. Start ZooKeeper and Kafka server ([See tutorial](https://kafka.apache.org/quickstart)).
94+
2. Start Kafka server ([See tutorial](https://kafka.apache.org/quickstart)).
9595
3. Modify `config/cruisecontrol.properties` of Cruise Control:
96-
* (Required) fill in `bootstrap.servers` and `zookeeper.connect` to the Kafka cluster to be monitored.
96+
* (Required) fill in `bootstrap.servers` to the Kafka cluster to be monitored.
9797
* (Required) update `capacity.config.file` to the path of your capacity file.
9898
* Capacity file is a JSON file that provides the capacity of the brokers
9999
* You can start Cruise Control server with the default file (`config/capacityJBOD.json`), but it may not reflect the actual capacity of the brokers

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,6 @@ project(':cruise-control') {
289289
api project(':cruise-control-core')
290290
implementation "org.slf4j:slf4j-api:1.7.36"
291291
implementation "org.apache.logging.log4j:log4j-slf4j-impl:2.17.2"
292-
implementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
293292
implementation "io.netty:netty-handler:${nettyVersion}"
294293
implementation "io.netty:netty-transport-native-epoll:${nettyVersion}"
295294
api "org.apache.kafka:kafka_$scalaBinaryVersion:$kafkaVersion"
@@ -481,9 +480,10 @@ project(':cruise-control-metrics-reporter') {
481480
testImplementation "org.apache.kafka:kafka-clients:$kafkaVersion:test"
482481
testImplementation "org.apache.kafka:kafka-server-common:$kafkaVersion"
483482
testImplementation "org.apache.kafka:kafka-group-coordinator:$kafkaVersion"
483+
testImplementation "org.apache.kafka:kafka-metadata:$kafkaVersion"
484+
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
484485
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
485486
testImplementation 'commons-io:commons-io:2.11.0'
486-
testImplementation "org.apache.zookeeper:zookeeper:${zookeeperVersion}"
487487
testOutput sourceSets.test.output
488488
}
489489

config/cruise_control_jaas.conf_template

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,4 @@
1-
//Rename this file to cruise_control_jaas.conf when using secured zookeepers
2-
//For detailed instructions, see /docs/wiki/User Guide/Secure-zookeeper-configuration.md
3-
4-
//Enter appropriate Client entry for secured zookeeper client connections
5-
Client {
6-
com.sun.security.auth.module.Krb5LoginModule required
7-
useKeyTab=true
8-
keyTab="/path/to/zookeeper_client.keytab"
9-
storeKey=true
10-
useTicketCache=false
11-
principal="zookeeper_client@<REALM>";
12-
};
13-
1+
//Rename this file to cruise_control_jaas.conf if security is enabled
142
//Enter appropriate KafkaClient entry if using the SASL protocol, remove if not
153
KafkaClient {
164
com.sun.security.auth.module.Krb5LoginModule required

config/cruisecontrol.properties

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -173,12 +173,6 @@ num.proposal.precompute.threads=1
173173
# Configurations for the executor
174174
# =======================================
175175

176-
# The zookeeper connect of the Kafka cluster
177-
zookeeper.connect=localhost:2181/
178-
179-
# If true, appropriate zookeeper Client { .. } entry required in jaas file located at $base_dir/config/cruise_control_jaas.conf
180-
zookeeper.security.enabled=false
181-
182176
# The max number of partitions to move in/out on a given broker at a given time.
183177
num.concurrent.partition.movements.per.broker=10
184178

@@ -222,9 +216,6 @@ self.healing.exclude.recently.demoted.brokers=true
222216
# True if recently removed brokers are excluded from optimizations during self healing, false otherwise
223217
self.healing.exclude.recently.removed.brokers=true
224218

225-
# The zk path to store failed broker information.
226-
failed.brokers.zk.path=/CruiseControlBrokerList
227-
228219
# Topic config provider class
229220
topic.config.provider.class=com.linkedin.kafka.cruisecontrol.config.KafkaAdminTopicConfigProvider
230221

cruise-control-core/src/test/resources/log4j2.properties

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,5 @@ logger.orgApacheKafka.level=error
1717
logger.kafka.name=kafka
1818
logger.kafka.level=error
1919

20-
# zkclient can be verbose, during debugging it is common to adjust is separately
21-
logger.zkclient.name=kafka.zk.KafkaZkClient
22-
logger.zkclient.level=warn
23-
24-
logger.zookeeper.name=org.apache.zookeeper
25-
logger.zookeeper.level=warn
26-
2720
rootLogger.appenderRefs=console
2821
rootLogger.appenderRef.console.ref=STDOUT

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Map;
1818
import java.util.Properties;
1919
import java.util.Set;
20+
import java.util.concurrent.ExecutionException;
2021
import java.util.concurrent.atomic.AtomicInteger;
2122
import java.util.regex.Pattern;
2223
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
@@ -51,6 +52,7 @@
5152
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
5253
import static org.junit.Assert.assertEquals;
5354
import static org.junit.Assert.assertTrue;
55+
import static org.junit.Assert.fail;
5456

5557
public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
5658
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
@@ -186,7 +188,7 @@ public void testReportingMetrics() {
186188
}
187189

188190
@Test
189-
public void testUpdatingMetricsTopicConfig() throws InterruptedException {
191+
public void testUpdatingMetricsTopicConfig() throws ExecutionException, InterruptedException {
190192
Properties props = new Properties();
191193
setSecurityConfigs(props, "admin");
192194
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
@@ -207,19 +209,26 @@ public void testUpdatingMetricsTopicConfig() throws InterruptedException {
207209
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
208210
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
209211
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
210-
CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig);
211-
// Restart broker
212-
broker.startup();
213-
// Wait for broker to boot up
214-
Thread.sleep(5000);
212+
try (CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig)) {
213+
// Restart broker
214+
broker.startup();
215+
// Check whether the topic config is updated
216+
long startTime = System.currentTimeMillis();
217+
boolean isTopicConfigChanged = false;
218+
while (!isTopicConfigChanged) {
219+
if (System.currentTimeMillis() > startTime + 60000) {
220+
fail("Topic config was not updated");
221+
}
215222

216-
// Check whether the topic config is updated
217-
try {
218-
topicDescription = getTopicDescription(adminClient, TOPIC);
219-
} catch (KafkaTopicDescriptionException e) {
220-
throw new RuntimeException(e);
223+
TopicDescription description = adminClient.describeTopics(Collections.singleton(TOPIC)).topicNameValues().get(TOPIC).get();
224+
isTopicConfigChanged = 2 == description.partitions().size();
225+
226+
try {
227+
Thread.sleep(5000);
228+
} catch (InterruptedException ignored) {
229+
}
230+
}
221231
}
222-
assertEquals(2, topicDescription.partitions().size());
223232
}
224233

225234
@Test
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2025 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License").
 See License in the project root for license information.
3+
*/
4+
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
5+
6+
public class CCAbstractKRaftTestHarness {
7+
protected CCEmbeddedKRaftController _controller;
8+
9+
/**
10+
* Setup the unit test.
11+
*/
12+
public void setUp() {
13+
if (_controller == null) {
14+
_controller = new CCEmbeddedKRaftController();
15+
}
16+
_controller.startup();
17+
}
18+
19+
/**
20+
* Teardown the unit test.
21+
*/
22+
public void tearDown() {
23+
if (_controller != null) {
24+
CCKafkaTestUtils.quietly(() -> _controller.close());
25+
_controller = null;
26+
}
27+
}
28+
29+
protected CCEmbeddedKRaftController kraftController() {
30+
return _controller;
31+
}
32+
}

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCAbstractZookeeperTestHarness.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCEmbeddedBroker.java

Lines changed: 26 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -5,51 +5,49 @@
55
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
66

77
import java.io.File;
8-
import java.lang.reflect.Constructor;
9-
import java.lang.reflect.InvocationTargetException;
108
import java.net.URI;
9+
import java.util.ArrayList;
1110
import java.util.HashMap;
11+
import java.util.List;
1212
import java.util.Map;
13-
import java.util.NoSuchElementException;
14-
import kafka.metrics.KafkaMetricsReporter;
1513
import kafka.server.KafkaConfig;
16-
import kafka.server.KafkaServer;
1714
import org.apache.commons.io.FileUtils;
1815
import org.apache.kafka.common.network.ListenerName;
1916
import org.apache.kafka.common.security.auth.SecurityProtocol;
2017
import org.apache.kafka.common.utils.Time;
2118
import org.apache.kafka.network.SocketServerConfigs;
22-
import org.apache.kafka.server.config.ServerConfigs;
19+
import org.apache.kafka.server.config.KRaftConfigs;
2320
import org.apache.kafka.server.config.ServerLogConfigs;
2421
import org.slf4j.Logger;
2522
import org.slf4j.LoggerFactory;
26-
import scala.Option;
27-
import scala.collection.Seq;
28-
import scala.collection.mutable.ArrayBuffer;
23+
24+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaRaftServer.CLUSTER_ID_CONFIG;
2925

3026
public class CCEmbeddedBroker implements AutoCloseable {
3127
private static final Logger LOG = LoggerFactory.getLogger(CCEmbeddedBroker.class);
3228
private final Map<SecurityProtocol, Integer> _ports;
3329
private final Map<SecurityProtocol, String> _hosts;
34-
private final KafkaServer _kafkaServer;
30+
private final CCKafkaRaftServer _kafkaServer;
3531
private int _id;
36-
private File _logDir;
32+
private final List<File> _logDirs;
33+
private File _metadataLogDir;
3734

3835
public CCEmbeddedBroker(Map<Object, Object> config) {
3936
_ports = new HashMap<>();
4037
_hosts = new HashMap<>();
38+
_logDirs = new ArrayList<>();
4139

4240
try {
4341
// Also validates the config
4442
KafkaConfig kafkaConfig = new KafkaConfig(config, true);
4543
parseConfigs(config);
4644

47-
_kafkaServer = createKafkaServer(kafkaConfig);
45+
_kafkaServer = new CCKafkaRaftServer(kafkaConfig, config.get(CLUSTER_ID_CONFIG).toString(), Time.SYSTEM);
4846

4947
startup();
5048
_ports.replaceAll((securityProtocol, port) -> {
5149
try {
52-
return _kafkaServer.boundPort(ListenerName.forSecurityProtocol(securityProtocol));
50+
return _kafkaServer.boundBrokerPort(ListenerName.forSecurityProtocol(securityProtocol));
5351
} catch (Exception e) {
5452
throw new IllegalStateException(e);
5553
}
@@ -59,46 +57,10 @@ public CCEmbeddedBroker(Map<Object, Object> config) {
5957
}
6058
}
6159

62-
/**
63-
* Creates the {@link KafkaServer} instance using the appropriate constructor for the version of Kafka on the classpath.
64-
* It will attempt to use the 2.8+ version first and then fall back to the 2.5+ version. If neither work, a
65-
* {@link NoSuchElementException} will be thrown.
66-
*
67-
* @param kafkaConfig The {@link KafkaConfig} instance to be used to create the returned {@link KafkaServer} instance.
68-
* @return A {@link KafkaServer} instance configured with the supplied {@link KafkaConfig}.
69-
* @throws ClassNotFoundException If a version of {@link KafkaServer} cannot be found on the classpath.
70-
*/
71-
private static KafkaServer createKafkaServer(KafkaConfig kafkaConfig) throws ClassNotFoundException {
72-
// The KafkaServer constructor changed in 2.8, so we need to figure out which one we are using and invoke it with the correct parameters
73-
KafkaServer kafkaServer = null;
74-
Class<?> kafkaServerClass = Class.forName(KafkaServer.class.getName());
75-
76-
try {
77-
Constructor<?> kafka28PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, boolean.class);
78-
kafkaServer = (KafkaServer) kafka28PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), false);
79-
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
80-
LOG.debug("Unable to find Kafka 2.8+ constructor for KafkaSever class", e);
81-
}
82-
83-
if (kafkaServer == null) {
84-
try {
85-
Constructor<?> kafka25PlusCon = kafkaServerClass.getConstructor(KafkaConfig.class, Time.class, Option.class, Seq.class);
86-
kafkaServer = (KafkaServer) kafka25PlusCon.newInstance(kafkaConfig, Time.SYSTEM, Option.empty(), new ArrayBuffer<KafkaMetricsReporter>());
87-
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
88-
LOG.debug("Unable to find Kafka 2.5+ constructor for KafkaSever class", e);
89-
}
90-
}
91-
92-
if (kafkaServer != null) {
93-
return kafkaServer;
94-
} else {
95-
throw new NoSuchElementException("Unable to find viable constructor fo the KafkaServer class");
96-
}
97-
}
98-
9960
private void parseConfigs(Map<Object, Object> config) {
100-
_id = Integer.parseInt((String) config.get(ServerConfigs.BROKER_ID_CONFIG));
101-
_logDir = new File((String) config.get(ServerLogConfigs.LOG_DIR_CONFIG));
61+
readLogDirs(config);
62+
_id = Integer.parseInt((String) config.get(KRaftConfigs.NODE_ID_CONFIG));
63+
_metadataLogDir = new File((String) config.get(KRaftConfigs.METADATA_LOG_DIR_CONFIG));
10264

10365
// Bind addresses
10466
String listenersString = (String) config.get(SocketServerConfigs.LISTENERS_CONFIG);
@@ -115,6 +77,14 @@ private void parseConfigs(Map<Object, Object> config) {
11577
}
11678
}
11779

80+
private void readLogDirs(Map<Object, Object> config) {
81+
String logdir = (String) config.get(ServerLogConfigs.LOG_DIR_CONFIG);
82+
String[] paths = logdir.split(",");
83+
for (String path : paths) {
84+
_logDirs.add(new File(path));
85+
}
86+
}
87+
11888
public int id() {
11989
return _id;
12090
}
@@ -154,10 +124,9 @@ public void awaitShutdown() {
154124
public void close() {
155125
CCKafkaTestUtils.quietly(this::shutdown);
156126
CCKafkaTestUtils.quietly(this::awaitShutdown);
157-
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_logDir));
158-
}
159-
160-
public static CCEmbeddedBrokerBuilder newServer() {
161-
return new CCEmbeddedBrokerBuilder();
127+
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(_metadataLogDir));
128+
for (File logDir : _logDirs) {
129+
CCKafkaTestUtils.quietly(() -> FileUtils.forceDelete(logDir));
130+
}
162131
}
163132
}

0 commit comments

Comments
 (0)