Skip to content

Commit cd810e6

Browse files
authored
Add integration test for Connect metrics (#81)
Signed-off-by: Mickael Maison <[email protected]>
1 parent cd42664 commit cd810e6

File tree

6 files changed

+457
-199
lines changed

6 files changed

+457
-199
lines changed

client-metrics-reporter/src/test/java/io/strimzi/kafka/metrics/prometheus/MetricsUtils.java

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
import org.testcontainers.utility.MountableFile;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
2629

2730
import static org.junit.jupiter.api.Assertions.assertEquals;
2831
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -44,9 +47,8 @@ public class MetricsUtils {
4447
* Query the HTTP endpoint and returns the output
4548
* @param port The port to query
4649
* @return The lines from the output
47-
* @throws Exception If any error occurs
4850
*/
49-
public static List<String> getMetrics(int port) throws Exception {
51+
public static List<String> getMetrics(int port) {
5052
return getMetrics("localhost", port);
5153
}
5254

@@ -55,22 +57,26 @@ public static List<String> getMetrics(int port) throws Exception {
5557
* @param host The host to query
5658
* @param port The port to query
5759
* @return The lines from the output
58-
* @throws Exception If any error occurs
5960
*/
60-
public static List<String> getMetrics(String host, int port) throws Exception {
61+
public static List<String> getMetrics(String host, int port) {
6162
List<String> metrics = new ArrayList<>();
62-
URL url = new URL("http://" + host + ":" + port + "/metrics");
63-
HttpURLConnection con = (HttpURLConnection) url.openConnection();
64-
con.setRequestMethod("GET");
65-
66-
try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
67-
String inputLine;
68-
while ((inputLine = in.readLine()) != null) {
69-
if (!inputLine.startsWith("#")) {
70-
metrics.add(inputLine);
63+
assertTimeoutPreemptively(TIMEOUT, () -> {
64+
try {
65+
URL url = new URL("http://" + host + ":" + port + "/metrics");
66+
HttpURLConnection con = (HttpURLConnection) url.openConnection();
67+
con.setRequestMethod("GET");
68+
try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) {
69+
String inputLine;
70+
while ((inputLine = in.readLine()) != null) {
71+
if (!inputLine.startsWith("#")) {
72+
metrics.add(inputLine);
73+
}
74+
}
7175
}
76+
} catch (IOException e) {
77+
// swallow
7278
}
73-
}
79+
});
7480
return metrics;
7581
}
7682

@@ -104,32 +110,31 @@ public static void assertInfoSnapshot(MetricSnapshot snapshot, Labels labels, St
104110
assertEquals(expectedLabels, infoSnapshot.getDataPoints().get(0).getLabels());
105111
}
106112

107-
/**
108-
* Filter metrics that start with a specified prefix
109-
* @param allMetrics all the metric names
110-
* @param prefix the prefix
111-
* @return the list of metric names that start with the prefix
112-
*/
113-
public static List<String> filterMetrics(List<String> allMetrics, String prefix) {
113+
private static List<String> filterMetrics(List<String> allMetrics, Pattern pattern) {
114114
List<String> metrics = new ArrayList<>();
115115
for (String metric : allMetrics) {
116-
if (metric.startsWith(prefix)) {
116+
if (pattern.matcher(metric).matches()) {
117117
metrics.add(metric);
118118
}
119119
}
120120
return metrics;
121121
}
122122

123-
public static void verify(GenericContainer<?> container, String prefix, int port, ThrowingConsumer<List<String>> condition) {
123+
public static void verify(GenericContainer<?> container, List<String> patterns, int port, ThrowingConsumer<List<String>> condition) {
124124
assertTimeoutPreemptively(TIMEOUT, () -> {
125-
while (true) {
126-
try {
127-
List<String> filteredMetrics = filterMetrics(getMetrics(container.getHost(), container.getMappedPort(port)), prefix);
128-
condition.accept(filteredMetrics);
129-
return;
130-
} catch (Throwable t) {
131-
assertInstanceOf(AssertionError.class, t);
132-
TimeUnit.MILLISECONDS.sleep(100L);
125+
List<String> metrics = getMetrics(container.getHost(), container.getMappedPort(port));
126+
List<Pattern> expectedPatterns = patterns.stream().map(Pattern::compile).collect(Collectors.toList());
127+
for (Pattern pattern : expectedPatterns) {
128+
while (true) {
129+
try {
130+
List<String> filteredMetrics = filterMetrics(metrics, pattern);
131+
condition.accept(filteredMetrics);
132+
break;
133+
} catch (Throwable t) {
134+
assertInstanceOf(AssertionError.class, t);
135+
TimeUnit.MILLISECONDS.sleep(100L);
136+
metrics = getMetrics(container.getHost(), container.getMappedPort(port));
137+
}
133138
}
134139
}
135140
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,279 @@
1+
/*
 * Copyright Strimzi authors.
 * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
 */
package io.strimzi.kafka.metrics.prometheus.integration;

import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporter;
import io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig;
import io.strimzi.kafka.metrics.prometheus.MetricsUtils;
import io.strimzi.kafka.metrics.prometheus.http.Listener;
import io.strimzi.test.container.StrimziConnectCluster;
import io.strimzi.test.container.StrimziKafkaCluster;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.utility.MountableFile;

import java.net.HttpURLConnection;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig.ALLOWLIST_CONFIG;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Integration tests checking that the ClientMetricsReporter exposes the expected
 * Connect worker, connector and embedded client metrics, with and without an allowlist.
 */
public class TestConnectMetricsIT {

    private static final int PORT = Listener.parseListener(ClientMetricsReporterConfig.LISTENER_CONFIG_DEFAULT).port;
    private static final String GROUP_ID = "my-cluster";
    private static final String CLIENT_IDS_PATTERN = "client_id=\"" + GROUP_ID + "-(configs|offsets|statuses)\".*";
    private static final String ADMIN_ID_PATTERN = "client_id=\"" + GROUP_ID + "-shared-admin\".*";
    private static final String CONNECT_ID_PATTERN = "connect.*";
    private static final String TOPIC = "topic-to-export";
    private static final String FILE = "/tmp/file";
    private static final String SINK_CONNECTOR = "file-sink";
    private static final String SINK_CONNECTOR_PATTERN = "connector=\"" + SINK_CONNECTOR + "\".*";
    private static final String SINK_CONSUMER_ID = "client_id=\"connector-consumer-" + SINK_CONNECTOR + ".*";
    private static final String SOURCE_CONNECTOR = "file-source";
    private static final String SOURCE_CONNECTOR_PATTERN = "connector=\"" + SOURCE_CONNECTOR + "\".*";
    private static final String SOURCE_PRODUCER_ID = "client_id=\"connector-producer-" + SOURCE_CONNECTOR + ".*";

    // Metrics expected from any running worker, before any connector is started
    private static final List<String> CONNECT_PATTERNS = List.of(
            "jvm_.*",
            "process_.*",
            "kafka_admin_client_app_info_.*" + ADMIN_ID_PATTERN,
            "kafka_admin_client_kafka_metrics_.*" + ADMIN_ID_PATTERN,
            "kafka_admin_client_admin_client_metrics_.*" + ADMIN_ID_PATTERN,
            "kafka_admin_client_admin_client_node_metrics_.*" + ADMIN_ID_PATTERN,
            "kafka_consumer_app_info_.*" + CLIENT_IDS_PATTERN,
            "kafka_consumer_kafka_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_consumer_consumer_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_consumer_consumer_node_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_consumer_consumer_coordinator_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_consumer_consumer_fetch_manager_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_producer_app_info_.*" + CLIENT_IDS_PATTERN,
            "kafka_producer_kafka_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_producer_producer_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_producer_producer_node_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_producer_producer_topic_metrics_.*" + CLIENT_IDS_PATTERN,
            "kafka_connect_app_info_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_connect_coordinator_metrics_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_connect_metrics_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_connect_node_metrics_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_connect_worker_metrics_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_connect_worker_rebalance_metrics_.*" + CONNECT_ID_PATTERN,
            "kafka_connect_kafka_metrics_.*" + CONNECT_ID_PATTERN);

    // Additional metrics expected once the sink connector is running
    private static final List<String> SINK_PATTERNS = List.of(
            "kafka_connect_connector_metrics_.*" + SINK_CONNECTOR_PATTERN,
            "kafka_connect_connector_task_metrics_.*" + SINK_CONNECTOR_PATTERN,
            "kafka_connect_sink_task_metrics_.*" + SINK_CONNECTOR_PATTERN,
            "kafka_connect_task_error_metrics_.*" + SINK_CONNECTOR_PATTERN,
            "kafka_connect_connect_worker_metrics_connector_count 1.0",
            "kafka_consumer_app_info_.*" + SINK_CONSUMER_ID,
            "kafka_consumer_kafka_metrics_.*" + SINK_CONSUMER_ID,
            "kafka_consumer_consumer_metrics_.*" + SINK_CONSUMER_ID,
            "kafka_consumer_consumer_node_metrics_.*" + SINK_CONSUMER_ID,
            "kafka_consumer_consumer_coordinator_metrics_.*" + SINK_CONSUMER_ID,
            "kafka_consumer_consumer_fetch_manager_metrics_.*" + SINK_CONSUMER_ID);

    // Additional metrics expected once the source connector is running
    private static final List<String> SOURCE_PATTERNS = List.of(
            "kafka_connect_connector_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
            "kafka_connect_connector_task_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
            "kafka_connect_source_task_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
            "kafka_connect_task_error_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
            "kafka_connect_connect_worker_metrics_connector_count 2.0",
            "kafka_producer_app_info_.*" + SOURCE_PRODUCER_ID,
            "kafka_producer_kafka_metrics_.*" + SOURCE_PRODUCER_ID,
            "kafka_producer_producer_metrics_.*" + SOURCE_PRODUCER_ID,
            "kafka_producer_producer_node_metrics_.*" + SOURCE_PRODUCER_ID,
            "kafka_producer_producer_topic_metrics_.*" + SOURCE_PRODUCER_ID);

    private StrimziKafkaCluster kafka;
    private StrimziConnectCluster connect;

    @BeforeEach
    public void setUp() {
        kafka = new StrimziKafkaCluster.StrimziKafkaClusterBuilder()
                .withNumberOfBrokers(1)
                .withSharedNetwork()
                .build();
        kafka.start();

        // Create the topic the source/sink connectors use; replication factor -1
        // lets the broker pick its default. Admin.close() waits for pending calls.
        try (Admin admin = Admin.create(Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
            admin.createTopics(List.of(new NewTopic(TOPIC, 1, (short) -1)));
        }
        // Pre-populate the topic so the sink connector has records to consume
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName(),
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()
        ))) {
            for (int i = 0; i < 5; i++) {
                producer.send(new ProducerRecord<>(TOPIC, "record" + i));
            }
        }
    }

    @AfterEach
    public void tearDown() {
        if (connect != null) {
            connect.stop();
        }
        if (kafka != null) {
            kafka.stop();
        }
    }

    /**
     * Starts a Connect cluster with the metrics reporter enabled and the given extra configuration.
     */
    private void setupConnect(Map<String, String> overrides) {
        Map<String, String> configs = new HashMap<>(overrides);
        configs.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
        connect = new StrimziConnectCluster.StrimziConnectClusterBuilder()
                .withGroupId(GROUP_ID)
                .withKafkaCluster(kafka)
                .withAdditionalConnectConfiguration(configs)
                .build();
        for (GenericContainer<?> worker : connect.getWorkers()) {
            // Mount the reporter jars and expose the REST and metrics ports
            worker.withCopyFileToContainer(MountableFile.forHostPath(MetricsUtils.REPORTER_JARS), MetricsUtils.MOUNT_PATH)
                  .withExposedPorts(8083, PORT)
                  .withEnv(Map.of("CLASSPATH", MetricsUtils.MOUNT_PATH + "*"));
        }
        connect.start();
    }

    @Test
    public void testConnectMetrics() throws Exception {
        setupConnect(Map.of(
                "consumer.metric.reporters", ClientMetricsReporter.class.getName(),
                "producer.metric.reporters", ClientMetricsReporter.class.getName(),
                "admin.metric.reporters", ClientMetricsReporter.class.getName()
        ));

        // Check the global Connect metrics
        checkMetricsExist(CONNECT_PATTERNS);

        // Start a sink connector and check its metrics
        String connectorConfig =
                "{\n" +
                "  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n" +
                "  \"topics\": \"" + TOPIC + "\",\n" +
                "  \"file\": \"" + FILE + "\"\n" +
                "}";
        httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
        checkMetricsExist(SINK_PATTERNS);

        // Start a source connector and check its metrics
        connectorConfig =
                "{\n" +
                "  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\",\n" +
                "  \"topic\": \"" + TOPIC + "\",\n" +
                "  \"file\": \"" + FILE + "\"\n" +
                "}";
        httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
        checkMetricsExist(SOURCE_PATTERNS);
    }

    @Test
    public void testConnectMetricsWithAllowlist() throws Exception {
        setupConnect(Map.of(
                "consumer.metric.reporters", ClientMetricsReporter.class.getName(),
                "producer.metric.reporters", ClientMetricsReporter.class.getName(),
                "admin.metric.reporters", ClientMetricsReporter.class.getName(),
                ALLOWLIST_CONFIG, "kafka_connect_connect_worker_.*,kafka_connect_connector_metrics_.*,kafka_admin_client_admin_client_metrics_.*,kafka_consumer_kafka_metrics_.*,kafka_producer_producer_node_metrics_.*",
                "admin." + ALLOWLIST_CONFIG, "kafka_admin_client_admin_client_metrics.*",
                "consumer." + ALLOWLIST_CONFIG, "kafka_consumer_app_info_.*",
                "producer." + ALLOWLIST_CONFIG, "kafka_producer_producer_metrics_.*"
        ));

        // Check the Connect metrics: only allowlisted metrics must be present
        List<String> allowedPatterns = List.of(
                "jvm_.*",
                "process_.*",
                "kafka_admin_client_admin_client_metrics_.*" + ADMIN_ID_PATTERN,
                "kafka_consumer_kafka_metrics_.*" + CLIENT_IDS_PATTERN,
                "kafka_producer_producer_node_metrics_.*" + CLIENT_IDS_PATTERN,
                "kafka_connect_connect_worker_metrics_.*" + CONNECT_ID_PATTERN,
                "kafka_connect_connect_worker_rebalance_metrics_.*" + CONNECT_ID_PATTERN);
        checkMetricsExist(allowedPatterns);

        List<String> disallowedPatterns = new ArrayList<>(CONNECT_PATTERNS);
        disallowedPatterns.removeAll(allowedPatterns);
        checkMetricsDontExist(disallowedPatterns);

        // Start a sink connector and check its metrics
        String connectorConfig =
                "{\n" +
                "  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSinkConnector\",\n" +
                "  \"topics\": \"" + TOPIC + "\",\n" +
                "  \"file\": \"" + FILE + "\"\n" +
                "}";
        httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
        List<String> allowedSinkPatterns = List.of(
                "kafka_connect_connector_metrics_.*" + SINK_CONNECTOR_PATTERN,
                "kafka_connect_connect_worker_metrics_connector_count 1.0",
                "kafka_consumer_app_info_.*" + SINK_CONSUMER_ID);
        checkMetricsExist(allowedSinkPatterns);

        disallowedPatterns = new ArrayList<>(SINK_PATTERNS);
        disallowedPatterns.removeAll(allowedSinkPatterns);
        checkMetricsDontExist(disallowedPatterns);

        // Start a source connector and check its metrics
        connectorConfig =
                "{\n" +
                "  \"connector.class\":\"org.apache.kafka.connect.file.FileStreamSourceConnector\",\n" +
                "  \"topic\": \"" + TOPIC + "\",\n" +
                "  \"file\": \"" + FILE + "\"\n" +
                "}";
        httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
        List<String> allowedSourcePatterns = List.of(
                "kafka_connect_connector_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
                "kafka_connect_connect_worker_metrics_connector_count 2.0",
                "kafka_producer_producer_metrics_.*" + SOURCE_PRODUCER_ID);
        checkMetricsExist(allowedSourcePatterns);

        // BUGFIX: copy SOURCE_PATTERNS (not allowedSourcePatterns) before removing the
        // allowed ones — copying allowedSourcePatterns and removing it from itself always
        // produced an empty list, making this negative check vacuous.
        disallowedPatterns = new ArrayList<>(SOURCE_PATTERNS);
        disallowedPatterns.removeAll(allowedSourcePatterns);
        checkMetricsDontExist(disallowedPatterns);
    }

    // Assert that, on every worker, metrics matching each pattern eventually exist
    private void checkMetricsExist(List<String> patterns) {
        for (GenericContainer<?> worker : connect.getWorkers()) {
            MetricsUtils.verify(worker, patterns, PORT, metrics -> assertFalse(metrics.isEmpty()));
        }
    }

    // Assert that, on every worker, no metrics match any of the patterns
    private void checkMetricsDontExist(List<String> patterns) {
        for (GenericContainer<?> worker : connect.getWorkers()) {
            MetricsUtils.verify(worker, patterns, PORT, metrics -> assertTrue(metrics.isEmpty()));
        }
    }

    // PUT the given JSON body to the Connect REST API and assert a 201 Created response
    private void httpPut(String path, String body) throws Exception {
        HttpClient httpClient = HttpClient.newHttpClient();
        URI uri = new URI(connect.getRestEndpoint() + path);
        HttpRequest request = HttpRequest.newBuilder()
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .setHeader("Content-Type", "application/json")
                .uri(uri)
                .build();
        HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
        assertEquals(HttpURLConnection.HTTP_CREATED, response.statusCode());
    }
}

0 commit comments

Comments
 (0)