Skip to content

Commit 80d7700

Browse files
authored
fix: flaky Consul embedded test startup (#19564)
Fixes #19563. Description: This PR hardens the Consul-backed embedded tests against startup races in which the Consul container has started but the host-mapped Consul API is not yet reliably accepting requests.
1 parent 2f1f0b7 commit 80d7700

4 files changed

Lines changed: 241 additions & 1 deletion

File tree

embedded-tests/src/test/java/org/apache/druid/testing/embedded/consul/ConsulClusterResource.java

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.druid.testing.embedded.consul;
2121

22+
import com.fasterxml.jackson.databind.ObjectMapper;
2223
import org.apache.druid.java.util.common.StringUtils;
2324
import org.apache.druid.java.util.common.logger.Logger;
2425
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -31,7 +32,16 @@
3132
import org.testcontainers.utility.DockerImageName;
3233

3334
import javax.annotation.Nullable;
35+
import javax.net.ssl.KeyManagerFactory;
36+
import javax.net.ssl.SSLContext;
37+
import javax.net.ssl.TrustManagerFactory;
38+
import java.io.FileInputStream;
39+
import java.io.IOException;
3440
import java.net.URI;
41+
import java.net.http.HttpClient;
42+
import java.net.http.HttpRequest;
43+
import java.net.http.HttpResponse;
44+
import java.security.KeyStore;
3545
import java.time.Duration;
3646

3747
/**
@@ -41,9 +51,12 @@
4151
public class ConsulClusterResource extends TestcontainerResource<GenericContainer<?>>
4252
{
4353
private static final Logger log = new Logger(ConsulClusterResource.class);
54+
private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
4455
private static final int CONSUL_HTTP_PORT = 8500;
4556
private static final int CONSUL_HTTPS_PORT = 8501;
4657
private static final DockerImageName CONSUL_IMAGE = DockerImageName.parse("hashicorp/consul:1.18");
58+
private static final Duration READINESS_TIMEOUT = Duration.ofSeconds(30);
59+
private static final Duration READINESS_RETRY_DELAY = Duration.ofMillis(500);
4760

4861
private final ConsulSecurityMode securityMode;
4962
private String consulHostForDruid;
@@ -70,6 +83,13 @@ public ConsulClusterResource(ConsulSecurityMode securityMode)
7083
this.securityMode = securityMode;
7184
}
7285

86+
@Override
87+
public void start()
88+
{
89+
super.start();
90+
waitForConsulApi();
91+
}
92+
7393
@Override
7494
protected GenericContainer<?> createContainer()
7595
{
@@ -106,6 +126,111 @@ protected GenericContainer<?> createContainer()
106126
}
107127
}
108128

129+
private void waitForConsulApi()
130+
{
131+
final long deadline = System.nanoTime() + READINESS_TIMEOUT.toNanos();
132+
final HttpClient httpClient;
133+
Exception lastException = null;
134+
135+
try {
136+
httpClient = createHttpClient();
137+
}
138+
catch (Exception e) {
139+
throw new RuntimeException("Failed to create Consul readiness client", e);
140+
}
141+
142+
while (System.nanoTime() < deadline) {
143+
try {
144+
final HttpRequest request = HttpRequest.newBuilder(getHttpUri("/v1/status/leader"))
145+
.timeout(Duration.ofSeconds(5))
146+
.GET()
147+
.build();
148+
final HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
149+
if (isConsulLeaderReady(response.statusCode(), response.body())) {
150+
log.info("Consul API is ready at [%s].", getHttpUri("/v1/status/leader"));
151+
return;
152+
}
153+
lastException = new RuntimeException(
154+
StringUtils.format(
155+
"Consul leader endpoint returned status[%d] body[%s]",
156+
response.statusCode(),
157+
response.body()
158+
)
159+
);
160+
}
161+
catch (Exception e) {
162+
lastException = e;
163+
}
164+
165+
try {
166+
Thread.sleep(READINESS_RETRY_DELAY.toMillis());
167+
}
168+
catch (InterruptedException e) {
169+
Thread.currentThread().interrupt();
170+
throw new RuntimeException("Interrupted while waiting for Consul API readiness", e);
171+
}
172+
}
173+
174+
throw new RuntimeException(
175+
StringUtils.format("Consul API did not become ready within [%s]", READINESS_TIMEOUT),
176+
lastException
177+
);
178+
}
179+
180+
static boolean isConsulLeaderReady(int statusCode, @Nullable String body)
181+
{
182+
if (statusCode != 200 || body == null || body.trim().isEmpty()) {
183+
return false;
184+
}
185+
186+
try {
187+
final String leader = JSON_MAPPER.readValue(body, String.class);
188+
return leader != null && !leader.trim().isEmpty();
189+
}
190+
catch (IOException e) {
191+
return false;
192+
}
193+
}
194+
195+
private HttpClient createHttpClient() throws Exception
196+
{
197+
if (securityMode == ConsulSecurityMode.PLAIN) {
198+
return HttpClient.newBuilder()
199+
.connectTimeout(Duration.ofSeconds(5))
200+
.build();
201+
}
202+
203+
if (certBundle == null) {
204+
throw new IllegalStateException("Consul TLS certificate bundle is not initialized");
205+
}
206+
207+
final KeyStore trustStore = KeyStore.getInstance("PKCS12");
208+
try (FileInputStream fis = new FileInputStream(certBundle.getTrustStorePath())) {
209+
trustStore.load(fis, getStorePassword().toCharArray());
210+
}
211+
212+
final TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
213+
tmf.init(trustStore);
214+
215+
KeyManagerFactory kmf = null;
216+
if (securityMode == ConsulSecurityMode.MTLS) {
217+
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
218+
try (FileInputStream fis = new FileInputStream(certBundle.getKeyStorePath())) {
219+
keyStore.load(fis, getStorePassword().toCharArray());
220+
}
221+
kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
222+
kmf.init(keyStore, getStorePassword().toCharArray());
223+
}
224+
225+
final SSLContext sslContext = SSLContext.getInstance("TLS");
226+
sslContext.init(kmf == null ? null : kmf.getKeyManagers(), tmf.getTrustManagers(), null);
227+
228+
return HttpClient.newBuilder()
229+
.sslContext(sslContext)
230+
.connectTimeout(Duration.ofSeconds(5))
231+
.build();
232+
}
233+
109234
@Override
110235
public void onStarted(EmbeddedDruidCluster cluster)
111236
{
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.consul;
21+
22+
import org.junit.jupiter.api.Assertions;
23+
import org.junit.jupiter.api.Test;
24+
25+
public class ConsulClusterResourceTest
26+
{
27+
@Test
28+
public void testLeaderEndpointWithLeaderIsReady()
29+
{
30+
Assertions.assertTrue(ConsulClusterResource.isConsulLeaderReady(200, "\"127.0.0.1:8300\""));
31+
}
32+
33+
@Test
34+
public void testLeaderEndpointWithoutLeaderIsNotReady()
35+
{
36+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(200, "\"\""));
37+
}
38+
39+
@Test
40+
public void testBlankBodyIsNotReady()
41+
{
42+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(200, ""));
43+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(200, " "));
44+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(200, null));
45+
}
46+
47+
@Test
48+
public void testNonOkStatusIsNotReady()
49+
{
50+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(503, "\"127.0.0.1:8300\""));
51+
}
52+
53+
@Test
54+
public void testMalformedBodyIsNotReady()
55+
{
56+
Assertions.assertFalse(ConsulClusterResource.isConsulLeaderReady(200, "127.0.0.1:8300"));
57+
}
58+
}

extensions-contrib/consul-extensions/src/main/java/org/apache/druid/consul/discovery/ConsulDruidNodeAnnouncer.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,22 @@
2020
package org.apache.druid.consul.discovery;
2121

2222
import com.google.common.base.Preconditions;
23+
import com.google.common.base.Throwables;
2324
import com.google.inject.Inject;
2425
import org.apache.druid.concurrent.LifecycleLock;
2526
import org.apache.druid.discovery.DiscoveryDruidNode;
2627
import org.apache.druid.discovery.DruidNodeAnnouncer;
2728
import org.apache.druid.guice.ManageLifecycle;
2829
import org.apache.druid.java.util.common.ISE;
30+
import org.apache.druid.java.util.common.RetryUtils;
2931
import org.apache.druid.java.util.common.concurrent.Execs;
3032
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
3133
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
3234
import org.apache.druid.java.util.common.logger.Logger;
3335
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
3436

3537
import javax.annotation.Nullable;
38+
import java.io.IOException;
3639
import java.util.Map;
3740
import java.util.Set;
3841
import java.util.concurrent.ConcurrentHashMap;
@@ -48,6 +51,7 @@
4851
public class ConsulDruidNodeAnnouncer implements DruidNodeAnnouncer
4952
{
5053
private static final Logger LOGGER = new Logger(ConsulDruidNodeAnnouncer.class);
54+
private static final int MAX_ANNOUNCE_REGISTRATION_TRIES = 3;
5155

5256
private final ConsulApiClient consulApiClient;
5357
private final ConsulDiscoveryConfig config;
@@ -159,7 +163,7 @@ public void announce(DiscoveryDruidNode discoveryDruidNode)
159163
long registerStart = System.nanoTime();
160164

161165
// Register in Consul, then track locally atomically in this block
162-
consulApiClient.registerService(discoveryDruidNode);
166+
registerServiceWithRetry(serviceId, discoveryDruidNode);
163167
announcedNodes.put(serviceId, discoveryDruidNode);
164168

165169
long registerLatency = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - registerStart);
@@ -197,6 +201,32 @@ public void announce(DiscoveryDruidNode discoveryDruidNode)
197201
}
198202
}
199203

204+
private void registerServiceWithRetry(String serviceId, DiscoveryDruidNode discoveryDruidNode) throws Exception
205+
{
206+
RetryUtils.retry(
207+
() -> {
208+
consulApiClient.registerService(discoveryDruidNode);
209+
return null;
210+
},
211+
ConsulDruidNodeAnnouncer::isTransientConsulFailure,
212+
1,
213+
MAX_ANNOUNCE_REGISTRATION_TRIES,
214+
null,
215+
"Registering Consul service [" + serviceId + "] failed"
216+
);
217+
}
218+
219+
private static boolean isTransientConsulFailure(Throwable throwable)
220+
{
221+
for (Throwable cause : Throwables.getCausalChain(throwable)) {
222+
if (cause instanceof IOException) {
223+
return true;
224+
}
225+
}
226+
227+
return false;
228+
}
229+
200230
@Override
201231
public void unannounce(DiscoveryDruidNode discoveryDruidNode)
202232
{

extensions-contrib/consul-extensions/src/test/java/org/apache/druid/consul/discovery/ConsulDruidNodeAnnouncerTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.druid.discovery.DiscoveryDruidNode;
2323
import org.apache.druid.discovery.NodeRole;
2424
import org.apache.druid.server.DruidNode;
25+
import org.apache.http.NoHttpResponseException;
2526
import org.easymock.Capture;
2627
import org.easymock.EasyMock;
2728
import org.joda.time.Duration;
@@ -95,6 +96,32 @@ public void testAnnounce() throws Exception
9596
EasyMock.verify(mockConsulApiClient);
9697
}
9798

99+
@Test
100+
public void testAnnounceRetriesTransientFailure() throws Exception
101+
{
102+
mockConsulApiClient.registerService(EasyMock.eq(testNode));
103+
EasyMock.expectLastCall().andThrow(new NoHttpResponseException("Consul did not respond"));
104+
105+
mockConsulApiClient.registerService(EasyMock.eq(testNode));
106+
EasyMock.expectLastCall().once();
107+
108+
mockConsulApiClient.passTtlCheck(EasyMock.anyString(), EasyMock.anyString());
109+
EasyMock.expectLastCall().anyTimes();
110+
111+
mockConsulApiClient.deregisterService(EasyMock.anyString());
112+
EasyMock.expectLastCall().once();
113+
114+
EasyMock.replay(mockConsulApiClient);
115+
116+
announcer = new ConsulDruidNodeAnnouncer(mockConsulApiClient, config);
117+
announcer.start();
118+
119+
announcer.announce(testNode);
120+
announcer.stop();
121+
122+
EasyMock.verify(mockConsulApiClient);
123+
}
124+
98125
@Test
99126
public void testUnannounce() throws Exception
100127
{

0 commit comments

Comments
 (0)