Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@
import com.salesforce.multicloudj.common.exceptions.UnAuthorizedException;
import com.salesforce.multicloudj.common.exceptions.UnknownException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -55,6 +57,7 @@
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a bit odd - we shouldn't mix sync and async in here. AwsBlobStore in meant for sync operations

Copy link
Copy Markdown
Collaborator Author

@LihaoLiuXs LihaoLiuXs Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move the parallel download option to asyncClient.

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Configuration;
Expand Down Expand Up @@ -84,11 +87,14 @@
import software.amazon.awssdk.services.s3.model.Tag;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.presigner.S3Presigner;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.DownloadFileRequest;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


/** AWS implementation of BlobStore */
@AutoService(AbstractBlobStore.class)
public class AwsBlobStore extends AbstractBlobStore {
private final S3Client s3Client;
private final S3TransferManager s3DownloadTransferMgr;
private final AwsTransformer transformer;

public AwsBlobStore() {
Expand All @@ -98,6 +104,7 @@ public AwsBlobStore() {
public AwsBlobStore(Builder builder, S3Client s3Client) {
super(builder);
this.s3Client = s3Client;
this.s3DownloadTransferMgr = builder.getTransferManager();
this.transformer = builder.getTransformerSupplier().get(bucket);
}

Expand Down Expand Up @@ -203,6 +210,8 @@ protected UploadResponse doUpload(UploadRequest uploadRequest, RequestBody reque
protected DownloadResponse doDownload(
DownloadRequest downloadRequest, OutputStream outputStream) {
GetObjectRequest request = transformer.toRequest(downloadRequest);
// Parallel download uses the transfer manager file API only; OutputStream downloads always
// stream via GetObject (parallelDownload is ignored for this overload).
GetObjectResponse response =
s3Client.getObject(request, ResponseTransformer.toOutputStream(outputStream));
return transformer.toDownloadResponse(downloadRequest, response);
Expand Down Expand Up @@ -235,7 +244,11 @@ protected DownloadResponse doDownload(DownloadRequest downloadRequest, ByteArray
@Override
protected DownloadResponse doDownload(DownloadRequest downloadRequest, File file) {
GetObjectRequest request = transformer.toRequest(downloadRequest);
GetObjectResponse response = s3Client.getObject(request, ResponseTransformer.toFile(file));
Path destinationPath = resolveDownloadDestinationPath(downloadRequest, file.toPath());
GetObjectResponse response =
downloadRequest.isParallelDownload()
? doParallelDownload(request, destinationPath)
: s3Client.getObject(request, ResponseTransformer.toFile(destinationPath));
return transformer.toDownloadResponse(downloadRequest, response);
}

Expand All @@ -249,10 +262,46 @@ protected DownloadResponse doDownload(DownloadRequest downloadRequest, File file
@Override
protected DownloadResponse doDownload(DownloadRequest downloadRequest, Path path) {
GetObjectRequest request = transformer.toRequest(downloadRequest);
GetObjectResponse response = s3Client.getObject(request, ResponseTransformer.toFile(path));
Path destinationPath = resolveDownloadDestinationPath(downloadRequest, path);
GetObjectResponse response =
downloadRequest.isParallelDownload()
? doParallelDownload(request, destinationPath)
: s3Client.getObject(request, ResponseTransformer.toFile(destinationPath));
return transformer.toDownloadResponse(downloadRequest, response);
}

/**
* When createParentPath is enabled, resolves the final file path by appending
* the object key to the destination root and creating any missing parent directories.
*/
private Path resolveDownloadDestinationPath(DownloadRequest request, Path destination) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it;s not resolving, it's creating.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the name.

if (!request.isCreateParentPath()) {
return destination;
}
// When requested, keep the key's parent path structure under the provided destination root.
Path resolved = destination.resolve(request.getKey()).normalize();
Path parent = resolved.getParent();
if (parent != null) {
try {
Files.createDirectories(parent);
} catch (IOException e) {
throw new SubstrateSdkException("Failed to create destination directories", e);
}
}
return resolved;
}

private GetObjectResponse doParallelDownload(GetObjectRequest request, Path destination) {
DownloadFileRequest downloadFileRequest =
DownloadFileRequest.builder().getObjectRequest(request).destination(destination).build();
// Block until the async multi-part download completes, since doDownload is a synchronous API.
return s3DownloadTransferMgr
.downloadFile(downloadFileRequest)
.completionFuture()
.join()
.response();
}

/**
* Performs Blob download and returns an InputStream
*
Expand Down Expand Up @@ -574,6 +623,9 @@ public void updateLegalHold(String key, String versionId, boolean legalHold) {
/** Closes the underlying S3 client and releases any resources. */
@Override
public void close() {
if (s3DownloadTransferMgr != null) {
s3DownloadTransferMgr.close();
}
if (s3Client != null) {
s3Client.close();
}
Expand All @@ -583,6 +635,7 @@ public void close() {
public static class Builder extends AbstractBlobStore.Builder<AwsBlobStore, Builder> {

private S3Client s3Client;
private S3TransferManager transferManager;
private AwsTransformerSupplier transformerSupplier = new AwsTransformerSupplier();

public Builder() {
Expand Down Expand Up @@ -632,6 +685,26 @@ private static S3Client buildS3Client(Builder builder) {
return b.build();
}

/**
* S3TransferManager requires an S3AsyncClient; this is an AWS SDK
* constraint, not an async API choice.
*/
private static S3TransferManager buildTransferManager(Builder builder) {
Region regionObj = Region.of(builder.getRegion());
var asyncBuilder = S3AsyncClient.builder().region(regionObj);

AwsCredentialsProvider credentialsProvider =
CredentialsProvider.getCredentialsProvider(builder.getCredentialsOverrider(), regionObj);
if (credentialsProvider != null) {
asyncBuilder.credentialsProvider(credentialsProvider);
}
if (builder.getEndpoint() != null) {
asyncBuilder.endpointOverride(builder.getEndpoint());
}

return S3TransferManager.builder().s3Client(asyncBuilder.build()).build();
}

/** Helper function to generate the HttpClient */
private static SdkHttpClient generateHttpClient(Builder builder) {
ApacheHttpClient.Builder httpClientBuilder = ApacheHttpClient.builder();
Expand Down Expand Up @@ -673,11 +746,19 @@ public Builder withTransformerSupplier(AwsTransformerSupplier transformerSupplie
return this;
}

public Builder withTransferManager(S3TransferManager transferManager) {
this.transferManager = transferManager;
return this;
}

@Override
public AwsBlobStore build() {
if (s3Client == null) {
s3Client = buildS3Client(this);
}
if (transferManager == null) {
transferManager = buildTransferManager(this);
}

return new AwsBlobStore(this, s3Client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import software.amazon.awssdk.core.checksums.RequestChecksumCalculation;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3Configuration;
import software.amazon.awssdk.transfer.s3.S3TransferManager;

public class AwsBlobStoreIT extends AbstractBlobStoreIT {

Expand Down Expand Up @@ -87,9 +89,26 @@ private AbstractBlobStore createBlobStore(
.build())
.build();

S3AsyncClient asyncClient =
S3AsyncClient.builder()
.region(Region.US_WEST_2)
.httpClientBuilder(TestsUtilAws.getAsyncProxyClientBuilder("https", port))
.credentialsProvider(StaticCredentialsProvider.create(awsCredentials))
.endpointOverride(URI.create(endpoint))
.serviceConfiguration(
S3Configuration.builder()
.pathStyleAccessEnabled(true)
.chunkedEncodingEnabled(false)
.build())
.build();

S3TransferManager transferManager =
S3TransferManager.builder().s3Client(asyncClient).build();

AwsBlobStore.Builder builder = new AwsBlobStore.Builder();
builder
.withS3Client(client)
.withTransferManager(transferManager)
.withEndpoint(URI.create(endpoint))
.withBucket(bucketName)
.withRegion(region)
Expand Down
Loading
Loading