Skip to content

Commit d27f158

Browse files
committed
feat(s3): use AWS SDK v2 TransferManager for copy task
1 parent 4aa16d7 commit d27f158

File tree

1 file changed

+104
-58
lines changed
  • src/main/java/io/kestra/plugin/aws/s3

1 file changed

+104
-58
lines changed

src/main/java/io/kestra/plugin/aws/s3/Copy.java

Lines changed: 104 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
package io.kestra.plugin.aws.s3;
2+
23
import io.kestra.core.models.annotations.Example;
34
import io.kestra.core.models.annotations.Plugin;
45
import io.kestra.core.models.annotations.PluginProperty;
@@ -8,23 +9,31 @@
89
import io.kestra.plugin.aws.AbstractConnection;
910
import io.swagger.v3.oas.annotations.media.Schema;
1011
import jakarta.validation.constraints.NotNull;
11-
import lombok.*;
12+
import lombok.Builder;
13+
import lombok.EqualsAndHashCode;
14+
import lombok.Getter;
15+
import lombok.NoArgsConstructor;
16+
import lombok.ToString;
1217
import lombok.experimental.SuperBuilder;
1318
import software.amazon.awssdk.services.s3.S3Client;
1419
import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
1520
import software.amazon.awssdk.services.s3.model.CopyObjectResponse;
21+
import software.amazon.awssdk.transfer.s3.S3TransferManager;
22+
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
23+
import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
1624
import io.kestra.plugin.aws.s3.models.S3ServerSideEncryption;
25+
import software.amazon.awssdk.services.s3.S3AsyncClient;
1726

1827
@SuperBuilder
1928
@ToString
2029
@EqualsAndHashCode
2130
@Getter
2231
@NoArgsConstructor
2332
@Plugin(
24-
examples = {
25-
@Example(
26-
full = true,
27-
code = """
33+
examples = {
34+
@Example(
35+
full = true,
36+
code = """
2837
id: aws_s3_copy
2938
namespace: company.team
3039
@@ -41,114 +50,149 @@
4150
bucket: "my-bucket2"
4251
key: "path/to/file2"
4352
"""
44-
)
45-
}
53+
)
54+
}
4655
)
4756
@Schema(
48-
title = "Copy a file between S3 buckets."
57+
title = "Copy a file between S3 buckets."
4958
)
5059
public class Copy extends AbstractConnection implements AbstractS3, RunnableTask<Copy.Output> {
60+
5161
@Schema(
52-
title = "The source bucket and key."
62+
title = "The source bucket and key."
5363
)
5464
@PluginProperty
5565
private CopyObjectFrom from;
5666

5767
@Schema(
58-
title = "The destination bucket and key."
68+
title = "The destination bucket and key."
5969
)
6070
@PluginProperty
6171
private CopyObject to;
6272

6373
@Schema(
64-
title = "Whether to delete the source file after download."
74+
title = "Whether to delete the source file after download."
6575
)
6676
@Builder.Default
6777
private Property<Boolean> delete = Property.ofValue(false);
6878

6979
@Override
7080
public Output run(RunContext runContext) throws Exception {
71-
try (S3Client client = this.client(runContext)) {
72-
CopyObjectRequest.Builder builder = CopyObjectRequest.builder()
73-
.sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
74-
.sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
75-
.destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket).as(String.class).orElseThrow())
76-
.destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key).as(String.class).orElseThrow());
7781

82+
try (
83+
S3AsyncClient s3AsyncClient = this.asyncClient(runContext);
84+
S3TransferManager transferManager = S3TransferManager.builder()
85+
.s3Client(s3AsyncClient)
86+
.build()) {
87+
88+
CopyObjectRequest.Builder copyObjectBuilder = CopyObjectRequest.builder()
89+
.sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
90+
.sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
91+
.destinationBucket(
92+
runContext.render(
93+
this.to.bucket != null ? this.to.bucket : this.from.bucket
94+
).as(String.class).orElseThrow()
95+
)
96+
.destinationKey(
97+
runContext.render(
98+
this.to.key != null ? this.to.key : this.from.key
99+
).as(String.class).orElseThrow()
100+
);
101+
102+
// Optional version ID
78103
if (this.from.versionId != null) {
79-
builder.sourceVersionId(runContext.render(this.from.versionId).as(String.class).orElseThrow());
104+
copyObjectBuilder.sourceVersionId(
105+
runContext.render(this.from.versionId).as(String.class).orElseThrow()
106+
);
80107
}
81108

109+
// Server-side encryption
82110
if (this.to != null && this.to.serverSideEncryption != null) {
83-
S3ServerSideEncryption rSse = runContext
84-
.render(this.to.serverSideEncryption)
85-
.as(S3ServerSideEncryption.class)
86-
.orElse(null);
87-
88-
if (rSse != null && rSse != S3ServerSideEncryption.NONE) {
89-
builder.serverSideEncryption(
90-
software.amazon.awssdk.services.s3.model.ServerSideEncryption.valueOf(rSse.name())
111+
S3ServerSideEncryption sse = runContext
112+
.render(this.to.serverSideEncryption)
113+
.as(S3ServerSideEncryption.class)
114+
.orElse(null);
115+
116+
if (sse != null && sse != S3ServerSideEncryption.NONE) {
117+
copyObjectBuilder.serverSideEncryption(
118+
software.amazon.awssdk.services.s3.model.ServerSideEncryption.valueOf(sse.name())
91119
);
120+
121+
// If using AWS_KMS encryption, set the KMS key ID
122+
if (sse == S3ServerSideEncryption.AWS_KMS && this.to.kmsKeyId != null) {
123+
copyObjectBuilder.ssekmsKeyId(
124+
runContext.render(this.to.kmsKeyId).as(String.class).orElseThrow()
125+
);
126+
}
92127
}
93128
}
94129

130+
CopyObjectRequest copyObjectRequest = copyObjectBuilder.build();
131+
132+
// TransferManager copy (parallel & multipart aware)
133+
CopyRequest copyRequest = CopyRequest.builder()
134+
.copyObjectRequest(copyObjectRequest)
135+
.build();
95136

96-
CopyObjectRequest request = builder.build();
97-
CopyObjectResponse response = client.copyObject(request);
137+
CompletedCopy completedCopy = transferManager
138+
.copy(copyRequest)
139+
.completionFuture()
140+
.join();
98141

99-
if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) {
142+
// Optional delete source
143+
if (runContext.render(this.delete).as(Boolean.class).orElse(false)) {
100144
Delete.builder()
101-
.id(this.id)
102-
.type(Delete.class.getName())
103-
.region(this.region)
104-
.endpointOverride(this.endpointOverride)
105-
.accessKeyId(this.accessKeyId)
106-
.secretKeyId(this.secretKeyId)
107-
.sessionToken(this.sessionToken)
108-
.stsRoleSessionName(this.stsRoleSessionName)
109-
.stsRoleExternalId(this.stsRoleExternalId)
110-
.stsRoleSessionDuration(this.stsRoleSessionDuration)
111-
.stsRoleArn(this.stsRoleArn)
112-
.stsEndpointOverride(this.stsEndpointOverride)
113-
.bucket(Property.ofValue(request.sourceBucket()))
114-
.key(Property.ofValue(request.sourceKey()))
115-
.build()
116-
.run(runContext);
145+
.id(this.id)
146+
.type(Delete.class.getName())
147+
.region(this.region)
148+
.endpointOverride(this.endpointOverride)
149+
.accessKeyId(this.accessKeyId)
150+
.secretKeyId(this.secretKeyId)
151+
.sessionToken(this.sessionToken)
152+
.stsRoleArn(this.stsRoleArn)
153+
.stsRoleExternalId(this.stsRoleExternalId)
154+
.stsRoleSessionName(this.stsRoleSessionName)
155+
.stsRoleSessionDuration(this.stsRoleSessionDuration)
156+
.stsEndpointOverride(this.stsEndpointOverride)
157+
.bucket(Property.ofValue(copyObjectRequest.sourceBucket()))
158+
.key(Property.ofValue(copyObjectRequest.sourceKey()))
159+
.build()
160+
.run(runContext);
117161
}
118162

119-
return Output
120-
.builder()
121-
.bucket(request.destinationBucket())
122-
.key(request.destinationKey())
123-
.eTag(response.copyObjectResult().eTag())
124-
.build();
163+
return Output.builder()
164+
.bucket(copyObjectRequest.destinationBucket())
165+
.key(copyObjectRequest.destinationKey())
166+
.eTag(completedCopy.response().copyObjectResult().eTag())
167+
.build();
125168
}
126169
}
127170

128171
@SuperBuilder(toBuilder = true)
129172
@Getter
130173
@NoArgsConstructor
131174
public static class CopyObject {
175+
132176
@Schema(
133-
title = "The bucket name"
177+
title = "The bucket name"
134178
)
135179
@NotNull
136180
Property<String> bucket;
137181

138182
@Schema(
139-
title = "The bucket key"
183+
title = "The bucket key"
140184
)
141185
@NotNull
142186
Property<String> key;
143187

144188
@Schema(
145-
title = "Server side encryption to apply to the target object.",
146-
description = "Example: AES256 or AWS_KMS"
189+
title = "Server side encryption to apply to the target object.",
190+
description = "Example: AES256 or AWS_KMS"
147191
)
148192
private Property<S3ServerSideEncryption> serverSideEncryption;
149-
193+
150194
@Schema(
151-
title = "KMS Key ARN or Key ID to use when server side encryption is AWS_KMS"
195+
title = "KMS Key ARN or Key ID to use when server side encryption is AWS_KMS"
152196
)
153197
private Property<String> kmsKeyId;
154198
}
@@ -157,8 +201,9 @@ public static class CopyObject {
157201
@Getter
158202
@NoArgsConstructor
159203
public static class CopyObjectFrom extends CopyObject {
204+
160205
@Schema(
161-
title = "The specific version of the object."
206+
title = "The specific version of the object."
162207
)
163208
private Property<String> versionId;
164209
}
@@ -167,6 +212,7 @@ public static class CopyObjectFrom extends CopyObject {
167212
@Getter
168213
@NoArgsConstructor
169214
public static class Output extends ObjectOutput implements io.kestra.core.models.tasks.Output {
215+
170216
private String bucket;
171217
private String key;
172218
}

0 commit comments

Comments
 (0)