Skip to content

Commit 270bb71

Browse files
Johan Wallessazzad16Jens Green Olanderjensgreen
authored
Retry with backoff on cluster connection failures (#2358)
* Split JedisClusterCommand into multiple methods No behavior changes, just a refactoring. Changes: * Replaces recursion with a for loop * Extract redirection handling into its own method * Extract connection-failed handling into its own method Note that `tryWithRandomNode` is gone, it was never `true` so it and its code didn't survive the refactoring. * Drop redundant null check * Bump JDK version to 1.8 Inspired by #1334 where this went real easy :). Would have made #2355 shorter. Free public updates for JDK 7 ended in 2015: <https://en.wikipedia.org/wiki/Java_version_history> For JDK 8, free public support is available from non-Orace vendors until at least 2026 according to the same table. And JDK 8 is what Jedis is being tested on anyway: <https://github.com/redis/jedis/blob/ac0969315655180c09b8139c16bded09c068d498/.circleci/config.yml#L67-L74> * Replace ConnectionGetters with lambdas * Retrigger CI * Add backoff to Redis connections * Add unit tests for backoff logic * Add retries logging * Always use the user requested timeout * Remedy review feedback * Consider connection exceptions and disregard random nodes * consider connection exceptions and disregard random nodes * reset redirection * Revert "Consider connection exceptions and disregard random nodes" This reverts commit 67a062a. Lots of tests in JedisClusterCommandTests started failing, need to be fixed before trying again. * Add another backoff test case 1. We try to contact master => JedisConnectionException 2. We try to contact replica => It refers us to master, hasn't failed over yet 3. We try to contact master => JedisConnectionException 4. We try to contact replica => Success, because it has now failed over * consider connection exceptions and disregard random nodes * reset redirection * Fix test failure * Apply suggestions from code review Co-authored-by: Jens Green Olander <[email protected]> * update documentation * Improve a comment * Update src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java * Add change from another branch Source (all of these point to the same place): * walles/retries-split * 4f80d73 * #2355 * Move JedisClusterCommandTest out of commands package * Use JedisClusterOperationException * Reduce sleep time, especially when few attempts left * Update src/main/java/redis/clients/jedis/JedisClusterCommand.java * merge fix * merge fix * Use maxAttempts * format import * Re-add missing codes due to merge * avoid NPE while zero max attempts * Remove zero attempts test * More cluster constructors and customizability * Use maxTotalRetriesDuration everywhere * more missing maxTotalRetriesDuration after merge Co-authored-by: M Sazzadul Hoque <[email protected]> Co-authored-by: Jens Green Olander <[email protected]> Co-authored-by: Jens Green Olander <[email protected]>
1 parent 71dac36 commit 270bb71

File tree

9 files changed

+1055
-526
lines changed

9 files changed

+1055
-526
lines changed

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,12 @@
9292
<version>2.3.2</version>
9393
<scope>test</scope>
9494
</dependency>
95+
<dependency>
96+
<groupId>org.mockito</groupId>
97+
<artifactId>mockito-core</artifactId>
98+
<version>3.7.7</version>
99+
<scope>test</scope>
100+
</dependency>
95101
</dependencies>
96102

97103
<distributionManagement>

src/main/java/redis/clients/jedis/BinaryJedis.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,11 @@ public BinaryJedis(final JedisSocketFactory jedisSocketFactory, final JedisClien
292292
initializeFromClientConfig(clientConfig);
293293
}
294294

295+
@Override
296+
public String toString() {
297+
return "BinaryJedis{" + client + '}';
298+
}
299+
295300
public boolean isConnected() {
296301
return client.isConnected();
297302
}

src/main/java/redis/clients/jedis/BinaryJedisCluster.java

Lines changed: 298 additions & 259 deletions
Large diffs are not rendered by default.

src/main/java/redis/clients/jedis/Connection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public Connection(final JedisSocketFactory jedisSocketFactory) {
8181
this.soTimeout = jedisSocketFactory.getSoTimeout();
8282
}
8383

84+
@Override
85+
public String toString() {
86+
return "Connection{" + socketFactory + "}";
87+
}
88+
8489
public Socket getSocket() {
8590
return socket;
8691
}

src/main/java/redis/clients/jedis/DefaultJedisSocketFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,4 +220,9 @@ public HostAndPortMapper getHostAndPortMapper() {
220220
public void setHostAndPortMapper(HostAndPortMapper hostAndPortMapper) {
221221
this.hostAndPortMapper = hostAndPortMapper;
222222
}
223+
224+
@Override
225+
public String toString() {
226+
return "DefaultJedisSocketFactory{" + hostAndPort.toString() + "}";
227+
}
223228
}

src/main/java/redis/clients/jedis/JedisCluster.java

Lines changed: 282 additions & 257 deletions
Large diffs are not rendered by default.

src/main/java/redis/clients/jedis/JedisClusterCommand.java

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package redis.clients.jedis;
22

3+
import java.time.Duration;
4+
import java.time.Instant;
5+
import java.util.concurrent.TimeUnit;
36
import org.slf4j.Logger;
47
import org.slf4j.LoggerFactory;
58

@@ -18,10 +21,22 @@ public abstract class JedisClusterCommand<T> {
1821

1922
private final JedisClusterConnectionHandler connectionHandler;
2023
private final int maxAttempts;
24+
private final Duration maxTotalRetriesDuration;
2125

2226
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts) {
27+
this(connectionHandler, maxAttempts, Duration.ofMillis((long) BinaryJedisCluster.DEFAULT_TIMEOUT * maxAttempts));
28+
}
29+
30+
/**
31+
* @param connectionHandler
32+
* @param maxAttempts
33+
* @param maxTotalRetriesDuration No more attempts after we have been trying for this long.
34+
*/
35+
public JedisClusterCommand(JedisClusterConnectionHandler connectionHandler, int maxAttempts,
36+
Duration maxTotalRetriesDuration) {
2337
this.connectionHandler = connectionHandler;
2438
this.maxAttempts = maxAttempts;
39+
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
2540
}
2641

2742
public abstract T execute(Jedis connection);
@@ -85,7 +100,10 @@ public T runWithAnyNode() {
85100
}
86101

87102
private T runWithRetries(final int slot) {
103+
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);
104+
88105
JedisRedirectionException redirect = null;
106+
int consecutiveConnectionFailures = 0;
89107
Exception lastException = null;
90108
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
91109
Jedis connection = null;
@@ -106,15 +124,21 @@ private T runWithRetries(final int slot) {
106124
throw jnrcne;
107125
} catch (JedisConnectionException jce) {
108126
lastException = jce;
127+
++consecutiveConnectionFailures;
109128
LOG.debug("Failed connecting to Redis: {}", connection, jce);
110129
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
111-
handleConnectionProblem(attemptsLeft - 1);
130+
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
131+
if (reset) {
132+
consecutiveConnectionFailures = 0;
133+
redirect = null;
134+
}
112135
} catch (JedisRedirectionException jre) {
113136
// avoid updating lastException if it is a connection exception
114137
if (lastException == null || lastException instanceof JedisRedirectionException) {
115138
lastException = jre;
116139
}
117140
LOG.debug("Redirected by server to {}", jre.getTargetNode());
141+
consecutiveConnectionFailures = 0;
118142
redirect = jre;
119143
// if MOVED redirection occurred,
120144
if (jre instanceof JedisMovedDataException) {
@@ -124,6 +148,9 @@ private T runWithRetries(final int slot) {
124148
} finally {
125149
releaseConnection(connection);
126150
}
151+
if (Instant.now().isAfter(deadline)) {
152+
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
153+
}
127154
}
128155

129156
JedisClusterMaxAttemptsException maxAttemptsException
@@ -132,14 +159,60 @@ private T runWithRetries(final int slot) {
132159
throw maxAttemptsException;
133160
}
134161

135-
private void handleConnectionProblem(int attemptsLeft) {
136-
if (attemptsLeft <= 1) {
137-
// We need this because if node is not reachable anymore - we need to finally initiate slots
138-
// renewing, or we can stuck with cluster state without one node in opposite case.
139-
// But now if maxAttempts = [1 or 2] we will do it too often.
140-
// TODO make tracking of successful/unsuccessful operations for node - do renewing only
141-
// if there were no successful responses from this node last few seconds
142-
this.connectionHandler.renewSlotCache();
162+
/**
163+
* Related values should be reset if <code>TRUE</code> is returned.
164+
*
165+
* @param attemptsLeft
166+
* @param consecutiveConnectionFailures
167+
* @param doneDeadline
168+
* @return true - if some actions are taken
169+
* <br /> false - if no actions are taken
170+
*/
171+
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
172+
if (this.maxAttempts < 3) {
173+
// Since we only renew the slots cache after two consecutive connection
174+
// failures (see consecutiveConnectionFailures above), we need to special
175+
// case the situation where we max out after two or fewer attempts.
176+
// Otherwise, on two or fewer max attempts, the slots cache would never be
177+
// renewed.
178+
if (attemptsLeft == 0) {
179+
this.connectionHandler.renewSlotCache();
180+
return true;
181+
}
182+
return false;
183+
}
184+
185+
if (consecutiveConnectionFailures < 2) {
186+
return false;
187+
}
188+
189+
sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
190+
//We need this because if node is not reachable anymore - we need to finally initiate slots
191+
//renewing, or we can stuck with cluster state without one node in opposite case.
192+
//TODO make tracking of successful/unsuccessful operations for node - do renewing only
193+
//if there were no successful responses from this node last few seconds
194+
this.connectionHandler.renewSlotCache();
195+
return true;
196+
}
197+
198+
private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
199+
if (attemptsLeft <= 0) {
200+
return 0;
201+
}
202+
203+
long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
204+
if (millisLeft < 0) {
205+
throw new JedisClusterOperationException("Cluster retry deadline exceeded.");
206+
}
207+
208+
return millisLeft / (attemptsLeft * (attemptsLeft + 1));
209+
}
210+
211+
protected void sleep(long sleepMillis) {
212+
try {
213+
TimeUnit.MILLISECONDS.sleep(sleepMillis);
214+
} catch (InterruptedException e) {
215+
throw new JedisClusterOperationException(e);
143216
}
144217
}
145218

0 commit comments

Comments
 (0)