Skip to content

Commit f04da36

Browse files
committed
Support backoff on refresh failure
Retry strategies can optionally return a backoff even when token refresh fails. Add support for this in the sync and async retry stages: RetryableStage and AsyncRetryableStage now check the value of TokenAcquisitionFailedException#delay() and, if a delay is given, back off for that duration before failing the request.
1 parent 33a6bce commit f04da36

6 files changed

Lines changed: 406 additions & 19 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ScheduledExecutorService;
25+
import java.util.concurrent.TimeUnit;
2526
import software.amazon.awssdk.annotations.SdkInternalApi;
2627
import software.amazon.awssdk.core.Response;
2728
import software.amazon.awssdk.core.async.AsyncRequestBody;
@@ -34,6 +35,7 @@
3435
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
3536
import software.amazon.awssdk.http.SdkHttpFullRequest;
3637
import software.amazon.awssdk.utils.CompletableFutureUtils;
38+
import software.amazon.awssdk.utils.Either;
3739

3840
/**
3941
* Wrapper around the pipeline for a single request to provide retry, clockskew and request throttling functionality.
@@ -134,9 +136,20 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
134136
}
135137

136138
public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
137-
Optional<Duration> delay = retryableStageHelper.tryRefreshToken(Duration.ZERO);
138-
if (!delay.isPresent()) {
139-
future.completeExceptionally(retryableStageHelper.retryPolicyDisallowedRetryException());
139+
Either<Duration, Duration> backoffDelay = retryableStageHelper.tryRefreshToken(Duration.ZERO);
140+
141+
Optional<Duration> acquireFailureDelay = backoffDelay.right();
142+
if (acquireFailureDelay.isPresent()) {
143+
Duration delay = acquireFailureDelay.get();
144+
retryableStageHelper.logAcquireFailureBackingOff(delay);
145+
SdkException disallowedException = retryableStageHelper.retryPolicyDisallowedRetryException();
146+
// Avoid needless scheduling if we won't wait
147+
if (delay.isZero()) {
148+
future.completeExceptionally(disallowedException);
149+
} else {
150+
scheduledExecutor.schedule(() -> future.completeExceptionally(disallowedException),
151+
delay.toMillis(), MILLISECONDS);
152+
}
140153
return;
141154
}
142155
// We failed the last attempt, but will retry. The response handler wants to know when that happens.
@@ -145,9 +158,10 @@ public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
145158
// Reset the request provider to the original one before retries, in case it was modified downstream.
146159
context.requestProvider(originalRequestBody);
147160

148-
Duration backoffDelay = delay.get();
149-
retryableStageHelper.logBackingOff(backoffDelay);
150-
long totalDelayMillis = backoffDelay.toMillis();
161+
// get() is safe, Either requires left OR right to be present
162+
Duration successDelay = backoffDelay.left().get();
163+
retryableStageHelper.logBackingOff(successDelay);
164+
long totalDelayMillis = successDelay.toMillis();
151165
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
152166
}
153167

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/RetryableStage.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
3030
import software.amazon.awssdk.http.SdkHttpFullRequest;
3131
import software.amazon.awssdk.http.SdkHttpFullResponse;
32+
import software.amazon.awssdk.utils.Either;
3233

3334
/**
3435
* Wrapper around the pipeline for a single request to provide retry, clock-skew and request throttling functionality.
@@ -64,12 +65,19 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
6465
}
6566
retryableStageHelper.setLastException(throwable);
6667
Duration suggestedDelay = suggestedDelay(e);
67-
Optional<Duration> backoffDelay = retryableStageHelper.tryRefreshToken(suggestedDelay);
68-
if (backoffDelay.isPresent()) {
69-
Duration delay = backoffDelay.get();
68+
Either<Duration, Duration> backoffDelay = retryableStageHelper.tryRefreshToken(suggestedDelay);
69+
Optional<Duration> successDelay = backoffDelay.left();
70+
if (successDelay.isPresent()) {
71+
Duration delay = successDelay.get();
7072
retryableStageHelper.logBackingOff(delay);
7173
TimeUnit.MILLISECONDS.sleep(delay.toMillis());
7274
} else {
75+
Optional<Duration> failureDelay = backoffDelay.right();
76+
if (failureDelay.isPresent()) {
77+
Duration delay = failureDelay.get();
78+
retryableStageHelper.logAcquireFailureBackingOff(delay);
79+
TimeUnit.MILLISECONDS.sleep(delay.toMillis());
80+
}
7381
throw retryableStageHelper.retryPolicyDisallowedRetryException();
7482
}
7583
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/utils/RetryableStageHelper.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import software.amazon.awssdk.retries.api.RetryStrategy;
5050
import software.amazon.awssdk.retries.api.RetryToken;
5151
import software.amazon.awssdk.retries.api.TokenAcquisitionFailedException;
52+
import software.amazon.awssdk.utils.Either;
5253

5354
/**
5455
* Contains the logic shared by {@link RetryableStage} and {@link AsyncRetryableStage} when querying and interacting with a
@@ -126,16 +127,17 @@ public void recordAttemptSucceeded() {
126127
}
127128

128129
/**
129-
* Invoked after a failed attempt and before retrying. The returned optional will be non-empty if the client can retry or
130-
* empty if the retry-strategy disallows the retry. The calling code is expected to wait the delay represented in the duration
131-
* if present before retrying the request.
130+
* Invoked after a failed attempt and before retrying. The returned {@link Either} will have its <b>left</b> be populated
131+
* if the refresh is successful. The <b>right</b> is populated if the refresh is unsuccessful. In either case, the calling
132+
* code is expected to wait the delay represented in the duration before retrying the request or exiting the retry loop.
132133
*
133134
* @param suggestedDelay A suggested delay, presumably coming from the server response. The response when present will be at
134135
* least this amount.
135-
* @return An optional time to wait. If the value is not present the retry strategy disallowed the retry and the calling code
136-
* should not retry.
136+
* @return The time to wait, regardless of whether the refresh is successful. If the left value is present, the
137+
* retry strategy allowed the retry. If the right value is present the retry strategy disallowed the retry and the calling
138+
* code should not retry.
137139
*/
138-
public Optional<Duration> tryRefreshToken(Duration suggestedDelay) {
140+
public Either<Duration, Duration> tryRefreshToken(Duration suggestedDelay) {
139141
RetryToken retryToken = context.executionAttributes().getAttribute(RETRY_TOKEN);
140142
RefreshRetryTokenResponse refreshResponse;
141143
try {
@@ -148,12 +150,18 @@ public Optional<Duration> tryRefreshToken(Duration suggestedDelay) {
148150
refreshResponse = retryStrategy().refreshRetryToken(refreshRequest);
149151
} catch (TokenAcquisitionFailedException e) {
150152
context.executionAttributes().putAttribute(RETRY_TOKEN, e.token());
151-
return Optional.empty();
153+
Optional<Duration> acquireFailureDelay = e.delay();
154+
if (acquireFailureDelay.isPresent()) {
155+
Duration acquireDelay = acquireFailureDelay.get();
156+
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, acquireDelay);
157+
return Either.right(acquireDelay);
158+
}
159+
return Either.right(Duration.ZERO);
152160
}
153-
Duration delay = refreshResponse.delay();
161+
Duration acquireSuccessDelay = refreshResponse.delay();
154162
context.executionAttributes().putAttribute(RETRY_TOKEN, refreshResponse.token());
155-
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, delay);
156-
return Optional.of(delay);
163+
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, acquireSuccessDelay);
164+
return Either.left(acquireSuccessDelay);
157165
}
158166

159167
/**
@@ -185,6 +193,12 @@ public void logBackingOff(Duration backoffDelay) {
185193
attemptNumber, lastException);
186194
}
187195

196+
public void logAcquireFailureBackingOff(Duration acquireFailureBackoffDelay) {
197+
SdkStandardLogger.REQUEST_LOGGER.debug(() -> "Unable to acquire sufficient retry quota to retry. Will cease retrying in "
198+
+ acquireFailureBackoffDelay.toMillis() + "ms. Request attempt number " +
199+
attemptNumber, lastException);
200+
}
201+
188202
/**
189203
* Retrieve the request to send to the service, including any detailed retry information headers.
190204
*/
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.http.pipeline.stages;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.when;
23+
24+
import java.net.URI;
25+
import java.time.Duration;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.stream.Stream;
30+
import org.junit.jupiter.api.AfterAll;
31+
import org.junit.jupiter.api.BeforeAll;
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.params.ParameterizedTest;
34+
import org.junit.jupiter.params.provider.MethodSource;
35+
import software.amazon.awssdk.core.Response;
36+
import software.amazon.awssdk.core.SdkRequest;
37+
import software.amazon.awssdk.core.SdkResponse;
38+
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
39+
import software.amazon.awssdk.core.client.config.SdkClientOption;
40+
import software.amazon.awssdk.core.exception.SdkException;
41+
import software.amazon.awssdk.core.http.ExecutionContext;
42+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
43+
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
44+
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
45+
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
46+
import software.amazon.awssdk.http.SdkHttpFullRequest;
47+
import software.amazon.awssdk.http.SdkHttpFullResponse;
48+
import software.amazon.awssdk.http.SdkHttpMethod;
49+
import software.amazon.awssdk.metrics.NoOpMetricCollector;
50+
import software.amazon.awssdk.retries.api.AcquireInitialTokenResponse;
51+
import software.amazon.awssdk.retries.api.RetryStrategy;
52+
import software.amazon.awssdk.retries.api.RetryToken;
53+
import software.amazon.awssdk.retries.api.TokenAcquisitionFailedException;
54+
55+
public class AsyncRetryableStageTest {
56+
private RetryStrategy mockRetryStrategy;
57+
private AcquireInitialTokenResponse mockAcquireInitialTokenResponse;
58+
private RetryToken mockRetryToken;
59+
60+
private RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<SdkResponse>>> mockDelegatePipeline;
61+
62+
private static ScheduledExecutorService executorService;
63+
64+
@BeforeAll
65+
static void setup() {
66+
executorService = Executors.newScheduledThreadPool(1);
67+
}
68+
69+
@AfterAll
70+
static void teardown() {
71+
executorService.shutdownNow();
72+
}
73+
74+
@BeforeEach
75+
void methodSetup() {
76+
mockRetryStrategy = mock(RetryStrategy.class);
77+
mockAcquireInitialTokenResponse = mock(AcquireInitialTokenResponse.class);
78+
mockRetryToken = mock(RetryToken.class);
79+
80+
when(mockAcquireInitialTokenResponse.token()).thenReturn(mockRetryToken);
81+
when(mockAcquireInitialTokenResponse.delay()).thenReturn(Duration.ZERO);
82+
83+
when(mockRetryStrategy.acquireInitialToken(any())).thenReturn(mockAcquireInitialTokenResponse);
84+
85+
mockDelegatePipeline = mock(RequestPipeline.class);
86+
}
87+
88+
@ParameterizedTest(name = "Retry disallowed, delays {0}")
89+
@MethodSource("acquireFailureTestCases")
90+
void execute_acquireFailure_backoffBehavesCorrectly(Duration acquireFailureBackoff) throws Exception {
91+
SdkClientConfiguration clientConfig = SdkClientConfiguration.builder()
92+
.option(SdkClientOption.RETRY_STRATEGY, mockRetryStrategy)
93+
.option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE,
94+
executorService)
95+
.build();
96+
97+
HttpClientDependencies deps = HttpClientDependencies.builder()
98+
.clientConfiguration(clientConfig)
99+
.build();
100+
101+
AsyncRetryableStage<SdkResponse> retryableStage = new AsyncRetryableStage<>(null, deps, mockDelegatePipeline);
102+
103+
SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
104+
.method(SdkHttpMethod.GET)
105+
.uri(URI.create("https://my-service.amazonaws.com"))
106+
.build();
107+
108+
ExecutionAttributes execAttrs = ExecutionAttributes.builder()
109+
.build();
110+
111+
ExecutionContext execCtx = ExecutionContext.builder()
112+
.metricCollector(NoOpMetricCollector.create())
113+
.executionAttributes(execAttrs)
114+
.build();
115+
116+
RequestExecutionContext ctx = RequestExecutionContext.builder()
117+
.originalRequest(mock(SdkRequest.class))
118+
.executionContext(execCtx)
119+
.build();
120+
121+
SdkHttpFullResponse.Builder httpResponse = SdkHttpFullResponse.builder()
122+
.statusCode(502);
123+
124+
Response<SdkResponse> response = Response.<SdkResponse>builder()
125+
.httpResponse(httpResponse.build())
126+
.isSuccess(false)
127+
.exception(SdkException.builder().build())
128+
.build();
129+
130+
when(mockDelegatePipeline.execute(any(), any())).thenReturn(CompletableFuture.completedFuture(response));
131+
132+
when(mockRetryStrategy.refreshRetryToken(any())).thenThrow(
133+
new TokenAcquisitionFailedException("Acquire failed", mockRetryToken, null, acquireFailureBackoff)
134+
);
135+
136+
long start = System.nanoTime();
137+
CompletableFuture<Response<SdkResponse>> execute = retryableStage.execute(httpRequest, ctx);
138+
// exception thrown doesn't matter, just results in exception because we mock just enough...
139+
assertThatThrownBy(execute::join);
140+
long end = System.nanoTime();
141+
142+
Duration lowerBound = acquireFailureBackoff != null ? acquireFailureBackoff : Duration.ZERO;
143+
assertThat(Duration.ofNanos(end - start)).isBetween(lowerBound, lowerBound.plusMillis(250));
144+
}
145+
146+
private static Stream<Duration> acquireFailureTestCases() {
147+
return Stream.of(
148+
Duration.ZERO,
149+
null,
150+
Duration.ofMillis(100)
151+
);
152+
}
153+
}

0 commit comments

Comments
 (0)