Skip to content

Commit 71b963b

Browse files
authored
Migrate AutoCreateTopicTest to TestContainers (linkedin#2313)
* Migrate AutoCreateTopicTest to TestContainers Signed-off-by: Kyle Liberti <kliberti.us@gmail.com> * Adressing feedback + fix CI tests Signed-off-by: Kyle Liberti <kliberti.us@gmail.com> * Remove 'log.flush.interval.messages' parameter Signed-off-by: Kyle Liberti <kliberti.us@gmail.com> * Addressing feedback - fv, tc Signed-off-by: Kyle Liberti <kliberti.us@gmail.com> --------- Signed-off-by: Kyle Liberti <kliberti.us@gmail.com>
1 parent 26b7c3a commit 71b963b

2 files changed

Lines changed: 42 additions & 15 deletions

File tree

cruise-control-metrics-reporter/src/test/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsReporterAutoCreateTopicTest.java

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@
55
package com.linkedin.kafka.cruisecontrol.metricsreporter;
66

77
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
8+
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCContainerizedKraftCluster;
89
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
9-
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
10+
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.KafkaServerConfigs;
1011
import org.apache.kafka.clients.CommonClientConfigs;
1112
import org.apache.kafka.clients.admin.AdminClient;
1213
import org.apache.kafka.clients.admin.CreateTopicsResult;
@@ -16,10 +17,6 @@
1617
import org.apache.kafka.clients.producer.Producer;
1718
import org.apache.kafka.clients.producer.ProducerConfig;
1819
import org.apache.kafka.clients.producer.ProducerRecord;
19-
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
20-
import org.apache.kafka.network.SocketServerConfigs;
21-
import org.apache.kafka.server.config.ReplicationConfigs;
22-
import org.apache.kafka.server.config.ServerLogConfigs;
2320
import org.junit.After;
2421
import org.junit.Before;
2522
import org.junit.Test;
@@ -33,13 +30,19 @@
3330
public class CruiseControlMetricsReporterAutoCreateTopicTest extends CCKafkaClientsIntegrationTestHarness {
3431
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
3532
protected static final String TEST_TOPIC = "TestTopic";
33+
private CCContainerizedKraftCluster _cluster;
3634

3735
/**
3836
* Setup the unit test.
3937
*/
4038
@Before
4139
public void setUp() {
42-
super.setUp();
40+
Properties adminClientProps = new Properties();
41+
setSecurityConfigs(adminClientProps, "admin");
42+
43+
_cluster = new CCContainerizedKraftCluster(2, buildBrokerConfigs(), adminClientProps);
44+
_cluster.start();
45+
_bootstrapUrl = _cluster.getExternalBootstrapAddress();
4346

4447
// creating the "TestTopic" explicitly because the topic auto-creation is disabled on the broker
4548
Properties adminProps = new Properties();
@@ -73,19 +76,26 @@ public void setUp() {
7376
assertEquals(0, producerFailed.get());
7477
}
7578

79+
/**
80+
* Tear down the unit test.
81+
*/
7682
@After
7783
public void tearDown() {
78-
super.tearDown();
84+
if (_cluster != null) {
85+
_cluster.close();
86+
}
7987
}
8088

8189
@Override
8290
public Properties overridingProps() {
8391
Properties props = new Properties();
84-
int port = CCKafkaTestUtils.findLocalPort();
8592
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
86-
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:" + port);
8793
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
88-
"127.0.0.1:" + port);
94+
"127.0.0.1:" + CCContainerizedKraftCluster.CONTAINER_INTERNAL_LISTENER_PORT);
95+
props.put("listener.security.protocol.map", String.join(",",
96+
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
97+
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":PLAINTEXT",
98+
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":PLAINTEXT"));
8999
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
90100
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
91101
// configure metrics topic auto-creation by the metrics reporter
@@ -95,11 +105,10 @@ public Properties overridingProps() {
95105
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "1");
96106
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
97107
// disable topic auto-creation to leave the metrics reporter to create the metrics topic
98-
props.setProperty(ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
99-
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
100-
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
101-
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
102-
props.setProperty(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "2");
108+
props.setProperty(KafkaServerConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG, "false");
109+
props.setProperty(KafkaServerConfigs.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
110+
props.setProperty(KafkaServerConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
111+
props.setProperty(KafkaServerConfigs.NUM_PARTITIONS_CONFIG, "2");
103112
return props;
104113
}
105114

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Copyright 2020 LinkedIn Corp. Licensed under the BSD 2-Clause License (the "License"). See License in the project root for license information.
3+
*/
4+
package com.linkedin.kafka.cruisecontrol.metricsreporter.utils;
5+
6+
/**
7+
* Defines Kafka broker configuration keys to avoid relying on internal Kafka classes.
8+
* This reduces coupling to Kafka's internal APIs and improves long-term maintainability.
9+
*/
10+
public final class KafkaServerConfigs {
11+
public static final String AUTO_CREATE_TOPICS_ENABLE_CONFIG = "auto.create.topics.enable";
12+
public static final String OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG = "offsets.topic.replication.factor";
13+
public static final String DEFAULT_REPLICATION_FACTOR_CONFIG = "default.replication.factor";
14+
public static final String NUM_PARTITIONS_CONFIG = "num.partitions";
15+
16+
private KafkaServerConfigs() { }
17+
}
18+

0 commit comments

Comments
 (0)