Skip to content

Commit 8df8d31

Browse files
blobstore: onboard retry config in the client and aws (#113)
1 parent 90498d9 commit 8df8d31

File tree

11 files changed

+796
-5
lines changed

11 files changed

+796
-5
lines changed

blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsBlobStore.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
2727
import com.salesforce.multicloudj.common.exceptions.SubstrateSdkException;
2828
import com.salesforce.multicloudj.common.exceptions.UnknownException;
29+
import com.salesforce.multicloudj.common.retries.RetryConfig;
2930
import lombok.Getter;
3031
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3132
import software.amazon.awssdk.awscore.exception.AwsServiceException;
@@ -68,6 +69,7 @@
6869
import java.io.OutputStream;
6970
import java.net.URL;
7071
import java.nio.file.Path;
72+
import java.time.Duration;
7173
import java.util.Collection;
7274
import java.util.Comparator;
7375
import java.util.Iterator;
@@ -531,6 +533,20 @@ private static S3Client buildS3Client(Builder builder) {
531533
if(shouldConfigureHttpClient(builder)) {
532534
b.httpClient(generateHttpClient(builder));
533535
}
536+
if (builder.getRetryConfig() != null) {
537+
// Create a temporary transformer instance for retry strategy conversion
538+
AwsTransformer transformer = builder.getTransformerSupplier().get(builder.getBucket());
539+
b.overrideConfiguration(config -> {
540+
config.retryStrategy(transformer.toAwsRetryStrategy(builder.getRetryConfig()));
541+
// Set API call timeouts if provided
542+
if (builder.getRetryConfig().getAttemptTimeout() != null) {
543+
config.apiCallAttemptTimeout(Duration.ofMillis(builder.getRetryConfig().getAttemptTimeout()));
544+
}
545+
if (builder.getRetryConfig().getTotalTimeout() != null) {
546+
config.apiCallTimeout(Duration.ofMillis(builder.getRetryConfig().getTotalTimeout()));
547+
}
548+
});
549+
}
534550

535551
return b.build();
536552
}

blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/AwsTransformer.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,14 @@
2121
import com.salesforce.multicloudj.blob.driver.PresignedUrlRequest;
2222
import com.salesforce.multicloudj.blob.driver.UploadPartResponse;
2323
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
24-
import com.salesforce.multicloudj.common.exceptions.UnSupportedOperationException;
2524
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
2625
import software.amazon.awssdk.services.s3.model.StorageClass;
2726
import com.salesforce.multicloudj.blob.driver.UploadRequest;
27+
import com.salesforce.multicloudj.common.retries.RetryConfig;
2828
import com.salesforce.multicloudj.common.util.HexUtil;
2929
import software.amazon.awssdk.core.async.AsyncRequestBody;
30+
import software.amazon.awssdk.retries.StandardRetryStrategy;
31+
import software.amazon.awssdk.retries.api.RetryStrategy;
3032
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
3133
import software.amazon.awssdk.services.s3.model.CommonPrefix;
3234
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
@@ -62,6 +64,7 @@
6264

6365
import java.io.InputStream;
6466
import java.nio.file.Paths;
67+
import java.time.Duration;
6568
import java.util.ArrayList;
6669
import java.util.Collection;
6770
import java.util.Comparator;
@@ -476,4 +479,60 @@ public List<BlobIdentifier> toBlobIdentifiers(List<BlobInfo> blobList) {
476479
.map(blob -> new BlobIdentifier(blob.getKey(), null))
477480
.collect(Collectors.toList());
478481
}
482+
483+
/**
484+
* Converts MultiCloudJ RetryConfig to AWS SDK RetryStrategy
485+
*
486+
* @param retryConfig The retry configuration to convert
487+
* @return AWS SDK RetryStrategy
488+
* @throws InvalidArgumentException if retryConfig is null or has invalid values
489+
*/
490+
public RetryStrategy toAwsRetryStrategy(RetryConfig retryConfig) {
491+
if (retryConfig == null) {
492+
throw new InvalidArgumentException("RetryConfig cannot be null");
493+
}
494+
if (retryConfig.getMaxAttempts() != null && retryConfig.getMaxAttempts() <= 0) {
495+
throw new InvalidArgumentException("RetryConfig.maxAttempts must be greater than 0, got: " + retryConfig.getMaxAttempts());
496+
}
497+
498+
StandardRetryStrategy.Builder strategyBuilder = StandardRetryStrategy.builder();
499+
500+
// Only set maxAttempts if provided, otherwise use AWS SDK default
501+
if (retryConfig.getMaxAttempts() != null) {
502+
strategyBuilder.maxAttempts(retryConfig.getMaxAttempts());
503+
}
504+
505+
// If mode is not set, use AWS SDK's default backoff strategy
506+
if (retryConfig.getMode() == null) {
507+
return strategyBuilder.build();
508+
}
509+
510+
// Configure backoff strategy based on mode
511+
if (retryConfig.getMode() == RetryConfig.Mode.EXPONENTIAL) {
512+
if (retryConfig.getInitialDelayMillis() <= 0) {
513+
throw new InvalidArgumentException("RetryConfig.initialDelayMillis must be greater than 0 for EXPONENTIAL mode, got: " + retryConfig.getInitialDelayMillis());
514+
}
515+
if (retryConfig.getMaxDelayMillis() <= 0) {
516+
throw new InvalidArgumentException("RetryConfig.maxDelayMillis must be greater than 0 for EXPONENTIAL mode, got: " + retryConfig.getMaxDelayMillis());
517+
}
518+
strategyBuilder.backoffStrategy(
519+
software.amazon.awssdk.retries.api.BackoffStrategy.exponentialDelay(
520+
Duration.ofMillis(retryConfig.getInitialDelayMillis()),
521+
Duration.ofMillis(retryConfig.getMaxDelayMillis())
522+
)
523+
);
524+
return strategyBuilder.build();
525+
}
526+
527+
// FIXED mode
528+
if (retryConfig.getFixedDelayMillis() <= 0) {
529+
throw new InvalidArgumentException("RetryConfig.fixedDelayMillis must be greater than 0 for FIXED mode, got: " + retryConfig.getFixedDelayMillis());
530+
}
531+
strategyBuilder.backoffStrategy(
532+
software.amazon.awssdk.retries.api.BackoffStrategy.fixedDelay(
533+
Duration.ofMillis(retryConfig.getFixedDelayMillis())
534+
)
535+
);
536+
return strategyBuilder.build();
537+
}
479538
}

blob/blob-aws/src/main/java/com/salesforce/multicloudj/blob/aws/async/AwsAsyncBlobStore.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.salesforce.multicloudj.common.aws.CredentialsProvider;
3636
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
3737
import com.salesforce.multicloudj.common.exceptions.SubstrateSdkException;
38+
import com.salesforce.multicloudj.common.retries.RetryConfig;
3839
import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
3940
import lombok.Getter;
4041
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
@@ -71,6 +72,7 @@
7172
import java.io.OutputStream;
7273
import java.net.URL;
7374
import java.nio.file.Path;
75+
import java.time.Duration;
7476
import java.util.ArrayList;
7577
import java.util.Collection;
7678
import java.util.Comparator;
@@ -531,6 +533,22 @@ private static void applyCommonConfig(S3AsyncClientBuilder builder, Builder conf
531533
}
532534
builder.multipartConfiguration(configBuilder.build());
533535

536+
// Configure retry strategy if specified
537+
if (config.getRetryConfig() != null) {
538+
// Create a temporary transformer instance for retry strategy conversion
539+
AwsTransformer transformer = config.getTransformerSupplier().get(config.getBucket());
540+
builder.overrideConfiguration(overrideConfig -> {
541+
overrideConfig.retryStrategy(transformer.toAwsRetryStrategy(config.getRetryConfig()));
542+
// Set API call timeouts if provided
543+
if (config.getRetryConfig().getAttemptTimeout() != null) {
544+
overrideConfig.apiCallAttemptTimeout(Duration.ofMillis(config.getRetryConfig().getAttemptTimeout()));
545+
}
546+
if (config.getRetryConfig().getTotalTimeout() != null) {
547+
overrideConfig.apiCallTimeout(Duration.ofMillis(config.getRetryConfig().getTotalTimeout()));
548+
}
549+
});
550+
}
551+
534552
// Configure async configuration if executor service is specified
535553
if (config.getExecutorService() != null) {
536554
builder.asyncConfiguration(ClientAsyncConfiguration.builder()
@@ -573,6 +591,11 @@ private static void applyCommonConfig(S3CrtAsyncClientBuilder builder, Builder c
573591
builder.minimumPartSizeInBytes(config.getPartBufferSize());
574592
}
575593

594+
// Configure retry policy if specified
595+
if (config.getRetryConfig() != null) {
596+
builder.retryConfiguration(retryConfig -> retryConfig.numRetries(config.getRetryConfig().getMaxAttempts() - 1));
597+
}
598+
576599
// Configure executor service if specified
577600
if (config.getExecutorService() != null) {
578601
builder.futureCompletionExecutor(config.getExecutorService());

blob/blob-aws/src/test/java/com/salesforce/multicloudj/blob/aws/AwsBlobStoreTest.java

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.salesforce.multicloudj.blob.driver.UploadResponse;
2222
import com.salesforce.multicloudj.common.exceptions.InvalidArgumentException;
2323
import com.salesforce.multicloudj.common.exceptions.UnknownException;
24+
import com.salesforce.multicloudj.common.retries.RetryConfig;
2425
import com.salesforce.multicloudj.sts.model.CredentialsOverrider;
2526
import com.salesforce.multicloudj.sts.model.CredentialsType;
2627
import com.salesforce.multicloudj.sts.model.StsCredentials;
@@ -35,6 +36,7 @@
3536
import software.amazon.awssdk.awscore.exception.AwsServiceException;
3637
import software.amazon.awssdk.core.ResponseBytes;
3738
import software.amazon.awssdk.core.ResponseInputStream;
39+
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
3840
import software.amazon.awssdk.core.exception.SdkClientException;
3941
import software.amazon.awssdk.core.sync.RequestBody;
4042
import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -96,6 +98,7 @@
9698
import java.util.List;
9799
import java.util.Map;
98100
import java.util.NoSuchElementException;
101+
import java.util.function.Consumer;
99102
import java.util.stream.Collectors;
100103
import java.util.stream.IntStream;
101104

@@ -107,6 +110,7 @@
107110
import static org.junit.jupiter.api.Assertions.assertThrows;
108111
import static org.junit.jupiter.api.Assertions.assertTrue;
109112
import static org.mockito.ArgumentMatchers.any;
113+
import static org.mockito.Mockito.doAnswer;
110114
import static org.mockito.Mockito.doReturn;
111115
import static org.mockito.Mockito.doThrow;
112116
import static org.mockito.Mockito.mock;
@@ -128,6 +132,17 @@ void setup() {
128132
S3ClientBuilder mockBuilder = mock(S3ClientBuilder.class);
129133
when(mockBuilder.region(any())).thenReturn(mockBuilder);
130134

135+
// Execute the consumer lambda to cover lines 540-549
136+
doAnswer(invocation -> {
137+
Consumer<ClientOverrideConfiguration.Builder> consumer = invocation.getArgument(0);
138+
ClientOverrideConfiguration.Builder configBuilder = mock(ClientOverrideConfiguration.Builder.class);
139+
when(configBuilder.retryStrategy(any(software.amazon.awssdk.retries.api.RetryStrategy.class))).thenReturn(configBuilder);
140+
when(configBuilder.apiCallAttemptTimeout(any(Duration.class))).thenReturn(configBuilder);
141+
when(configBuilder.apiCallTimeout(any(Duration.class))).thenReturn(configBuilder);
142+
consumer.accept(configBuilder);
143+
return mockBuilder;
144+
}).when(mockBuilder).overrideConfiguration(any(Consumer.class));
145+
131146
s3Client = mockStatic(S3Client.class);
132147
s3Client.when(S3Client::builder).thenReturn(mockBuilder);
133148

@@ -950,4 +965,126 @@ private S3Object mockObject(int index) {
950965
when(mockS3.size()).thenReturn((long) index);
951966
return mockS3;
952967
}
968+
969+
@Test
970+
void testBuildS3ClientWithRetryConfig() {
971+
// Test with exponential retry config
972+
RetryConfig exponentialConfig = RetryConfig.builder()
973+
.mode(RetryConfig.Mode.EXPONENTIAL)
974+
.maxAttempts(3)
975+
.initialDelayMillis(100L)
976+
.multiplier(2.0)
977+
.maxDelayMillis(5000L)
978+
.attemptTimeout(5000L)
979+
.totalTimeout(30000L)
980+
.build();
981+
982+
var store = new AwsBlobStore.Builder()
983+
.withTransformerSupplier(transformerSupplier)
984+
.withBucket("bucket-1")
985+
.withRegion("us-east-2")
986+
.withRetryConfig(exponentialConfig)
987+
.build();
988+
989+
assertNotNull(store);
990+
assertEquals("bucket-1", store.getBucket());
991+
}
992+
993+
@Test
994+
void testBuildS3ClientWithFixedRetryConfig() {
995+
// Test with fixed retry config
996+
RetryConfig fixedConfig = RetryConfig.builder()
997+
.mode(RetryConfig.Mode.FIXED)
998+
.maxAttempts(5)
999+
.fixedDelayMillis(1000L)
1000+
.build();
1001+
1002+
var store = new AwsBlobStore.Builder()
1003+
.withTransformerSupplier(transformerSupplier)
1004+
.withBucket("bucket-1")
1005+
.withRegion("us-east-2")
1006+
.withRetryConfig(fixedConfig)
1007+
.build();
1008+
1009+
assertNotNull(store);
1010+
assertEquals("bucket-1", store.getBucket());
1011+
}
1012+
1013+
@Test
1014+
void testBuildS3ClientWithRetryConfigWithNullMaxAttempts() {
1015+
// Test with null maxAttempts (should use AWS default)
1016+
RetryConfig config = RetryConfig.builder()
1017+
.mode(RetryConfig.Mode.EXPONENTIAL)
1018+
.maxAttempts(null)
1019+
.initialDelayMillis(100L)
1020+
.maxDelayMillis(5000L)
1021+
.build();
1022+
1023+
var store = new AwsBlobStore.Builder()
1024+
.withTransformerSupplier(transformerSupplier)
1025+
.withBucket("bucket-1")
1026+
.withRegion("us-east-2")
1027+
.withRetryConfig(config)
1028+
.build();
1029+
1030+
assertNotNull(store);
1031+
assertEquals("bucket-1", store.getBucket());
1032+
}
1033+
1034+
@Test
1035+
void testBuildS3ClientWithRetryConfigWithAttemptTimeout() {
1036+
// Test with attempt timeout only
1037+
RetryConfig config = RetryConfig.builder()
1038+
.mode(RetryConfig.Mode.EXPONENTIAL)
1039+
.maxAttempts(3)
1040+
.initialDelayMillis(100L)
1041+
.maxDelayMillis(5000L)
1042+
.attemptTimeout(5000L)
1043+
.build();
1044+
1045+
var store = new AwsBlobStore.Builder()
1046+
.withTransformerSupplier(transformerSupplier)
1047+
.withBucket("bucket-1")
1048+
.withRegion("us-east-2")
1049+
.withRetryConfig(config)
1050+
.build();
1051+
1052+
assertNotNull(store);
1053+
assertEquals("bucket-1", store.getBucket());
1054+
}
1055+
1056+
@Test
1057+
void testBuildS3ClientWithRetryConfigWithTotalTimeout() {
1058+
// Test with total timeout only
1059+
RetryConfig config = RetryConfig.builder()
1060+
.mode(RetryConfig.Mode.EXPONENTIAL)
1061+
.maxAttempts(3)
1062+
.initialDelayMillis(100L)
1063+
.maxDelayMillis(5000L)
1064+
.totalTimeout(30000L)
1065+
.build();
1066+
1067+
var store = new AwsBlobStore.Builder()
1068+
.withTransformerSupplier(transformerSupplier)
1069+
.withBucket("bucket-1")
1070+
.withRegion("us-east-2")
1071+
.withRetryConfig(config)
1072+
.build();
1073+
1074+
assertNotNull(store);
1075+
assertEquals("bucket-1", store.getBucket());
1076+
}
1077+
1078+
@Test
1079+
void testBuildS3ClientWithoutRetryConfig() {
1080+
// Test without retry config (default behavior)
1081+
var store = new AwsBlobStore.Builder()
1082+
.withTransformerSupplier(transformerSupplier)
1083+
.withBucket("bucket-1")
1084+
.withRegion("us-east-2")
1085+
.build();
1086+
1087+
assertNotNull(store);
1088+
assertEquals("bucket-1", store.getBucket());
1089+
}
9531090
}

0 commit comments

Comments
 (0)