Skip to content

Commit 6afe767

Browse files
committed
Add integration test for MirrorMaker connectors
Signed-off-by: Mickael Maison <[email protected]>
1 parent cd810e6 commit 6afe767

File tree

4 files changed

+238
-24
lines changed

4 files changed

+238
-24
lines changed

client-metrics-reporter/src/main/java/io/strimzi/kafka/metrics/prometheus/kafka/KafkaCollector.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ public class KafkaCollector implements MetricsCollector {
3333
private static final Logger LOG = LoggerFactory.getLogger(KafkaCollector.class);
3434
private static final KafkaCollector INSTANCE = new KafkaCollector();
3535
private static final AtomicBoolean REGISTERED = new AtomicBoolean(false);
36+
private static final List<String> IGNORED_METRIC_NAMES = List.of(
37+
// The MirrorMaker connectors register this metric multiple times
38+
// See https://issues.apache.org/jira/browse/KAFKA-19168
39+
"kafka_connect_mirror_kafka_metrics_count_count"
40+
);
3641

3742
private final Set<AbstractReporter> reporters = ConcurrentHashMap.newKeySet();
3843

@@ -86,6 +91,9 @@ public List<MetricSnapshot> collect() {
8691
for (AbstractReporter reporter : reporters) {
8792
for (MetricWrapper metricWrapper : reporter.allowedMetrics()) {
8893
String prometheusMetricName = metricWrapper.prometheusName();
94+
if (IGNORED_METRIC_NAMES.contains(prometheusMetricName)) {
95+
continue;
96+
}
8997
Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue();
9098
Labels labels = metricWrapper.labels();
9199
LOG.debug("Collecting Kafka metric {} with the following labels: {}", prometheusMetricName, labels);

client-metrics-reporter/src/test/java/io/strimzi/kafka/metrics/prometheus/MetricsUtils.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.prometheus.metrics.model.snapshots.InfoSnapshot;
99
import io.prometheus.metrics.model.snapshots.Labels;
1010
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
11+
import io.strimzi.test.container.StrimziConnectCluster;
1112
import org.junit.jupiter.api.function.ThrowingConsumer;
1213
import org.testcontainers.containers.GenericContainer;
1314
import org.testcontainers.containers.Network;
@@ -18,7 +19,11 @@
1819
import java.io.IOException;
1920
import java.io.InputStreamReader;
2021
import java.net.HttpURLConnection;
22+
import java.net.URI;
2123
import java.net.URL;
24+
import java.net.http.HttpClient;
25+
import java.net.http.HttpRequest;
26+
import java.net.http.HttpResponse;
2227
import java.time.Duration;
2328
import java.util.ArrayList;
2429
import java.util.List;
@@ -120,6 +125,13 @@ private static List<String> filterMetrics(List<String> allMetrics, Pattern patte
120125
return metrics;
121126
}
122127

128+
/**
129+
* Verify the container exposes metrics that match a condition
130+
* @param container the container to check
131+
* @param patterns the expected metrics patterns
132+
* @param port the port on which metrics are exposed
133+
* @param condition the assertion to execute on the metrics matching the patterns
134+
*/
123135
public static void verify(GenericContainer<?> container, List<String> patterns, int port, ThrowingConsumer<List<String>> condition) {
124136
assertTimeoutPreemptively(TIMEOUT, () -> {
125137
List<String> metrics = getMetrics(container.getHost(), container.getMappedPort(port));
@@ -140,6 +152,12 @@ public static void verify(GenericContainer<?> container, List<String> patterns,
140152
});
141153
}
142154

155+
/**
156+
* Start a test-clients container
157+
* @param env the environment variables
158+
* @param port the port to expose
159+
* @return the container instance
160+
*/
143161
public static GenericContainer<?> clientContainer(Map<String, String> env, int port) {
144162
return new GenericContainer<>(CLIENTS_IMAGE)
145163
.withNetwork(Network.SHARED)
@@ -149,4 +167,31 @@ public static GenericContainer<?> clientContainer(Map<String, String> env, int p
149167
.waitingFor(Wait.forHttp("/metrics").forStatusCode(200));
150168
}
151169

170+
/**
171+
* Start a connector
172+
* @param connect the Connect cluster
173+
* @param name the name of the connector
174+
* @param config the connector configuration
175+
*/
176+
public static void startConnector(StrimziConnectCluster connect, String name, String config) {
177+
assertTimeoutPreemptively(TIMEOUT, () -> {
178+
while (true) {
179+
HttpClient httpClient = HttpClient.newHttpClient();
180+
URI uri = new URI(connect.getRestEndpoint() + "/connectors/" + name + "/config");
181+
HttpRequest request = HttpRequest.newBuilder()
182+
.PUT(HttpRequest.BodyPublishers.ofString(config))
183+
.setHeader("Content-Type", "application/json")
184+
.uri(uri)
185+
.build();
186+
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
187+
try {
188+
assertEquals(HttpURLConnection.HTTP_CREATED, response.statusCode());
189+
break;
190+
} catch (Throwable t) {
191+
assertInstanceOf(AssertionError.class, t);
192+
TimeUnit.MILLISECONDS.sleep(100L);
193+
}
194+
}
195+
});
196+
}
152197
}

client-metrics-reporter/src/test/java/io/strimzi/kafka/metrics/prometheus/integration/TestConnectMetricsIT.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,12 @@
2424
import org.testcontainers.containers.GenericContainer;
2525
import org.testcontainers.utility.MountableFile;
2626

27-
import java.net.HttpURLConnection;
28-
import java.net.URI;
29-
import java.net.http.HttpClient;
30-
import java.net.http.HttpRequest;
31-
import java.net.http.HttpResponse;
3227
import java.util.ArrayList;
3328
import java.util.HashMap;
3429
import java.util.List;
3530
import java.util.Map;
3631

3732
import static io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig.ALLOWLIST_CONFIG;
38-
import static org.junit.jupiter.api.Assertions.assertEquals;
3933
import static org.junit.jupiter.api.Assertions.assertFalse;
4034
import static org.junit.jupiter.api.Assertions.assertTrue;
4135

@@ -158,7 +152,7 @@ private void setupConnect(Map<String, String> overrides) {
158152
}
159153

160154
@Test
161-
public void testConnectMetrics() throws Exception {
155+
public void testConnectMetrics() {
162156
setupConnect(Map.of(
163157
"consumer.metric.reporters", ClientMetricsReporter.class.getName(),
164158
"producer.metric.reporters", ClientMetricsReporter.class.getName(),
@@ -175,7 +169,7 @@ public void testConnectMetrics() throws Exception {
175169
" \"topics\": \"" + TOPIC + "\",\n" +
176170
" \"file\": \"" + FILE + "\"\n" +
177171
"}";
178-
httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
172+
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
179173
checkMetricsExist(SINK_PATTERNS);
180174

181175
// Start a source connector metrics and check its metrics
@@ -185,12 +179,12 @@ public void testConnectMetrics() throws Exception {
185179
" \"topic\": \"" + TOPIC + "\",\n" +
186180
" \"file\": \"" + FILE + "\"\n" +
187181
"}";
188-
httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
182+
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
189183
checkMetricsExist(SOURCE_PATTERNS);
190184
}
191185

192186
@Test
193-
public void testConnectMetricsWithAllowlist() throws Exception {
187+
public void testConnectMetricsWithAllowlist() {
194188
setupConnect(Map.of(
195189
"consumer.metric.reporters", ClientMetricsReporter.class.getName(),
196190
"producer.metric.reporters", ClientMetricsReporter.class.getName(),
@@ -223,7 +217,7 @@ public void testConnectMetricsWithAllowlist() throws Exception {
223217
" \"topics\": \"" + TOPIC + "\",\n" +
224218
" \"file\": \"" + FILE + "\"\n" +
225219
"}";
226-
httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
220+
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
227221
List<String> allowedSinkPatterns = List.of(
228222
"kafka_connect_connector_metrics_.*" + SINK_CONNECTOR_PATTERN,
229223
"kafka_connect_connect_worker_metrics_connector_count 1.0",
@@ -241,7 +235,7 @@ public void testConnectMetricsWithAllowlist() throws Exception {
241235
" \"topic\": \"" + TOPIC + "\",\n" +
242236
" \"file\": \"" + FILE + "\"\n" +
243237
"}";
244-
httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
238+
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
245239
List<String> allowedSourcePatterns = List.of(
246240
"kafka_connect_connector_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
247241
"kafka_connect_connect_worker_metrics_connector_count 2.0",
@@ -264,16 +258,4 @@ private void checkMetricsDontExist(List<String> patterns) {
264258
MetricsUtils.verify(worker, patterns, PORT, metrics -> assertTrue(metrics.isEmpty()));
265259
}
266260
}
267-
268-
private void httpPut(String path, String body) throws Exception {
269-
HttpClient httpClient = HttpClient.newHttpClient();
270-
URI uri = new URI(connect.getRestEndpoint() + path);
271-
HttpRequest request = HttpRequest.newBuilder()
272-
.PUT(HttpRequest.BodyPublishers.ofString(body))
273-
.setHeader("Content-Type", "application/json")
274-
.uri(uri)
275-
.build();
276-
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
277-
assertEquals(HttpURLConnection.HTTP_CREATED, response.statusCode());
278-
}
279261
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright Strimzi authors.
3+
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4+
*/
5+
package io.strimzi.kafka.metrics.prometheus.integration;
6+
7+
import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
8+
import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig;
9+
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
10+
import io.strimzi.kafka.metrics.prometheus.http.Listener;
11+
import io.strimzi.test.container.StrimziConnectCluster;
12+
import io.strimzi.test.container.StrimziKafkaCluster;
13+
import org.apache.kafka.clients.CommonClientConfigs;
14+
import org.apache.kafka.clients.admin.Admin;
15+
import org.apache.kafka.clients.admin.AdminClientConfig;
16+
import org.apache.kafka.clients.admin.NewTopic;
17+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
18+
import org.apache.kafka.clients.producer.KafkaProducer;
19+
import org.apache.kafka.clients.producer.ProducerConfig;
20+
import org.apache.kafka.clients.producer.ProducerRecord;
21+
import org.apache.kafka.common.TopicPartition;
22+
import org.apache.kafka.common.serialization.StringSerializer;
23+
import org.junit.jupiter.api.AfterEach;
24+
import org.junit.jupiter.api.BeforeEach;
25+
import org.junit.jupiter.api.Test;
26+
import org.testcontainers.containers.GenericContainer;
27+
import org.testcontainers.utility.MountableFile;
28+
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
import static org.junit.jupiter.api.Assertions.assertFalse;
33+
34+
/**
 * Integration test verifying that the MirrorMaker connectors (MirrorSourceConnector and
 * MirrorCheckpointConnector) running on a Connect cluster expose their metrics through
 * the Prometheus ClientMetricsReporter.
 */
public class TestMirrorMakerMetricsIT {

    // Port the Prometheus reporter listens on, derived from the reporter's default listener configuration
    private static final int PORT = Listener.parseListener(ClientMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
    private static final String CONNECT_ID = "my-cluster";
    private static final String TOPIC = "input";
    private static final String GROUP = "my-group";
    private static final String SOURCE_CONNECTOR = "source";
    private static final String CHECKPOINT_CONNECTOR = "checkpoint";

    private StrimziKafkaCluster source; // cluster records are mirrored from
    private StrimziKafkaCluster target; // cluster records are mirrored to
    private StrimziConnectCluster connect; // Connect cluster (backed by the source cluster) that runs the connectors

    /**
     * Starts a source and a target Kafka cluster and a Connect cluster configured with the
     * ClientMetricsReporter, then seeds the source cluster with a 2-partition topic, two
     * consumer groups and a few records so the MirrorMaker connectors have data to mirror.
     */
    @BeforeEach
    public void setUp() throws Exception {
        source = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
                .withNumberOfBrokers(1)
                .withSharedNetwork()
                .build();
        source.start();

        target = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
                .withNumberOfBrokers(1)
                .withSharedNetwork()
                .build();
        target.start();

        connect = new StrimziConnectCluster.StrimziConnectClusterBuilder()
                .withGroupId(CONNECT_ID)
                .withKafkaCluster(source)
                .withAdditionalConnectConfiguration(Map.of(
                        CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName()
                ))
                .build();
        // Mount the reporter jars into each worker and expose the REST (8083) and metrics ports
        for (GenericContainer<?> worker : connect.getWorkers()) {
            worker.withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH)
                    .withExposedPorts(8083, PORT)
                    .withEnv(Map.of("CLASSPATH", MetricsUtils.MOUNT_PATH + "*"));
        }
        connect.start();

        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, source.getBootstrapServers()))) {
            // Create a topic with 2 partitions so we get 2 MirrorSourceConnector tasks
            admin.createTopics(List.of(new NewTopic(TOPIC, 2, (short) -1))).all().get();
            // Create 2 consumer groups so we get 2 MirrorCheckpointConnector tasks
            admin.alterConsumerGroupOffsets(GROUP, Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1))).all().get();
            admin.alterConsumerGroupOffsets(GROUP + "-2", Map.of(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(1))).all().get();
        }
        // Produce a few records spread across both partitions so replication metrics get recorded
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, source.getBootstrapServers(),
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
        ))) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>(TOPIC, i % 2, null, "record" + i));
            }
        }
    }

    /**
     * Stops the Connect cluster and both Kafka clusters.
     * The null checks let tearDown run safely even if setUp failed partway through.
     */
    @AfterEach
    public void tearDown() {
        if (connect != null) {
            connect.stop();
        }
        if (source != null) {
            source.stop();
        }
        if (target != null) {
            target.stop();
        }
    }

    @Test
    public void testMirrorMakerConnectorMetrics() {
        // Start MirrorSourceConnector and check its metrics
        String sourceTags = ".*partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*";
        List<String> sourceMetricsPatterns = List.of(
                "kafka_connect_mirror_mirrorsourceconnector_byte_count" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_byte_rate" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_age_ms" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_avg" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_max" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_age_ms_min" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_count" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_record_rate" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_avg" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_max" + sourceTags,
                "kafka_connect_mirror_mirrorsourceconnector_replication_latency_ms_min" + sourceTags
        );
        // NOTE(review): listener.enable=false presumably stops the connector's own reporter
        // instance from opening a second listener, so metrics flow through the worker's
        // reporter instead — confirm against ClientMetricsReporterConfig.
        String sourceConfig =
                "{\n" +
                "  \"name\": \"" + SOURCE_CONNECTOR + "\",\n" +
                "  \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorSourceConnector\",\n" +
                "  \"tasks.max\": \"10\",\n" +
                "  \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
                "  \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
                "  \"source.cluster.alias\": \"source\",\n" +
                "  \"target.cluster.alias\": \"target\",\n" +
                "  \"source.cluster.bootstrap.servers\": \"" + source.getNetworkBootstrapServers() + "\",\n" +
                "  \"target.cluster.bootstrap.servers\": \"" + target.getNetworkBootstrapServers() + "\",\n" +
                "  \"replication.factor\": \"-1\",\n" +
                "  \"offset-syncs.topic.replication.factor\": \"-1\",\n" +
                "  \"refresh.topics.interval.seconds\": \"1\",\n" +
                "  \"topics\": \"" + TOPIC + "\",\n" +
                "  \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n" +
                "  \"prometheus.metrics.reporter.listener.enable\": \"false\"" +
                "}";
        MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, sourceConfig);
        checkMetricsExist(sourceMetricsPatterns);

        // Start MirrorCheckpointConnector and check its metrics
        String checkpointTags = ".*group=\".*\",partition=\"\\d+\",source=\"source\",target=\"target\",topic=\"source.input\".*";
        List<String> checkpointMetricPatterns = List.of(
                "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms" + checkpointTags,
                "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_avg" + checkpointTags,
                "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_max" + checkpointTags,
                "kafka_connect_mirror_mirrorcheckpointconnector_checkpoint_latency_ms_min" + checkpointTags
        );
        String checkpointConfig =
                "{\n" +
                "  \"name\": \"" + CHECKPOINT_CONNECTOR + "\",\n" +
                "  \"connector.class\": \"org.apache.kafka.connect.mirror.MirrorCheckpointConnector\",\n" +
                "  \"tasks.max\": \"10\",\n" +
                "  \"key.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
                "  \"value.converter\": \"org.apache.kafka.connect.converters.ByteArrayConverter\",\n" +
                "  \"source.cluster.alias\": \"source\",\n" +
                "  \"target.cluster.alias\": \"target\",\n" +
                "  \"source.cluster.bootstrap.servers\": \"" + source.getNetworkBootstrapServers() + "\",\n" +
                "  \"target.cluster.bootstrap.servers\": \"" + target.getNetworkBootstrapServers() + "\",\n" +
                "  \"checkpoints.topic.replication.factor\": \"-1\",\n" +
                "  \"emit.checkpoints.interval.seconds\": \"1\",\n" +
                "  \"refresh.groups.interval.seconds\": \"1\",\n" +
                "  \"metric.reporters\": \"" + ClientMetricsReporter.class.getName() + "\",\n" +
                "  \"prometheus.metrics.reporter.listener.enable\": \"false\"" +
                "}";
        MetricsUtils.startConnector(connect, CHECKPOINT_CONNECTOR, checkpointConfig);
        checkMetricsExist(checkpointMetricPatterns);
    }

    // Assert that every worker eventually exposes at least one metric matching each pattern
    private void checkMetricsExist(List<String> patterns) {
        for (GenericContainer<?> worker : connect.getWorkers()) {
            MetricsUtils.verify(worker, patterns, PORT, metrics -> assertFalse(metrics.isEmpty()));
        }
    }
}

0 commit comments

Comments
 (0)