Skip to content

Commit ecf4d41

Browse files
committed
Support backoff on refresh failure
Retry strategies can optionally return a backoff delay even when token refresh fails. Add support for this in the sync and async retry stages: in RetryableStage and AsyncRetryableStage, check the value of TokenAcquisitionFailedException#delay() and back off for that duration, if given, before exiting the retry loop.
1 parent 33a6bce commit ecf4d41

6 files changed

Lines changed: 486 additions & 19 deletions

File tree

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
3535
import software.amazon.awssdk.http.SdkHttpFullRequest;
3636
import software.amazon.awssdk.utils.CompletableFutureUtils;
37+
import software.amazon.awssdk.utils.Either;
3738

3839
/**
3940
* Wrapper around the pipeline for a single request to provide retry, clockskew and request throttling functionality.
@@ -134,9 +135,20 @@ private void attemptExecute(CompletableFuture<Response<OutputT>> future) {
134135
}
135136

136137
public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
137-
Optional<Duration> delay = retryableStageHelper.tryRefreshToken(Duration.ZERO);
138-
if (!delay.isPresent()) {
139-
future.completeExceptionally(retryableStageHelper.retryPolicyDisallowedRetryException());
138+
Either<Duration, Duration> backoffDelay = retryableStageHelper.tryRefreshToken(Duration.ZERO);
139+
140+
Optional<Duration> acquireFailureDelay = backoffDelay.right();
141+
if (acquireFailureDelay.isPresent()) {
142+
Duration delay = acquireFailureDelay.get();
143+
retryableStageHelper.logAcquireFailureBackingOff(delay);
144+
SdkException disallowedException = retryableStageHelper.retryPolicyDisallowedRetryException();
145+
// Avoid needless scheduling if we won't wait
146+
if (delay.isZero()) {
147+
future.completeExceptionally(disallowedException);
148+
} else {
149+
scheduledExecutor.schedule(() -> future.completeExceptionally(disallowedException),
150+
delay.toMillis(), MILLISECONDS);
151+
}
140152
return;
141153
}
142154
// We failed the last attempt, but will retry. The response handler wants to know when that happens.
@@ -145,9 +157,10 @@ public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
145157
// Reset the request provider to the original one before retries, in case it was modified downstream.
146158
context.requestProvider(originalRequestBody);
147159

148-
Duration backoffDelay = delay.get();
149-
retryableStageHelper.logBackingOff(backoffDelay);
150-
long totalDelayMillis = backoffDelay.toMillis();
160+
// get() is safe, Either requires left OR right to be present
161+
Duration successDelay = backoffDelay.left().get();
162+
retryableStageHelper.logBackingOff(successDelay);
163+
long totalDelayMillis = successDelay.toMillis();
151164
scheduledExecutor.schedule(() -> attemptExecute(future), totalDelayMillis, MILLISECONDS);
152165
}
153166

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/RetryableStage.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.awssdk.core.internal.http.pipeline.stages.utils.RetryableStageHelper;
3030
import software.amazon.awssdk.http.SdkHttpFullRequest;
3131
import software.amazon.awssdk.http.SdkHttpFullResponse;
32+
import software.amazon.awssdk.utils.Either;
3233

3334
/**
3435
* Wrapper around the pipeline for a single request to provide retry, clock-skew and request throttling functionality.
@@ -64,12 +65,19 @@ public Response<OutputT> execute(SdkHttpFullRequest request, RequestExecutionCon
6465
}
6566
retryableStageHelper.setLastException(throwable);
6667
Duration suggestedDelay = suggestedDelay(e);
67-
Optional<Duration> backoffDelay = retryableStageHelper.tryRefreshToken(suggestedDelay);
68-
if (backoffDelay.isPresent()) {
69-
Duration delay = backoffDelay.get();
68+
Either<Duration, Duration> backoffDelay = retryableStageHelper.tryRefreshToken(suggestedDelay);
69+
Optional<Duration> successDelay = backoffDelay.left();
70+
if (successDelay.isPresent()) {
71+
Duration delay = successDelay.get();
7072
retryableStageHelper.logBackingOff(delay);
7173
TimeUnit.MILLISECONDS.sleep(delay.toMillis());
7274
} else {
75+
Optional<Duration> failureDelay = backoffDelay.right();
76+
if (failureDelay.isPresent()) {
77+
Duration delay = failureDelay.get();
78+
retryableStageHelper.logAcquireFailureBackingOff(delay);
79+
TimeUnit.MILLISECONDS.sleep(delay.toMillis());
80+
}
7381
throw retryableStageHelper.retryPolicyDisallowedRetryException();
7482
}
7583
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/utils/RetryableStageHelper.java

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import software.amazon.awssdk.retries.api.RetryStrategy;
5050
import software.amazon.awssdk.retries.api.RetryToken;
5151
import software.amazon.awssdk.retries.api.TokenAcquisitionFailedException;
52+
import software.amazon.awssdk.utils.Either;
5253

5354
/**
5455
* Contains the logic shared by {@link RetryableStage} and {@link AsyncRetryableStage} when querying and interacting with a
@@ -126,16 +127,17 @@ public void recordAttemptSucceeded() {
126127
}
127128

128129
/**
129-
* Invoked after a failed attempt and before retrying. The returned optional will be non-empty if the client can retry or
130-
* empty if the retry-strategy disallows the retry. The calling code is expected to wait the delay represented in the duration
131-
* if present before retrying the request.
130+
* Invoked after a failed attempt and before retrying. The returned {@link Either} will have its <b>left</b> be populated
131+
* if the refresh is successful. The <b>right</b> is populated if the refresh is unsuccessful. In either case, the calling
132+
* code is expected to wait the delay represented in the duration before retrying the request or exiting the retry loop.
132133
*
133134
* @param suggestedDelay A suggested delay, presumably coming from the server response. The response when present will be at
134135
* least this amount.
135-
* @return An optional time to wait. If the value is not present the retry strategy disallowed the retry and the calling code
136-
* should not retry.
136+
* @return The time to wait, regardless of whether the refresh is successful. If the left value is present, the
137+
* retry strategy allowed the retry. If the right value is present the retry strategy disallowed the retry and the calling
138+
* code should not retry.
137139
*/
138-
public Optional<Duration> tryRefreshToken(Duration suggestedDelay) {
140+
public Either<Duration, Duration> tryRefreshToken(Duration suggestedDelay) {
139141
RetryToken retryToken = context.executionAttributes().getAttribute(RETRY_TOKEN);
140142
RefreshRetryTokenResponse refreshResponse;
141143
try {
@@ -148,12 +150,18 @@ public Optional<Duration> tryRefreshToken(Duration suggestedDelay) {
148150
refreshResponse = retryStrategy().refreshRetryToken(refreshRequest);
149151
} catch (TokenAcquisitionFailedException e) {
150152
context.executionAttributes().putAttribute(RETRY_TOKEN, e.token());
151-
return Optional.empty();
153+
Optional<Duration> acquireFailureDelay = e.delay();
154+
if (acquireFailureDelay.isPresent()) {
155+
Duration acquireDelay = acquireFailureDelay.get();
156+
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, acquireDelay);
157+
return Either.right(acquireDelay);
158+
}
159+
return Either.right(Duration.ZERO);
152160
}
153-
Duration delay = refreshResponse.delay();
161+
Duration acquireSuccessDelay = refreshResponse.delay();
154162
context.executionAttributes().putAttribute(RETRY_TOKEN, refreshResponse.token());
155-
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, delay);
156-
return Optional.of(delay);
163+
context.executionAttributes().putAttribute(LAST_BACKOFF_DELAY_DURATION, acquireSuccessDelay);
164+
return Either.left(acquireSuccessDelay);
157165
}
158166

159167
/**
@@ -185,6 +193,12 @@ public void logBackingOff(Duration backoffDelay) {
185193
attemptNumber, lastException);
186194
}
187195

196+
/**
* Logs, at debug level on the SDK request logger, that retry quota could not be acquired and that retrying will cease
* after the given delay. The message also contains the current attempt number, and the last recorded exception is
* passed to the logger along with the message.
*
* @param acquireFailureBackoffDelay The delay to report; it is written to the message in milliseconds.
*/
public void logAcquireFailureBackingOff(Duration acquireFailureBackoffDelay) {
197+
SdkStandardLogger.REQUEST_LOGGER.debug(() -> "Unable to acquire sufficient retry quota to retry. Will cease retrying in "
198+
+ acquireFailureBackoffDelay.toMillis() + "ms. Request attempt number " +
199+
attemptNumber, lastException);
200+
}
201+
188202
/**
189203
* Retrieve the request to send to the service, including any detailed retry information headers.
190204
*/
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.http.pipeline.stages;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.when;
23+
24+
import java.net.URI;
25+
import java.time.Duration;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.ScheduledExecutorService;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.stream.Stream;
31+
import org.junit.jupiter.api.AfterAll;
32+
import org.junit.jupiter.api.BeforeAll;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.MethodSource;
36+
import software.amazon.awssdk.core.Response;
37+
import software.amazon.awssdk.core.SdkRequest;
38+
import software.amazon.awssdk.core.SdkResponse;
39+
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
40+
import software.amazon.awssdk.core.client.config.SdkClientOption;
41+
import software.amazon.awssdk.core.exception.SdkException;
42+
import software.amazon.awssdk.core.http.ExecutionContext;
43+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
44+
import software.amazon.awssdk.core.internal.http.HttpClientDependencies;
45+
import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
46+
import software.amazon.awssdk.core.internal.http.TransformingAsyncResponseHandler;
47+
import software.amazon.awssdk.core.internal.http.pipeline.RequestPipeline;
48+
import software.amazon.awssdk.http.SdkHttpFullRequest;
49+
import software.amazon.awssdk.http.SdkHttpFullResponse;
50+
import software.amazon.awssdk.http.SdkHttpMethod;
51+
import software.amazon.awssdk.metrics.NoOpMetricCollector;
52+
import software.amazon.awssdk.retries.api.AcquireInitialTokenResponse;
53+
import software.amazon.awssdk.retries.api.RefreshRetryTokenResponse;
54+
import software.amazon.awssdk.retries.api.RetryStrategy;
55+
import software.amazon.awssdk.retries.api.RetryToken;
56+
import software.amazon.awssdk.retries.api.TokenAcquisitionFailedException;
57+
58+
/**
* Tests for {@link AsyncRetryableStage} that measure how long an execution takes when the mocked
* {@link RetryStrategy#refreshRetryToken} either succeeds with a delay or throws a
* {@link TokenAcquisitionFailedException} carrying a delay.
*/
public class AsyncRetryableStageTest {
59+
private RetryStrategy mockRetryStrategy;
60+
private AcquireInitialTokenResponse mockAcquireInitialTokenResponse;
61+
private RetryToken mockRetryToken;
62+
63+
private RequestPipeline<SdkHttpFullRequest, CompletableFuture<Response<SdkResponse>>> mockDelegatePipeline;
64+
65+
private static ScheduledExecutorService executorService;
66+
67+
@BeforeAll
68+
static void setup() {
69+
executorService = Executors.newScheduledThreadPool(1);
70+
}
71+
72+
@AfterAll
73+
static void teardown() {
74+
executorService.shutdownNow();
75+
}
76+
77+
@BeforeEach
78+
void methodSetup() {
79+
mockRetryStrategy = mock(RetryStrategy.class);
80+
mockAcquireInitialTokenResponse = mock(AcquireInitialTokenResponse.class);
81+
mockRetryToken = mock(RetryToken.class);
82+
83+
when(mockAcquireInitialTokenResponse.token()).thenReturn(mockRetryToken);
84+
when(mockAcquireInitialTokenResponse.delay()).thenReturn(Duration.ZERO);
85+
86+
when(mockRetryStrategy.acquireInitialToken(any())).thenReturn(mockAcquireInitialTokenResponse);
87+
88+
mockDelegatePipeline = mock(RequestPipeline.class);
89+
}
90+
91+
/**
* Executes the stage against a delegate pipeline that always completes with an unsuccessful (HTTP 502) response, and
* asserts that the total elapsed time is at least the delay expected for the test case and at most 250ms more.
*/
@ParameterizedTest
92+
@MethodSource("acquireDelayTestCases")
93+
void execute_acquireDelay_behavesCorrectly(AcquireDelayTestCase testCase) throws Exception {
94+
SdkClientConfiguration clientConfig = SdkClientConfiguration.builder()
95+
.option(SdkClientOption.RETRY_STRATEGY, mockRetryStrategy)
96+
.option(SdkClientOption.SCHEDULED_EXECUTOR_SERVICE,
97+
executorService)
98+
.build();
99+
100+
HttpClientDependencies deps = HttpClientDependencies.builder()
101+
.clientConfiguration(clientConfig)
102+
.build();
103+
104+
AsyncRetryableStage<SdkResponse> retryableStage = new AsyncRetryableStage<>(mock(TransformingAsyncResponseHandler.class),
105+
deps, mockDelegatePipeline);
106+
107+
SdkHttpFullRequest httpRequest = SdkHttpFullRequest.builder()
108+
.method(SdkHttpMethod.GET)
109+
.uri(URI.create("https://my-service.amazonaws.com"))
110+
.build();
111+
112+
ExecutionAttributes execAttrs = ExecutionAttributes.builder()
113+
.build();
114+
115+
ExecutionContext execCtx = ExecutionContext.builder()
116+
.metricCollector(NoOpMetricCollector.create())
117+
.executionAttributes(execAttrs)
118+
.build();
119+
120+
RequestExecutionContext ctx = RequestExecutionContext.builder()
121+
.originalRequest(mock(SdkRequest.class))
122+
.executionContext(execCtx)
123+
.build();
124+
125+
SdkHttpFullResponse.Builder httpResponse = SdkHttpFullResponse.builder()
126+
.statusCode(502);
127+
128+
Response<SdkResponse> response = Response.<SdkResponse>builder()
129+
.httpResponse(httpResponse.build())
130+
.isSuccess(false)
131+
.exception(SdkException.builder().build())
132+
.build();
133+
134+
when(mockDelegatePipeline.execute(any(), any())).thenReturn(CompletableFuture.completedFuture(response));
135+
136+
// Failure case: every refresh throws with the test case's failure delay.
// Success case: the first refresh succeeds with the success delay; later refreshes throw with a zero delay.
if (testCase.failure) {
137+
when(mockRetryStrategy.refreshRetryToken(any())).thenThrow(
138+
new TokenAcquisitionFailedException("Acquire failed", mockRetryToken, null, testCase.failureDelay)
139+
);
140+
} else {
141+
// only retry once, otherwise we'll get into an infinite loop
142+
AtomicBoolean first = new AtomicBoolean();
143+
when(mockRetryStrategy.refreshRetryToken(any())).thenAnswer(i -> {
144+
if (first.compareAndSet(false, true)) {
145+
return RefreshRetryTokenResponse.create(mockRetryToken, testCase.successDelay);
146+
}
147+
throw new TokenAcquisitionFailedException("Acquire failed", mockRetryToken, null, Duration.ZERO);
148+
});
149+
}
150+
151+
long start = System.nanoTime();
152+
CompletableFuture<Response<SdkResponse>> execute = retryableStage.execute(httpRequest, ctx);
153+
// exception thrown doesn't matter, just results in exception because we mock just enough...
154+
assertThatThrownBy(execute::join);
155+
long end = System.nanoTime();
156+
157+
// The elapsed wall-clock time must fall within [expected delay, expected delay + 250ms].
// NOTE(review): the fixed 250ms upper margin may be tight on a heavily loaded machine - confirm it is stable in CI.
Duration lowerBound = testCase.expectedDelay();
158+
assertThat(Duration.ofNanos(end - start)).isBetween(lowerBound, lowerBound.plusMillis(250));
159+
}
160+
161+
// Two failure cases and two success cases, each with a zero and a 100ms delay. The delay for the path that is not
// exercised is a one-day placeholder; the test method above never hands it to the mocked retry strategy.
private static Stream<AcquireDelayTestCase> acquireDelayTestCases() {
162+
return Stream.of(
163+
new AcquireDelayTestCase(true, Duration.ofDays(1), Duration.ZERO),
164+
new AcquireDelayTestCase(true, Duration.ofDays(1), Duration.ofMillis(100)),
165+
166+
167+
new AcquireDelayTestCase(false, Duration.ZERO, Duration.ofDays(1)),
168+
new AcquireDelayTestCase(false, Duration.ofMillis(100), Duration.ofDays(1))
169+
);
170+
}
171+
172+
// Parameters for one test run: whether the token refresh fails, and the delay to use for each outcome.
private static class AcquireDelayTestCase {
173+
private boolean failure;
174+
private Duration successDelay;
175+
private Duration failureDelay;
176+
177+
public AcquireDelayTestCase(boolean failure, Duration successDelay, Duration failureDelay) {
178+
this.failure = failure;
179+
this.successDelay = successDelay;
180+
this.failureDelay = failureDelay;
181+
}
182+
183+
// Returns the failure delay when the refresh is set to fail, otherwise the success delay.
public Duration expectedDelay() {
184+
if (failure) {
185+
return failureDelay;
186+
}
187+
return successDelay;
188+
}
189+
190+
@Override
191+
public String toString() {
192+
return (failure ? "Failure" : "Success") + " with delay " + expectedDelay();
193+
}
194+
}
195+
}

0 commit comments

Comments
 (0)