Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ jobs:
build:
environment:
_JAVA_OPTIONS: "-Xms512m -Xmx1g"
TESTCONTAINERS_RYUK_DISABLED: "true"
working_directory: ~/workspace
docker:
- image: cimg/openjdk:17.0
steps:
- checkout
- setup_remote_docker:
docker_layer_caching: true
- restore_cache:
key: dependency-cache-{{ checksum "build.gradle" }}
- run:
Expand All @@ -19,7 +22,17 @@ jobs:
command: ./gradlew --max-workers=1 --no-daemon analyze
- run:
no_output_timeout: 15m
command: ./gradlew --no-daemon -PmaxParallelForks=1 build
command: |
Comment thread
kyguy marked this conversation as resolved.
export NETWORK_NAME="test_containers_network"
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it mandatory to create and pass down NETWORK_NAME and CONTAINER_HOST?

If everything works fine without those two (in case the framework creates its own ephemeral resources), I'd personally prefer not having them for the sake of simplicity.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading the comments in CCContainerizedKraftCluster kind of explained the situation. It's basically good to have for CI. But I'm still not sure if it's mandatory or just good to have.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, these variables are mandatory for allowing the CircleCI job to find/connect to the containers created by TestContainers. I have just added some comment/Javadocs to make this more clear. Let me know what you think!

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Makes sense. And the docs explain it well now. Thanks!


# Create a shared Docker network, required for communication between the CircleCI job container
# and TestContainers services.
docker network create $NETWORK_NAME

# Extract Docker host for helping CircleCI find TestContainer services.
export CONTAINER_HOST=$(docker network inspect "$NETWORK_NAME" --format='{{(index .IPAM.Config 0).Gateway}}')

./gradlew --no-daemon -PmaxParallelForks=1 build
- save_cache:
key: dependency-cache-{{ checksum "build.gradle" }}
paths:
Expand Down
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ project(':cruise-control-metrics-reporter') {
testImplementation "org.apache.kafka:kafka-raft:$kafkaVersion"
testImplementation "org.apache.kafka:kafka-storage:$kafkaVersion"
testImplementation 'commons-io:commons-io:2.11.0'
testImplementation "org.testcontainers:kafka:1.21.3"
testOutput sourceSets.test.output
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,13 @@
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import javax.net.ssl.KeyManagerFactory;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCContainerizedKraftCluster;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.Assert;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;


public class CruiseControlMetricsReporterSslTest extends CruiseControlMetricsReporterTest {

private File _trustStoreFile;
Expand All @@ -37,7 +30,6 @@ public CruiseControlMetricsReporterSslTest() {
@Override
public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
// We need to convert all the properties to the Cruise Control properties.
setSecurityConfigs(props, "producer");
for (String configName : ProducerConfig.configNames()) {
Expand All @@ -47,23 +39,16 @@ public Properties overridingProps() {
props.put(appendPrefix(configName), value);
}
}
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), "127.0.0.1:" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG), SecurityProtocol.SSL.name);
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.putAll(super.overridingProps());
props.put("listener.security.protocol.map", String.join(",",
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":SSL",
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":SSL"));
// The Kafka brokers should use the same key manager algorithm as the host that generates the certs
props.setProperty(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, KeyManagerFactory.getDefaultAlgorithm());
Comment thread
kyguy marked this conversation as resolved.
return props;
}

@Override
public void testUpdatingMetricsTopicConfig() {
// Skip this test since it is flaky due to undetermined time to propagate metadata.
}

@Override
public File trustStoreFile() {
return _trustStoreFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,23 @@
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.KafkaTopicDescriptionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.MetricSerde;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCEmbeddedBroker;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCContainerizedKraftCluster;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaClientsIntegrationTestHarness;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaTestUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.Consumer;
Expand All @@ -34,36 +36,45 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.testcontainers.kafka.KafkaContainer;

import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_HOST;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.DEFAULT_BOOTSTRAP_SERVERS_PORT;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter.getTopicDescription;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG;
import static com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class CruiseControlMetricsReporterTest extends CCKafkaClientsIntegrationTestHarness {
private static final int NUM_OF_BROKERS = 2;
Comment thread
kyguy marked this conversation as resolved.
protected static final String TOPIC = "CruiseControlMetricsReporterTest";
private static final String HOST = "127.0.0.1";
protected static final String HOST = "127.0.0.1";
protected CCContainerizedKraftCluster _cluster;
protected List<Map<Object, Object>> _brokerConfigs;

/**
* Setup the unit test.
*/
@Before
public void setUp() {
super.setUp();
Comment thread
kyguy marked this conversation as resolved.
Properties adminClientProps = new Properties();
setSecurityConfigs(adminClientProps, "admin");

_brokerConfigs = buildBrokerConfigs();
_cluster = new CCContainerizedKraftCluster(NUM_OF_BROKERS, _brokerConfigs, adminClientProps);
_cluster.start();
_bootstrapUrl = _cluster.getExternalBootstrapAddress();

Properties props = new Properties();
props.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
AtomicInteger failed = new AtomicInteger(0);
Expand All @@ -82,23 +93,31 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
assertEquals(0, failed.get());
}

/**
* Tear down the unit test.
*/
@After
public void tearDown() {
super.tearDown();
if (_cluster != null) {
_cluster.close();
Comment thread
kyguy marked this conversation as resolved.
}
}

@Override
public Properties overridingProps() {
Properties props = new Properties();
int port = CCKafkaTestUtils.findLocalPort();
props.setProperty(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, CruiseControlMetricsReporter.class.getName());
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://" + HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), HOST + ":" + port);
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CruiseControlMetricsReporterConfig.CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty(ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG, "1");
props.setProperty(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
props.setProperty(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "2");
props.setProperty(CruiseControlMetricsReporterConfig.config(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
HOST + ":" + CCContainerizedKraftCluster.CONTAINER_INTERNAL_LISTENER_PORT);
props.put("listener.security.protocol.map", String.join(",",
CCContainerizedKraftCluster.CONTROLLER_LISTENER_NAME + ":PLAINTEXT",
CCContainerizedKraftCluster.INTERNAL_LISTENER_NAME + ":PLAINTEXT",
CCContainerizedKraftCluster.EXTERNAL_LISTENER_NAME + ":PLAINTEXT"));
props.setProperty(CRUISE_CONTROL_METRICS_REPORTER_INTERVAL_MS_CONFIG, "100");
props.setProperty(CRUISE_CONTROL_METRICS_TOPIC_CONFIG, TOPIC);
props.setProperty("log.flush.interval.messages", "1");
props.setProperty("offsets.topic.replication.factor", "1");
props.setProperty("default.replication.factor", "2");
return props;
}

Expand Down Expand Up @@ -187,73 +206,89 @@ public void testReportingMetrics() {
assertEquals("Expected " + expectedMetricTypes + ", but saw " + metricTypes, expectedMetricTypes, metricTypes);
}

/**
 * Polls the cluster until the metadata of {@code TOPIC} satisfies the given condition, or the
 * timeout elapses.
 *
 * <p>Describe calls that fail because the topic does not exist yet (cause is
 * {@code UnknownTopicOrPartitionException}) are treated as "not ready yet" and retried after a
 * 500 ms pause; any other describe failure is surfaced immediately.
 *
 * @param adminClient admin client used to describe the topic; NOTE(review): it is cast to
 *                    {@code AdminClient} below because {@code getTopicDescription} takes the
 *                    concrete type — callers must pass an {@code AdminClient} instance
 * @param timeout maximum total time to wait for the condition
 * @param condition predicate evaluated against each successfully fetched description
 * @return the first {@code TopicDescription} for which {@code condition} returns {@code true}
 * @throws InterruptedException if the polling sleep is interrupted
 * @throws TimeoutException if the condition is not met before the deadline
 * @throws RuntimeException if describing the topic fails for a reason other than the topic
 *                          not existing yet
 */
private TopicDescription waitForTopicMetadata(Admin adminClient,
Duration timeout,
Predicate<TopicDescription> condition)
throws InterruptedException, TimeoutException {

// Absolute deadline; each iteration re-checks wall-clock time against it.
long deadline = System.currentTimeMillis() + timeout.toMillis();

while (System.currentTimeMillis() < deadline) {
try {
TopicDescription topicDescription = getTopicDescription((AdminClient) adminClient, TOPIC);

if (condition.test(topicDescription)) {
return topicDescription;
}
} catch (KafkaTopicDescriptionException e) {
// Only tolerate "topic not created yet"; anything else is a real failure.
if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
throw new RuntimeException("Failed to describe topic: " + TOPIC, e);
}
// else ignore and retry
}

// Back off briefly before the next describe attempt.
Thread.sleep(500);
}

throw new TimeoutException("Timeout waiting for topic metadata condition to be met: " + TOPIC);
}

@Test
public void testUpdatingMetricsTopicConfig() throws ExecutionException, InterruptedException {
public void testUpdatingMetricsTopicConfig() throws InterruptedException, TimeoutException {
Properties props = new Properties();
setSecurityConfigs(props, "admin");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
AdminClient adminClient = AdminClient.create(props);
props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
Admin adminClient = Admin.create(props);

// For compatibility with Kafka 4.0 and beyond we must use new API methods.
TopicDescription topicDescription;
try {
topicDescription = getTopicDescription(adminClient, TOPIC);
} catch (KafkaTopicDescriptionException e) {
throw new RuntimeException(e);
}
TopicDescription topicDescription = waitForTopicMetadata(adminClient, Duration.ofSeconds(30), td -> true);
assertEquals(1, topicDescription.partitions().size());

KafkaContainer broker = _cluster.getBrokers().get(0);

// Shutdown broker
_brokers.get(0).shutdown();
broker.stop();

// Change broker config
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
Map<Object, Object> brokerConfig = _brokerConfigs.get(0);
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_AUTO_CREATE_CONFIG, "true");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_NUM_PARTITIONS_CONFIG, "2");
brokerConfig.put(CRUISE_CONTROL_METRICS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
try (CCEmbeddedBroker broker = new CCEmbeddedBroker(brokerConfig)) {
// Restart broker
broker.startup();
// Check whether the topic config is updated
long startTime = System.currentTimeMillis();
boolean isTopicConfigChanged = false;
while (!isTopicConfigChanged) {
if (System.currentTimeMillis() > startTime + 60000) {
fail("Topic config was not updated");
}

TopicDescription description = adminClient.describeTopics(Collections.singleton(TOPIC)).topicNameValues().get(TOPIC).get();
isTopicConfigChanged = 2 == description.partitions().size();
_cluster.overrideBrokerConfig(broker, brokerConfig);

try {
Thread.sleep(5000);
} catch (InterruptedException ignored) {
}
}
}
// Restart broker
broker.start();

// Wait for topic metadata configuration change to propagate
int oldPartitionCount = topicDescription.partitions().size();
TopicDescription newTopicDescription = waitForTopicMetadata(adminClient, Duration.ofSeconds(30),
Comment thread
kyguy marked this conversation as resolved.
td -> td.partitions().size() != oldPartitionCount);

assertEquals(2, newTopicDescription.partitions().size());
}

@Test
public void testGetKafkaBootstrapServersConfigure() {
// Test with a "listeners" config with a host
Map<Object, Object> brokerConfig = buildBrokerConfigs().get(0);
Map<String, Object> listenersMap = Collections.singletonMap(
SocketServerConfigs.LISTENERS_CONFIG, brokerConfig.get(SocketServerConfigs.LISTENERS_CONFIG));
Map<String, Object> listenersMap = Collections.singletonMap("listeners", brokerConfig.get("listeners"));
String bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
String urlParse = "\\[?([0-9a-zA-Z\\-%._:]*)]?:(-?[0-9]+)";
Pattern urlParsePattern = Pattern.compile(urlParse);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(HOST, bootstrapServers.split(":")[0]);
assertEquals("localhost", bootstrapServers.split(":")[0]);

// Test with a "listeners" config without a host in the first listener.
String listeners = "SSL://:1234,PLAINTEXT://myhost:4321";
listenersMap = Collections.singletonMap(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap = Collections.singletonMap("listeners", listeners);
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
assertEquals(DEFAULT_BOOTSTRAP_SERVERS_HOST, bootstrapServers.split(":")[0]);
assertEquals("1234", bootstrapServers.split(":")[1]);

// Test with "listeners" and "port" config together.
listenersMap = new HashMap<>();
listenersMap.put(SocketServerConfigs.LISTENERS_CONFIG, listeners);
listenersMap.put("listeners", listeners);
listenersMap.put("port", "43");
bootstrapServers = CruiseControlMetricsReporter.getBootstrapServers(listenersMap);
assertTrue(urlParsePattern.matcher(bootstrapServers).matches());
Expand Down
Loading
Loading