Skip to content

Commit 74761d2

Browse files
committed
Refactor CCKafkaClientsIntegrationTestHarness and dependent classes.
Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
1 parent e30eaf3 commit 74761d2

4 files changed

Lines changed: 57 additions & 138 deletions

File tree

cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporter.java

Lines changed: 9 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.concurrent.TimeoutException;
2929
import org.apache.kafka.clients.ClientUtils;
3030
import org.apache.kafka.clients.CommonClientConfigs;
31+
import org.apache.kafka.clients.admin.Admin;
3132
import org.apache.kafka.clients.admin.AdminClient;
3233
import org.apache.kafka.clients.admin.AlterConfigOp;
3334
import org.apache.kafka.clients.admin.AlterConfigsResult;
@@ -375,7 +376,6 @@ protected void maybeIncreaseTopicPartitionCount() {
375376
String cruiseControlMetricsTopic = _metricsTopic.name();
376377

377378
try {
378-
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
379379
TopicDescription topicDescription = getTopicDescription(_adminClient, cruiseControlMetricsTopic);
380380

381381
if (topicDescription.partitions().size() < _metricsTopic.numPartitions()) {
@@ -514,68 +514,22 @@ private void setIfAbsent(Properties props, String key, String value) {
514514
}
515515

516516
/**
517-
* Attempts to retrieve the method for mapping topic names to futures from the {@link org.apache.kafka.clients.admin.DescribeTopicsResult} class.
518-
* This method first tries to get the {@code topicNameValues()} method, which is available in Kafka 3.1.0 and later.
519-
* If the method is not found, it falls back to trying to retrieve the {@code values()} method, which is available in Kafka 3.9.0 and earlier.
520-
*
521-
* If neither of these methods is found, a {@link RuntimeException} is thrown.
522-
*
523-
* <p>This method is useful for ensuring compatibility with both older and newer versions of Kafka clients.</p>
524-
*
525-
* @return the {@link Method} object representing the {@code topicNameValues()} or {@code values()} method.
526-
* @throws RuntimeException if neither the {@code values()} nor {@code topicNameValues()} methods are found.
527-
*/
528-
/* test */ static Method topicNameValuesMethod() {
529-
//
530-
Method topicDescriptionMethod = null;
531-
try {
532-
// First we try to get the topicNameValues() method
533-
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("topicNameValues");
534-
} catch (NoSuchMethodException exception) {
535-
LOG.info("Failed to get method topicNameValues() from DescribeTopicsResult class since we are probably on kafka 3.0.0 or older: ", exception);
536-
}
537-
538-
if (topicDescriptionMethod == null) {
539-
try {
540-
// Second we try to get the values() method
541-
topicDescriptionMethod = DescribeTopicsResult.class.getMethod("values");
542-
} catch (NoSuchMethodException exception) {
543-
LOG.info("Failed to get method values() from DescribeTopicsResult class: ", exception);
544-
}
545-
}
546-
547-
if (topicDescriptionMethod != null) {
548-
return topicDescriptionMethod;
549-
} else {
550-
throw new RuntimeException("Unable to find both values() and topicNameValues() method in the DescribeTopicsResult class ");
551-
}
552-
}
553-
554-
/**
555-
* Retrieves the {@link TopicDescription} for the specified Kafka topic, handling compatibility
556-
* with Kafka versions 4.0 and above. This method uses reflection to invoke the appropriate method
557-
* for retrieving topic description information, depending on the Kafka version.
517+
* Retrieves the {@link TopicDescription} for the specified Kafka topic.
558518
*
559519
* @param adminClient The Kafka {@link AdminClient} used to interact with the Kafka cluster.
560520
* @param ccMetricsTopic The name of the Kafka topic for which the description is to be retrieved.
561521
*
562522
* @return The {@link TopicDescription} for the specified Kafka topic.
563523
*
564-
* @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description,
565-
* or if the topic name retrieval method cannot be found or invoked properly. This includes
566-
* exceptions related to reflection (e.g., {@link NoSuchMethodException}), invocation issues,
567-
* execution exceptions, timeouts, and interruptions.
524+
* @throws KafkaTopicDescriptionException If an error occurs while retrieving the topic description.
568525
*/
569-
/* test */ static TopicDescription getTopicDescription(AdminClient adminClient, String ccMetricsTopic) throws KafkaTopicDescriptionException {
526+
/* test */ static TopicDescription getTopicDescription(Admin adminClient, String ccMetricsTopic) throws KafkaTopicDescriptionException {
570527
try {
571-
// For compatibility with Kafka 4.0 and beyond we must use new API methods.
572-
Method topicDescriptionMethod = topicNameValuesMethod();
573-
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(ccMetricsTopic));
574-
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = (Map<String, KafkaFuture<TopicDescription>>) topicDescriptionMethod
575-
.invoke(describeTopicsResult);
576-
return topicDescriptionMap.get(ccMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
577-
} catch (InvocationTargetException | IllegalAccessException | ExecutionException | InterruptedException | TimeoutException e) {
578-
throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Cruise Control metrics topic {}.",
528+
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(ccMetricsTopic));
529+
Map<String, KafkaFuture<TopicDescription>> topicDescriptionMap = describeTopicsResult.topicNameValues();
530+
return topicDescriptionMap.get(ccMetricsTopic).get(CLIENT_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS);
531+
} catch (ExecutionException | InterruptedException | TimeoutException e) {
532+
throw new KafkaTopicDescriptionException(String.format("Unable to retrieve config of Cruise Cruise Control metrics topic %s.",
579533
ccMetricsTopic), e);
580534
}
581535
}

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
package com.linkedin.kafka.cruisecontrol.metricsreporter;
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
8-
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCContainerizedKraftCluster;
98
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
109
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs;
11-
import org.apache.kafka.clients.CommonClientConfigs;
1210
import org.apache.kafka.clients.admin.AdminClient;
1311
import org.apache.kafka.clients.admin.CreateTopicsResult;
1412
import org.apache.kafka.clients.admin.NewTopic;
@@ -17,7 +15,6 @@
1715
import org.apache.kafka.clients.producer.Producer;
1816
import org.apache.kafka.clients.producer.ProducerConfig;
1917
import org.apache.kafka.clients.producer.ProducerRecord;
20-
import org.junit.After;
2118
import org.junit.Before;
2219
import org.junit.Test;
2320
import java.util.Collections;
@@ -28,22 +25,14 @@
2825
import static org.junit.Assert.assertEquals;
2926

3027
public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
31-
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
3228
protected static final String TEST_TOPIC = "TestTopic";
33-
private CCContainerizedKraftCluster _cluster;
3429

3530
/**
3631
* Setup the unit test.
3732
*/
3833
@Before
3934
public void setUp() {
40-
Properties adminClientProps = new Properties();
41-
setSecurityConfigs(adminClientProps, "admin");
42-
43-
_cluster = new CCContainerizedKraftCluster(2, buildBrokerConfigs(), adminClientProps);
44-
_cluster.start();
45-
_bootstrapUrl = _cluster.getExternalBootstrapAddress();
46-
35+
super.setUp();
4736
// creating the "TestTopic" explicitly because the topic auto-creation is disabled on the broker
4837
Properties adminProps = new Properties();
4938
adminProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
@@ -76,28 +65,9 @@ public void setUp() {
7665
assertEquals(0, producerFailed.get());
7766
}
7867

79-
/**
80-
* Tear down the unit test.
81-
*/
82-
@After
83-
public void tearDown() {
84-
if (_cluster != null) {
85-
_cluster.close();
86-
}
87-
}
88-
8968
@Override
9069
public Properties overridingProps() {
91-
Properties props = new Properties();
92-
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
93-
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
94-
"127.0.0.1:" + CCContainerizedKraftCluster.CONTAINER_INTERNAL_LISTENER_PORT);
95-
props.put("listener.security.protocol.map", String.join(",",
96-
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
97-
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":PLAINTEXT",
98-
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":PLAINTEXT"));
99-
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
100-
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
70+
Properties props = super.overridingProps();
10171
// configure metrics topic auto-creation by the metrics reporter
10272
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
10373
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_TIMEOUT_MS_CONFIG, "5000");
@@ -106,8 +76,6 @@ public Properties overridingProps() {
10676
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
10777
// disable topic auto-creation to leave the metrics reporter to create the metrics topic
10878
props.setProperty(KafkaServerConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
109-
props.setProperty(KafkaServerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
110-
props.setProperty(KafkaServerConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
11179
props.setProperty(KafkaServerConfigs.NUM_PARTITIONS_CONFIG, "2");
11280
return props;
11381
}

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterTest.java

Lines changed: 1 addition & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,18 @@
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
88
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
9-
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCContainerizedKraftCluster;
109
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
1110
import java.time.Duration;
1211
import java.util.Arrays;
1312
import java.util.Collections;
1413
import java.util.HashMap;
1514
import java.util.HashSet;
16-
import java.util.List;
1715
import java.util.Map;
1816
import java.util.Properties;
1917
import java.util.Set;
2018
import java.util.concurrent.TimeoutException;
2119
import java.util.concurrent.atomic.AtomicInteger;
2220
import java.util.regex.Pattern;
23-
import org.apache.kafka.clients.CommonClientConfigs;
2421
import org.apache.kafka.clients.admin.TopicDescription;
2522
import org.apache.kafka.clients.consumer.Consumer;
2623
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -33,42 +30,26 @@
3330
import org.apache.kafka.clients.producer.ProducerRecord;
3431
import org.apache.kafka.clients.producer.RecordMetadata;
3532
import org.apache.kafka.common.serialization.StringDeserializer;
36-
import org.junit.After;
3733
import org.junit.Before;
3834
import org.junit.Test;
3935
import org.testcontainers.kafka.KafkaContainer;
4036

4137
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
4238
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
43-
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
4439
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
45-
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
4640
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
4741
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
4842
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
4943
import static org.junit.Assert.assertEquals;
5044
import static org.junit.Assert.assertTrue;
5145

5246
public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
53-
private static final int NUM_OF_BROKERS = 2;
54-
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
55-
protected static final String HOST = "127.0.0.1";
56-
protected CCContainerizedKraftCluster _cluster;
57-
protected List<Map<Object, Object>> _brokerConfigs;
58-
5947
/**
6048
* Setup the unit test.
6149
*/
6250
@Before
6351
public void setUp() {
64-
Properties adminClientProps = new Properties();
65-
setSecurityConfigs(adminClientProps, "admin");
66-
67-
_brokerConfigs = buildBrokerConfigs();
68-
_cluster = new CCContainerizedKraftCluster(NUM_OF_BROKERS, _brokerConfigs, adminClientProps);
69-
_cluster.start();
70-
_bootstrapUrl = _cluster.getExternalBootstrapAddress();
71-
52+
super.setUp();
7253
Properties props = new Properties();
7354
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
7455
AtomicInteger failed = new AtomicInteger(0);
@@ -87,34 +68,6 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
8768
assertEquals(0, failed.get());
8869
}
8970

90-
/**
91-
* Tear down the unit test.
92-
*/
93-
@After
94-
public void tearDown() {
95-
if (_cluster != null) {
96-
_cluster.close();
97-
}
98-
}
99-
100-
@Override
101-
public Properties overridingProps() {
102-
Properties props = new Properties();
103-
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
104-
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
105-
HOST + ":" + CCContainerizedKraftCluster.CONTAINER_INTERNAL_LISTENER_PORT);
106-
props.put("listener.security.protocol.map", String.join(",",
107-
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
108-
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":PLAINTEXT",
109-
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":PLAINTEXT"));
110-
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
111-
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
112-
props.setProperty("log.flush.interval.messages", "1");
113-
props.setProperty("offsets.topic.replication.factor", "1");
114-
props.setProperty("default.replication.factor", "2");
115-
return props;
116-
}
117-
11871
@Test
11972
public void testReportingMetrics() {
12073
Properties props = new Properties();

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/utils/CCKafkaClientsIntegrationTestHarness.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,65 @@
55
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
66

77
import java.io.File;
8+
import java.util.List;
9+
import java.util.Map;
810
import java.util.Properties;
11+
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter;
12+
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
913
import org.apache.kafka.clients.CommonClientConfigs;
1014
import org.apache.kafka.clients.producer.ProducerConfig;
1115
import org.apache.kafka.clients.producer.Producer;
1216
import org.apache.kafka.clients.producer.KafkaProducer;
1317
import org.apache.kafka.common.config.SslConfigs;
1418
import org.apache.kafka.common.security.auth.SecurityProtocol;
1519
import org.apache.kafka.common.serialization.StringSerializer;
20+
import org.junit.After;
21+
22+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
23+
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
1624

1725

1826
public abstract class CCKafkaClientsIntegrationTestHarness extends CCKafkaIntegrationTestHarness {
27+
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
28+
protected CCContainerizedKraftCluster _cluster;
29+
protected List<Map<Object, Object>> _brokerConfigs;
1930

2031
@Override
2132
public void setUp() {
22-
super.setUp();
33+
Properties adminClientProps = new Properties();
34+
setSecurityConfigs(adminClientProps, "admin");
35+
36+
_brokerConfigs = buildBrokerConfigs();
37+
_cluster = new CCContainerizedKraftCluster(2, _brokerConfigs, adminClientProps);
38+
_cluster.start();
39+
_bootstrapUrl = _cluster.getExternalBootstrapAddress();
40+
}
41+
42+
/**
43+
* Tear down the unit test.
44+
*/
45+
@After
46+
public void tearDown() {
47+
if (_cluster != null) {
48+
_cluster.close();
49+
}
50+
}
51+
52+
@Override
53+
public Properties overridingProps() {
54+
Properties props = new Properties();
55+
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
56+
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
57+
"localhost:" + CCContainerizedKraftCluster.CONTAINER_INTERNAL_LISTENER_PORT);
58+
props.put("listener.security.protocol.map", String.join(",",
59+
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
60+
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":PLAINTEXT",
61+
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":PLAINTEXT"));
62+
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
63+
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
64+
props.setProperty(KafkaServerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
65+
props.setProperty(KafkaServerConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
66+
return props;
2367
}
2468

2569
@javax.annotation.Nonnull

0 commit comments

Comments
 (0)