Skip to content

Commit decc9a4

Browse files
committed
Add HTTP timeouts to KafkaAgentClient to prevent KafkaRoller from blocking
KafkaAgentClient.getBrokerState() relies on a java.net.http.HttpClient that had neither a connect timeout nor a request timeout configured. When a broker is alive but stuck on IO (for example because the underlying storage has zero IOPS), TCP connection establishment succeeds but the Kafka Agent never produces an HTTP response. The call therefore blocks the KafkaRoller's single-threaded executor indefinitely, preventing any other broker from being processed and inflating reconciliation time. Add a bounded timeout (30s) on both the HttpClient connectTimeout and the HttpRequest timeout. On timeout the resulting HttpTimeoutException is wrapped as RuntimeException by doGet() and is already handled gracefully by getBrokerState(), which returns BrokerState(-1, null) and lets the roller move on to other brokers. Fixes #12513 Signed-off-by: chon3806 <93464148+chon3806@users.noreply.github.com>
1 parent a2534e7 commit decc9a4

3 files changed

Lines changed: 63 additions & 6 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## 1.1.0
44

5-
* _Nothing here yet, but we will surely develop something new pretty soon_ 😉
5+
* Add HTTP request and connect timeouts to the Kafka Agent client so that a broker stuck on IO can no longer block the rolling update.
66

77
### Major changes, deprecations, and removals
88

cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClient.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.net.http.HttpRequest;
2424
import java.net.http.HttpResponse;
2525
import java.security.GeneralSecurityException;
26+
import java.time.Duration;
2627

2728
/**
2829
* Creates HTTP client and interacts with Kafka Agent's REST endpoint
@@ -34,6 +35,10 @@ public class KafkaAgentClient {
3435
private static final String BROKER_STATE_REST_PATH = "/v1/broker-state/";
3536
private static final int KAFKA_AGENT_HTTPS_PORT = 8443;
3637
private static final char[] KEYSTORE_PASSWORD = "changeit".toCharArray();
38+
// Bounds the connect and full HTTP request lifecycle so that a broker which accepts the TCP connection but
39+
// never produces a response (e.g. alive but stuck on IO) cannot block the KafkaRoller's single-threaded
40+
// executor indefinitely. See https://github.com/strimzi/strimzi-kafka-operator/issues/12513.
41+
/* test */ static final Duration HTTP_REQUEST_TIMEOUT = Duration.ofSeconds(30);
3742
private final String namespace;
3843
private final Reconciliation reconciliation;
3944
private final String cluster;
@@ -92,20 +97,39 @@ private HttpClient createHttpClient() {
9297
SSLContext sslContext = SSLContext.getInstance("TLS");
9398
sslContext.init(keyManagerFactory.getKeyManagers(), trustManagerFactory.getTrustManagers(), null);
9499

95-
return HttpClient.newBuilder()
100+
return httpClientBuilder()
96101
.sslContext(sslContext)
97102
.build();
98103
} catch (GeneralSecurityException | IOException e) {
99104
throw new RuntimeException("Failed to configure HTTP client", e);
100105
}
101106
}
102107

108+
/**
109+
* Returns an {@link HttpClient.Builder} pre-configured with {@link #HTTP_REQUEST_TIMEOUT} as the connect timeout.
110+
* Centralising the timeout wiring here keeps {@link #createHttpClient()} focused on TLS setup and lets tests
111+
* assert that the configured connect timeout is applied without needing a TLS identity.
112+
*/
113+
/* test */ static HttpClient.Builder httpClientBuilder() {
114+
return HttpClient.newBuilder().connectTimeout(HTTP_REQUEST_TIMEOUT);
115+
}
116+
117+
/**
118+
* Builds a {@code GET} {@link HttpRequest} for the given URI with {@link #HTTP_REQUEST_TIMEOUT} applied as the
119+
* full request timeout. Centralising the timeout wiring here lets tests assert the request timeout without
120+
* needing a live HTTP endpoint.
121+
*/
122+
/* test */ static HttpRequest buildRequest(URI uri) {
123+
return HttpRequest.newBuilder()
124+
.uri(uri)
125+
.timeout(HTTP_REQUEST_TIMEOUT)
126+
.GET()
127+
.build();
128+
}
129+
103130
String doGet(URI uri) {
104131
try {
105-
HttpRequest req = HttpRequest.newBuilder()
106-
.uri(uri)
107-
.GET()
108-
.build();
132+
HttpRequest req = buildRequest(uri);
109133

110134
var response = httpClient.send(req, HttpResponse.BodyHandlers.ofString());
111135
if (response.statusCode() != 200) {

cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/KafkaAgentClientTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@
77
import io.strimzi.operator.common.Reconciliation;
88
import org.junit.jupiter.api.Test;
99

10+
import java.net.URI;
11+
import java.net.http.HttpClient;
12+
import java.net.http.HttpRequest;
13+
import java.time.Duration;
14+
1015
import static org.junit.jupiter.api.Assertions.assertEquals;
1116
import static org.junit.jupiter.api.Assertions.assertTrue;
1217
import static org.mockito.ArgumentMatchers.any;
@@ -60,4 +65,32 @@ public void testErrorResponse() {
6065
assertEquals(0, actual.remainingLogsToRecover());
6166
assertEquals(0, actual.remainingSegmentsToRecover());
6267
}
68+
69+
/**
70+
* Regression guard for https://github.com/strimzi/strimzi-kafka-operator/issues/12513: a previous version of
71+
* KafkaAgentClient configured no timeout on the per-request side, which let a broker stuck on IO block the
72+
* KafkaRoller's single-threaded executor indefinitely. Verify that requests built via the package-private
73+
* helper carry the configured timeout.
74+
*/
75+
@Test
76+
public void testBuildRequestAppliesHttpRequestTimeout() {
77+
HttpRequest request = KafkaAgentClient.buildRequest(URI.create("https://example.invalid/v1/broker-state/"));
78+
Duration timeout = request.timeout().orElseThrow(() -> new AssertionError("HTTP request timeout was not configured"));
79+
assertEquals(KafkaAgentClient.HTTP_REQUEST_TIMEOUT, timeout, "Request timeout must equal HTTP_REQUEST_TIMEOUT");
80+
assertTrue(timeout.toMillis() > 0, "HTTP request timeout must be positive but was " + timeout);
81+
}
82+
83+
/**
84+
* Regression guard for https://github.com/strimzi/strimzi-kafka-operator/issues/12513: a previous version of
85+
* KafkaAgentClient configured no connect timeout, which let an unresponsive broker block the KafkaRoller's
86+
* single-threaded executor indefinitely. Verify that clients built via the package-private helper carry the
87+
* configured connect timeout.
88+
*/
89+
@Test
90+
public void testHttpClientBuilderAppliesConnectTimeout() {
91+
HttpClient client = KafkaAgentClient.httpClientBuilder().build();
92+
Duration connectTimeout = client.connectTimeout().orElseThrow(() -> new AssertionError("HTTP connect timeout was not configured"));
93+
assertEquals(KafkaAgentClient.HTTP_REQUEST_TIMEOUT, connectTimeout, "Connect timeout must equal HTTP_REQUEST_TIMEOUT");
94+
assertTrue(connectTimeout.toMillis() > 0, "HTTP connect timeout must be positive but was " + connectTimeout);
95+
}
6396
}

0 commit comments

Comments
 (0)