Skip to content

Commit ef19540

Browse files
authored
Make Connect integration tests more reliable (#91)
Signed-off-by: Mickael Maison <[email protected]>
1 parent 293ab7c commit ef19540

File tree

2 files changed

+39
-24
lines changed

2 files changed

+39
-24
lines changed

client-metrics-reporter/src/test/java/io/strimzi/kafka/metrics/prometheus/MetricsUtils.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.prometheus.metrics.model.snapshots.InfoSnapshot;
99
import io.prometheus.metrics.model.snapshots.Labels;
1010
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
11+
import io.strimzi.test.container.StrimziConnectCluster;
1112
import org.junit.jupiter.api.function.ThrowingConsumer;
1213
import org.testcontainers.containers.GenericContainer;
1314
import org.testcontainers.containers.Network;
@@ -18,7 +19,11 @@
1819
import java.io.IOException;
1920
import java.io.InputStreamReader;
2021
import java.net.HttpURLConnection;
22+
import java.net.URI;
2123
import java.net.URL;
24+
import java.net.http.HttpClient;
25+
import java.net.http.HttpRequest;
26+
import java.net.http.HttpResponse;
2227
import java.time.Duration;
2328
import java.util.ArrayList;
2429
import java.util.List;
@@ -149,4 +154,32 @@ public static GenericContainer<?> clientContainer(Map<String, String> env, int p
149154
.waitingFor(Wait.forHttp("/metrics").forStatusCode(200));
150155
}
151156

157+
/**
158+
* Start a connector
159+
* @param connect the Connect cluster
160+
* @param name the name of the connector
161+
* @param config the connector configuration
162+
*/
163+
public static void startConnector(StrimziConnectCluster connect, String name, String config) {
164+
assertTimeoutPreemptively(TIMEOUT, () -> {
165+
while (true) {
166+
HttpClient httpClient = HttpClient.newHttpClient();
167+
URI uri = new URI(connect.getRestEndpoint() + "/connectors/" + name + "/config");
168+
HttpRequest request = HttpRequest.newBuilder()
169+
.PUT(HttpRequest.BodyPublishers.ofString(config))
170+
.setHeader("Content-Type", "application/json")
171+
.uri(uri)
172+
.build();
173+
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
174+
try {
175+
assertEquals(HttpURLConnection.HTTP_CREATED, response.statusCode());
176+
break;
177+
} catch (Throwable t) {
178+
assertInstanceOf(AssertionError.class, t);
179+
TimeUnit.MILLISECONDS.sleep(100L);
180+
}
181+
}
182+
});
183+
}
184+
152185
}

client-metrics-reporter/src/test/java/io/strimzi/kafka/metrics/prometheus/integration/TestConnectMetricsIT.java

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,18 +24,12 @@
2424
import org.testcontainers.containers.GenericContainer;
2525
import org.testcontainers.utility.MountableFile;
2626

27-
import java.net.HttpURLConnection;
28-
import java.net.URI;
29-
import java.net.http.HttpClient;
30-
import java.net.http.HttpRequest;
31-
import java.net.http.HttpResponse;
3227
import java.util.ArrayList;
3328
import java.util.HashMap;
3429
import java.util.List;
3530
import java.util.Map;
3631

3732
import static io.strimzi.kafka.metrics.prometheus.ClientMetricsReporterConfig.ALLOWLIST_CONFIG;
38-
import static org.junit.jupiter.api.Assertions.assertEquals;
3933
import static org.junit.jupiter.api.Assertions.assertFalse;
4034
import static org.junit.jupiter.api.Assertions.assertTrue;
4135

@@ -158,7 +152,7 @@ private void setupConnect(Map<String, String> overrides) {
158152
}
159153

160154
@Test
161-
public void testConnectMetrics() throws Exception {
155+
public void testConnectMetrics() {
162156
setupConnect(Map.of(
163157
"consumer.metric.reporters", ClientMetricsReporter.class.getName(),
164158
"producer.metric.reporters", ClientMetricsReporter.class.getName(),
@@ -175,7 +169,7 @@ public void testConnectMetrics() throws Exception {
175169
" \"topics\": \"" + TOPIC + "\",\n" +
176170
" \"file\": \"" + FILE + "\"\n" +
177171
"}";
178-
httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
172+
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
179173
checkMetricsExist(SINK_PATTERNS);
180174

181175
// Start a source connector metrics and check its metrics
@@ -185,12 +179,12 @@ public void testConnectMetrics() throws Exception {
185179
" \"topic\": \"" + TOPIC + "\",\n" +
186180
" \"file\": \"" + FILE + "\"\n" +
187181
"}";
188-
httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
182+
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
189183
checkMetricsExist(SOURCE_PATTERNS);
190184
}
191185

192186
@Test
193-
public void testConnectMetricsWithAllowlist() throws Exception {
187+
public void testConnectMetricsWithAllowlist() {
194188
setupConnect(Map.of(
195189
"consumer.metric.reporters", ClientMetricsReporter.class.getName(),
196190
"producer.metric.reporters", ClientMetricsReporter.class.getName(),
@@ -223,7 +217,7 @@ public void testConnectMetricsWithAllowlist() throws Exception {
223217
" \"topics\": \"" + TOPIC + "\",\n" +
224218
" \"file\": \"" + FILE + "\"\n" +
225219
"}";
226-
httpPut("/connectors/" + SINK_CONNECTOR + "/config", connectorConfig);
220+
MetricsUtils.startConnector(connect, SINK_CONNECTOR, connectorConfig);
227221
List<String> allowedSinkPatterns = List.of(
228222
"kafka_connect_connector_metrics_.*" + SINK_CONNECTOR_PATTERN,
229223
"kafka_connect_connect_worker_metrics_connector_count 1.0",
@@ -241,7 +235,7 @@ public void testConnectMetricsWithAllowlist() throws Exception {
241235
" \"topic\": \"" + TOPIC + "\",\n" +
242236
" \"file\": \"" + FILE + "\"\n" +
243237
"}";
244-
httpPut("/connectors/" + SOURCE_CONNECTOR + "/config", connectorConfig);
238+
MetricsUtils.startConnector(connect, SOURCE_CONNECTOR, connectorConfig);
245239
List<String> allowedSourcePatterns = List.of(
246240
"kafka_connect_connector_metrics_.*" + SOURCE_CONNECTOR_PATTERN,
247241
"kafka_connect_connect_worker_metrics_connector_count 2.0",
@@ -264,16 +258,4 @@ private void checkMetricsDontExist(List<String> patterns) {
264258
MetricsUtils.verify(worker, patterns, PORT, metrics -> assertTrue(metrics.isEmpty()));
265259
}
266260
}
267-
268-
private void httpPut(String path, String body) throws Exception {
269-
HttpClient httpClient = HttpClient.newHttpClient();
270-
URI uri = new URI(connect.getRestEndpoint() + path);
271-
HttpRequest request = HttpRequest.newBuilder()
272-
.PUT(HttpRequest.BodyPublishers.ofString(body))
273-
.setHeader("Content-Type", "application/json")
274-
.uri(uri)
275-
.build();
276-
HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
277-
assertEquals(HttpURLConnection.HTTP_CREATED, response.statusCode());
278-
}
279261
}

0 commit comments

Comments
 (0)