diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClient.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClient.java index 09d563e706d..20e57d3dc3e 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClient.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClient.java @@ -23,6 +23,7 @@ import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.security.GeneralSecurityException; +import java.time.Duration; /** * Creates HTTP client and interacts with Kafka Agent's REST endpoint @@ -34,6 +35,11 @@ public class KafkaAgentClient { private static final String BROKER_STATE_REST_PATH = "/v1/broker-state/"; private static final int KAFKA_AGENT_HTTPS_PORT = 8443; private static final char[] KEYSTORE_PASSWORD = "changeit".toCharArray(); + // Bounds both the connection attempt and the wait for the HTTP response so that a broker which accepts the TCP connection but + // never produces a response (e.g. alive but stuck on IO) cannot block the KafkaRoller's single-threaded + // executor indefinitely. The Kafka Agent only serves a small broker-state JSON, so 10 seconds is well above + // the expected response time on a healthy broker yet small enough to keep the roller responsive. 
+ /* test */ static final Duration HTTP_REQUEST_TIMEOUT = Duration.ofSeconds(10); private final String namespace; private final Reconciliation reconciliation; private final String cluster; @@ -92,7 +98,7 @@ private HttpClient createHttpClient() { SSLContext sslContext = SSLContext.getInstance("TLS"); sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null); - return HttpClient.newBuilder() + return httpClientBuilder() .sslContext(sslContext) .build(); } catch (GeneralSecurityException | IOException e) { @@ -100,12 +106,37 @@ private HttpClient createHttpClient() { } } + /** + * Returns an {@link HttpClient.Builder} pre-configured with {@link #HTTP_REQUEST_TIMEOUT} as the connect timeout. + * Centralising the timeout wiring here keeps {@link #createHttpClient()} focused on TLS setup and lets tests + * assert that the configured connect timeout is applied without needing a TLS identity. + * + * @return an {@link HttpClient.Builder} with the connect timeout already applied + */ + /* test */ static HttpClient.Builder httpClientBuilder() { + return HttpClient.newBuilder().connectTimeout(HTTP_REQUEST_TIMEOUT); + } + + /** + * Builds a {@code GET} {@link HttpRequest} for the given URI with {@link #HTTP_REQUEST_TIMEOUT} applied as the + * per-request timeout (the maximum time to wait for the response). Centralising the timeout wiring here lets tests assert the request timeout without + * needing a live HTTP endpoint. 
+ * + * @param uri the target URI for the request + * + * @return a {@code GET} {@link HttpRequest} for {@code uri} with the request timeout already applied + */ + /* test */ static HttpRequest buildRequest(URI uri) { + return HttpRequest.newBuilder() + .uri(uri) + .timeout(HTTP_REQUEST_TIMEOUT) + .GET() + .build(); + } + String doGet(URI uri) { try { - HttpRequest req = HttpRequest.newBuilder() - .uri(uri) - .GET() - .build(); + HttpRequest req = buildRequest(uri); var response = httpClient.send(req, HttpResponse.BodyHandlers.ofString()); if (response.statusCode() != 200) { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClientTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClientTest.java index 5a5dc86708f..4c889bb2cf5 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClientTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClientTest.java @@ -7,6 +7,11 @@ import io.strimzi.operator.common.Reconciliation; import org.junit.jupiter.api.Test; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.time.Duration; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -60,4 +65,30 @@ public void testErrorResponse() { assertEquals(0, actual.remainingLogsToRecover()); assertEquals(0, actual.remainingSegmentsToRecover()); } + + /** + * Regression guard: a previous version of KafkaAgentClient configured no per-request timeout, + * which let a broker stuck on IO block the KafkaRoller's single-threaded executor indefinitely. Verify that + * requests built via the package-private helper carry the configured timeout. 
+ */ + @Test + public void testBuildRequestAppliesHttpRequestTimeout() { + HttpRequest request = KafkaAgentClient.buildRequest(URI.create("https://example.invalid/v1/broker-state/")); + Duration timeout = request.timeout().orElseThrow(() -> new AssertionError("HTTP request timeout was not configured")); + assertEquals(KafkaAgentClient.HTTP_REQUEST_TIMEOUT, timeout, "Request timeout must equal HTTP_REQUEST_TIMEOUT"); + assertTrue(timeout.toMillis() > 0, "HTTP request timeout must be positive but was " + timeout); + } + + /** + * Regression guard: a previous version of KafkaAgentClient configured no connect timeout, which let an + * unresponsive broker block the KafkaRoller's single-threaded executor indefinitely. Verify that clients + * built via the package-private helper carry the configured connect timeout. + */ + @Test + public void testHttpClientBuilderAppliesConnectTimeout() { + HttpClient client = KafkaAgentClient.httpClientBuilder().build(); + Duration connectTimeout = client.connectTimeout().orElseThrow(() -> new AssertionError("HTTP connect timeout was not configured")); + assertEquals(KafkaAgentClient.HTTP_REQUEST_TIMEOUT, connectTimeout, "Connect timeout must equal HTTP_REQUEST_TIMEOUT"); + assertTrue(connectTimeout.toMillis() > 0, "HTTP connect timeout must be positive but was " + connectTimeout); + } }