Add integration test for MirrorMaker connectors #89

Merged (10 commits) on Jun 6, 2025
@@ -33,6 +33,11 @@ public class KafkaCollector implements MetricsCollector {
private static final Logger LOG = LoggerFactory.getLogger(KafkaCollector.class);
private static final KafkaCollector INSTANCE = new KafkaCollector();
private static final AtomicBoolean REGISTERED = new AtomicBoolean(false);
private static final List<String> IGNORED_METRIC_NAMES = List.of(
// The MirrorMaker connectors register this metric multiple times
// See https://issues.apache.org/jira/browse/KAFKA-19168
"kafka_connect_mirror_kafka_metrics_count_count"
);

private final Set<AbstractReporter> reporters = ConcurrentHashMap.newKeySet();

@@ -86,6 +91,9 @@ public List<MetricSnapshot> collect() {
for (AbstractReporter reporter : reporters) {
for (MetricWrapper metricWrapper : reporter.allowedMetrics()) {
String prometheusMetricName = metricWrapper.prometheusName();
if (IGNORED_METRIC_NAMES.contains(prometheusMetricName)) {
continue;
}
Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting Kafka metric {} with the following labels: {}", prometheusMetricName, labels);
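Aside: the ignore-list above guards against KAFKA-19168, where the MirrorMaker connectors register the kafka_metrics_count_count metric more than once, which would surface as duplicate sample names at scrape time. A minimal, self-contained sketch of the filtering effect (hypothetical class, not part of this PR):

```java
import java.util.List;

// Hypothetical sketch: duplicate registrations of the ignored name never
// reach the Prometheus endpoint, mirroring the skip in collect().
public class IgnoreListSketch {
    static final List<String> IGNORED_METRIC_NAMES = List.of(
            "kafka_connect_mirror_kafka_metrics_count_count");

    public static void main(String[] args) {
        List<String> raw = List.of(
                "kafka_connect_mirror_kafka_metrics_count_count", // registered twice upstream
                "kafka_connect_mirror_kafka_metrics_count_count",
                "kafka_connect_mirror_mirrorsourceconnector_record_rate");
        raw.stream()
                .filter(name -> !IGNORED_METRIC_NAMES.contains(name))
                .forEach(System.out::println); // prints only the record_rate metric
    }
}
```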
@@ -35,6 +35,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
* Utility class to create and retrieve metrics
@@ -43,7 +44,7 @@ public class MetricsUtils {

public static final String VERSION = "1.0.0-SNAPSHOT";
private static final String CLIENTS_IMAGE = "quay.io/strimzi-test-clients/test-clients:latest-kafka-3.9.0";
private static final Duration TIMEOUT = Duration.ofSeconds(10L);
private static final Duration TIMEOUT = Duration.ofSeconds(30L);

public static final String REPORTER_JARS = "target/client-metrics-reporter-" + VERSION + "/client-metrics-reporter-" + VERSION + "/libs/";
public static final String MOUNT_PATH = "/opt/strimzi/metrics-reporter/";
@@ -125,6 +126,13 @@ private static List<String> filterMetrics(List<String> allMetrics, Pattern pattern) {
return metrics;
}

/**
* Verify the container exposes metrics that match a condition
* @param container the container to check
* @param patterns the expected metrics patterns
* @param port the port on which metrics are exposed
* @param condition the assertion to execute on the metrics matching the patterns
*/
public static void verify(GenericContainer<?> container, List<String> patterns, int port, ThrowingConsumer<List<String>> condition) {
assertTimeoutPreemptively(TIMEOUT, () -> {
List<String> metrics = getMetrics(container.getHost(), container.getMappedPort(port));
@@ -145,6 +153,12 @@ public static void verify(GenericContainer<?> container, List<String> patterns, int port, ThrowingConsumer<List<String>> condition) {
});
}
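A typical call, mirroring the checkMetricsExist() helper further down in this diff, asserts that a worker exposes at least one metric matching each pattern. A sketch, assuming a running Testcontainers worker and the test's PORT constant:

```java
// Poll the worker's metrics endpoint until the patterns match (or TIMEOUT).
MetricsUtils.verify(worker,
        List.of("kafka_connect_connect_worker_metrics_connector_count.*"),
        PORT,
        metrics -> assertFalse(metrics.isEmpty()));
```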

/**
* Start a test-clients container
* @param env the environment variables
* @param port the port to expose
* @return the container instance
*/
public static GenericContainer<?> clientContainer(Map<String, String> env, int port) {
return new GenericContainer<>(CLIENTS_IMAGE)
.withNetwork(Network.SHARED)
@@ -155,15 +169,17 @@ public static GenericContainer<?> clientContainer(Map<String, String> env, int port) {
}

/**
* Start a connector
* Start a connector and ensure its tasks are running
* @param connect the Connect cluster
* @param name the name of the connector
* @param config the connector configuration
* @param expectedTasks the expected number of running tasks
*/
public static void startConnector(StrimziConnectCluster connect, String name, String config) {
public static void startConnector(StrimziConnectCluster connect, String name, String config, int expectedTasks) {
assertTimeoutPreemptively(TIMEOUT, () -> {
HttpClient httpClient = HttpClient.newHttpClient();
// Wait for the connector creation to succeed
while (true) {
HttpClient httpClient = HttpClient.newHttpClient();
URI uri = new URI(connect.getRestEndpoint() + "/connectors/" + name + "/config");
HttpRequest request = HttpRequest.newBuilder()
.PUT(HttpRequest.BodyPublishers.ofString(config))
@@ -179,7 +195,25 @@ public static void startConnector(StrimziConnectCluster connect, String name, String config, int expectedTasks) {
TimeUnit.MILLISECONDS.sleep(100L);
}
}

// Wait for the connector's tasks to be in RUNNING state
while (true) {
URI uri = new URI(connect.getRestEndpoint() + "/connectors/" + name + "/status");
HttpRequest request = HttpRequest.newBuilder()
.uri(uri)
.build();
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
try {
assertEquals(HttpURLConnection.HTTP_OK, response.statusCode());
for (int taskId = 0; taskId < expectedTasks; taskId++) {
assertTrue(response.body().contains("{\"id\":" + taskId + ",\"state\":\"RUNNING\""));
}
break;
} catch (Throwable t) {
assertInstanceOf(AssertionError.class, t);
TimeUnit.MILLISECONDS.sleep(100L);
}
}
});
}

}
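For reference, the substring assertion in the status-polling loop works because Kafka Connect's GET /connectors/{name}/status response serializes each task's id before its state. An abridged, illustrative payload (worker IDs made up):

```java
// Shape of the status response matched by startConnector(); values illustrative.
String statusExample =
        "{\"name\":\"source\"," +
        "\"connector\":{\"state\":\"RUNNING\",\"worker_id\":\"172.17.0.3:8083\"}," +
        "\"tasks\":[" +
        "{\"id\":0,\"state\":\"RUNNING\",\"worker_id\":\"172.17.0.3:8083\"}," +
        "{\"id\":1,\"state\":\"RUNNING\",\"worker_id\":\"172.17.0.4:8083\"}]," +
        "\"type\":\"source\"}";
```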
@@ -22,8 +22,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -146,7 +148,10 @@ private void setupConnect(Map<String, String> overrides) {
for (GenericContainer<?> worker : connect.getWorkers()) {
worker.withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH)
.withExposedPorts(8083, PORT)
.withEnv(Map.of("CLASSPATH", MetricsUtils.MOUNT_PATH + "*"));
.withEnv(Map.of("CLASSPATH", MetricsUtils.MOUNT_PATH + "*"))
.waitingFor(new HttpWaitStrategy()
.forPath("/health")
.forStatusCode(HttpURLConnection.HTTP_OK));
}
connect.start();
}
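Note: the /health endpoint polled by the wait strategy above was added to Kafka Connect in 3.9 (KIP-1017) and only returns 200 once the worker has finished starting up, so waiting on it removes the race where a connector is created before the REST API is ready.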
@@ -169,7 +174,7 @@ public void testConnectMetrics() {
" \"topics\": \"" + TOPIC + "\",\n" +
" \"file\": \"" + FILE + "\"\n" +
"}";
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig, 1);
checkMetricsExist(SINK_PATTERNS);

// Start a source connector and check its metrics
@@ -179,7 +184,7 @@ public void testConnectMetrics() {
" \"topic\": \"" + TOPIC + "\",\n" +
" \"file\": \"" + FILE + "\"\n" +
"}";
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig, 1);
checkMetricsExist(SOURCE_PATTERNS);
}

@@ -217,7 +222,7 @@ public void testConnectMetricsWithAllowlist() {
" \"topics\": \"" + TOPIC + "\",\n" +
" \"file\": \"" + FILE + "\"\n" +
"}";
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig, 1);
List<String> allowedSinkPatterns = List.of(
"kafka_connect_connector_metrics_.*" + SINK_CONNECTOR_PATTERN,
"kafka_connect_connect_worker_metrics_connector_count 1.0",
@@ -235,7 +240,7 @@ public void testConnectMetricsWithAllowlist() {
" \"topic\": \"" + TOPIC + "\",\n" +
" \"file\": \"" + FILE + "\"\n" +
"}";
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig, 1);
List<String> allowedSourcePatterns = List.of(
"kafka_connect_connector_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
"kafka_connect_connect_worker_metrics_connector_count 2.0",
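The overrides passed to setupConnect() in the allowlist test are collapsed out of this diff. A plausible shape, assuming the reporter exposes an allowlist property under the prometheus.metrics.reporter prefix seen elsewhere in this PR (property name and patterns are assumptions, not copied from the PR):

```java
// Hypothetical allowlist override restricting the exposed Connect metrics.
setupConnect(Map.of("prometheus.metrics.reporter.allowlist",
        "kafka_connect_connector_metrics_.*,kafka_connect_connect_worker_metrics_.*"));
```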
@@ -0,0 +1,177 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics.prometheus.integration;

import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig;
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
import io.strimzi.kafka.metrics.prometheus.http.Listener;
import io.strimzi.test.container.StrimziConnectCluster;
import io.strimzi.test.container.StrimziKafkaCluster;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.MountableFile;

import java.net.HttpURLConnection;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertFalse;

public class TestMirrorMakerMetricsIT {

private static final int PORT = Listener.parseListener(ClientMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
private static final String CONNECT_ID = "my-cluster";
private static final String TOPIC = "input";
private static final String GROUP = "my-group";
private static final String SOURCE_CONNECTOR = "source";
private static final String CHECKPOINT_CONNECTOR = "checkpoint";

private StrimziKafkaCluster kafka;
private StrimziConnectCluster connect;

@BeforeEach
public void setUp() throws Exception {
// Use a single cluster as source and target
[Review thread]

Member: hmmm, when you also used the target Kafka cluster, it didn't work, I assume (why?)

Contributor Author: To be honest I don't really know. I'm wondering if it may have been a resources issue. It seems that out of the 2 pipelines (Java 11 and 17) each time a random one would pass and the other fail. Since we don't effectively need 2 Kafka clusters, I opted to use a single one, which also simplified the test slightly.

Member: I think a resources issue is not the problem for sure, as the strimzi-kafka repository tests (specifically the STs) run much more than this. But locally it was okay, I assume? I was just curious; if that is okay with you, I am fine with having just one Kafka cluster as source/target.

Contributor Author: I've run this test 100s of times locally and it was consistently passing.

Contributor Author: Note also that the failures we saw in the CI were always in the test setup: the MirrorSourceConnector tasks were not starting in time. It was not a failure to collect and emit metrics in the reporter.

// MirrorSourceConnector is configured with a fixed topics list to avoid a replication loop when mirroring within a single cluster
kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
.withNumberOfBrokers(1)
.withSharedNetwork()
.build();
kafka.start();

connect = new StrimziConnectCluster.StrimziConnectClusterBuilder()
.withGroupId(CONNECT_ID)
.withKafkaCluster(kafka)
.withAdditionalConnectConfiguration(Map.of(
CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName()
))
.build();

for (GenericContainer<?> worker : connect.getWorkers()) {
worker.withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH)
.withExposedPorts(8083, PORT)
.withEnv(Map.of("CLASSPATH", MetricsUtils.MOUNT_PATH + "*"))
.waitingFor(new HttpWaitStrategy()
.forPath("/health")
.forStatusCode(HttpURLConnection.HTTP_OK));
}
connect.start();

try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
// Create a topic with 2 partitions so we get 2 MirrorSourceConnector tasks
admin.createTopics(List.of(new NewTopic(TOPIC, 2, (short) -1))).all().get();
// Create 2 consumer groups so we get 2 MirrorCheckpointConnector tasks
admin.alterConsumerGroupOffsets(GROUP, Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1))).all().get();
admin.alterConsumerGroupOffsets(GROUP + "-2", Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1))).all().get();
}
try (KafkaProducer<String, String> producer = new KafkaProducer<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
))) {
for (int i = 0; i < 5; i++) {
producer.send(new ProducerRecord<>(TOPIC, i % 2, null, "record" + i));
}
}
}

@AfterEach
public void tearDown() {
if (connect != null) {
connect.stop();
}
if (kafka != null) {
kafka.stop();
}
}

@Test
public void testMirrorMakerConnectorMetrics() {
// Start MirrorSourceConnector and check its metrics
String sourceTags = ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*";
List<String> sourceMetricsPatterns = List.of(
"kafka_connect_mirror_mirrorsourceconnector_byte_count" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_byte_rate" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_age_ms" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_age_ms_avg" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_age_ms_max" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_age_ms_min" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_count" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_record_rate" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_avg" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_max" + sourceTags,
"kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_min" + sourceTags
);
String sourceConfig =
"{\n" +
" \"name\": \"" + SOURCE_CONNECTOR + "\",\n" +
" \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorSourceConnector\",\n" +
" \"tasks.max\": \"2\",\n" +
" \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
" \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
" \"source.cluster.alias\": \"source\",\n" +
" \"target.cluster.alias\": \"target\",\n" +
" \"source.cluster.bootstrap.servers\": \"" + kafka.getNetworkBootstrapServers() + "\",\n" +
" \"target.cluster.bootstrap.servers\": \"" + kafka.getNetworkBootstrapServers() + "\",\n" +
" \"replication.factor\": \"-1\",\n" +
" \"offset-syncs.topic.replication.factor\": \"-1\",\n" +
" \"refresh.topics.interval.seconds\": \"1\",\n" +
" \"topics\": \"" + TOPIC + "\",\n" +
" \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n" +
" \"prometheus.metrics.reporter.listener.enable\": \"false\"" +
"}";
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, sourceConfig, 2);
checkMetricsExist(sourceMetricsPatterns);

// Start MirrorCheckpointConnector and check its metrics
String checkpointTags = ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*";
List<String> checkpointMetricPatterns = List.of(
"kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms" + checkpointTags,
"kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_avg" + checkpointTags,
"kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_max" + checkpointTags,
"kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_min" + checkpointTags
);
String checkpointConfig =
"{\n" +
" \"name\": \"" + CHECKPOINT_CONNECTOR + "\",\n" +
" \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorCheckpointConnector\",\n" +
" \"tasks.max\": \"2\",\n" +
" \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
" \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
" \"source.cluster.alias\": \"source\",\n" +
" \"target.cluster.alias\": \"target\",\n" +
" \"source.cluster.bootstrap.servers\": \"" + kafka.getNetworkBootstrapServers() + "\",\n" +
" \"target.cluster.bootstrap.servers\": \"" + kafka.getNetworkBootstrapServers() + "\",\n" +
" \"checkpoints.topic.replication.factor\": \"-1\",\n" +
" \"emit.checkpoints.interval.seconds\": \"1\",\n" +
" \"refresh.groups.interval.seconds\": \"1\",\n" +
" \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n" +
" \"prometheus.metrics.reporter.listener.enable\": \"false\"" +
"}";
MetricsUtils.startConnector(connect, CHECKPOINT_CONNECTOR, checkpointConfig, 2);
checkMetricsExist(checkpointMetricPatterns);
}

private void checkMetricsExist(List<String> patterns) {
for (GenericContainer<?> worker : connect.getWorkers()) {
MetricsUtils.verify(worker, patterns, PORT, metrics -> assertFalse(metrics.isEmpty()));
}
}
}
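For context, a sample matched by the source patterns above would look roughly like `kafka_connect_mirror_mirrorsourceconnector_record_count{partition="0",source="source",target="target",topic="source.input"} 5.0` in the Prometheus text exposition (labels abridged, value illustrative).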