Skip to content

Commit 115f116

Browse files
authored
refactor: move all plugin properties to dynamic properties (#566)
* refactor: move Query and abstract connection to dynamic properties * refactor: move DynamoDB to dynamic properties * refactor: move Kinesis and eventbridge to dynamic properties * refactor: move Lambda to dynamic properties * refactor: move s3 to dynamic properties * refactor: move Sns to dynamic properties * refactor: move Sqs to dynamic properties
1 parent d997c52 commit 115f116

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+674
-718
lines changed

src/main/java/io/kestra/plugin/aws/AbstractConnection.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,21 @@
1616
public abstract class AbstractConnection extends Task implements AbstractConnectionInterface {
1717

1818
protected Property<String> region;
19-
protected String endpointOverride;
20-
protected Boolean compatibilityMode;
19+
protected Property<String> endpointOverride;
20+
protected Property<Boolean> compatibilityMode;
2121

2222
// Configuration for StaticCredentialsProvider
23-
protected String accessKeyId;
24-
protected String secretKeyId;
25-
protected String sessionToken;
23+
protected Property<String> accessKeyId;
24+
protected Property<String> secretKeyId;
25+
protected Property<String> sessionToken;
2626

2727
// Configuration for AWS STS AssumeRole
28-
protected String stsRoleArn;
29-
protected String stsRoleExternalId;
30-
protected String stsRoleSessionName;
31-
protected String stsEndpointOverride;
28+
protected Property<String> stsRoleArn;
29+
protected Property<String> stsRoleExternalId;
30+
protected Property<String> stsRoleSessionName;
31+
protected Property<String> stsEndpointOverride;
3232
@Builder.Default
33-
protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION;
33+
protected Property<Duration> stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION);
3434

3535
/**
3636
* Common AWS Client configuration properties.

src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,56 +16,48 @@ public interface AbstractConnectionInterface {
1616
title = "Access Key Id in order to connect to AWS.",
1717
description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
1818
)
19-
@PluginProperty(dynamic = true)
20-
String getAccessKeyId();
19+
Property<String> getAccessKeyId();
2120

2221
@Schema(
2322
title = "Secret Key Id in order to connect to AWS.",
2423
description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
2524
)
26-
@PluginProperty(dynamic = true)
27-
String getSecretKeyId();
25+
Property<String> getSecretKeyId();
2826

2927
@Schema(
3028
title = "AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.",
3129
description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
3230
)
33-
@PluginProperty(dynamic = true)
34-
String getSessionToken();
31+
Property<String> getSessionToken();
3532

3633
@Schema(
3734
title = "AWS STS Role.",
3835
description = "The Amazon Resource Name (ARN) of the role to assume. If set the task will use the `StsAssumeRoleCredentialsProvider`. If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials."
3936
)
40-
@PluginProperty(dynamic = true)
41-
String getStsRoleArn();
37+
Property<String> getStsRoleArn();
4238

4339
@Schema(
4440
title = "AWS STS External Id.",
4541
description = " A unique identifier that might be required when you assume a role in another account. This property is only used when an `stsRoleArn` is defined."
4642
)
47-
@PluginProperty(dynamic = true)
48-
String getStsRoleExternalId();
43+
Property<String> getStsRoleExternalId();
4944

5045
@Schema(
5146
title = "AWS STS Session name.",
5247
description = "This property is only used when an `stsRoleArn` is defined."
5348
)
54-
@PluginProperty(dynamic = true)
55-
String getStsRoleSessionName();
49+
Property<String> getStsRoleSessionName();
5650

5751
@Schema(
5852
title = "AWS STS Session duration.",
5953
description = "The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an `stsRoleArn` is defined."
6054
)
61-
@PluginProperty
62-
Duration getStsRoleSessionDuration();
55+
Property<Duration> getStsRoleSessionDuration();
6356

6457
@Schema(
6558
title = "The AWS STS endpoint with which the SDKClient should communicate."
6659
)
67-
@PluginProperty(dynamic = true)
68-
String getStsEndpointOverride();
60+
Property<String> getStsEndpointOverride();
6961

7062
@Schema(
7163
title = "AWS region with which the SDK should communicate."
@@ -76,26 +68,24 @@ public interface AbstractConnectionInterface {
7668
title = "The endpoint with which the SDK should communicate.",
7769
description = "This property allows you to use a different S3 compatible storage backend."
7870
)
79-
@PluginProperty(dynamic = true)
80-
String getEndpointOverride();
71+
Property<String> getEndpointOverride();
8172

82-
@PluginProperty(dynamic = true)
83-
default Boolean getCompatibilityMode() {
84-
return false;
73+
default Property<Boolean> getCompatibilityMode() {
74+
return Property.of(false);
8575
}
8676

8777
default AbstractConnection.AwsClientConfig awsClientConfig(final RunContext runContext) throws IllegalVariableEvaluationException {
8878
return new AbstractConnection.AwsClientConfig(
89-
runContext.render(this.getAccessKeyId()),
90-
runContext.render(this.getSecretKeyId()),
91-
runContext.render(this.getSessionToken()),
92-
runContext.render(this.getStsRoleArn()),
93-
runContext.render(this.getStsRoleExternalId()),
94-
runContext.render(this.getStsRoleSessionName()),
95-
runContext.render(this.getStsEndpointOverride()),
96-
getStsRoleSessionDuration(),
97-
this.getRegion() == null ? null : this.getRegion().as(runContext, String.class),
98-
runContext.render(this.getEndpointOverride())
79+
runContext.render(this.getAccessKeyId()).as(String.class).orElse(null),
80+
runContext.render(this.getSecretKeyId()).as(String.class).orElse(null),
81+
runContext.render(this.getSessionToken()).as(String.class).orElse(null),
82+
runContext.render(this.getStsRoleArn()).as(String.class).orElse(null),
83+
runContext.render(this.getStsRoleExternalId()).as(String.class).orElse(null),
84+
runContext.render(this.getStsRoleSessionName()).as(String.class).orElse(null),
85+
runContext.render(this.getStsEndpointOverride()).as(String.class).orElse(null),
86+
runContext.render(this.getStsRoleSessionDuration()).as(Duration.class).orElse(null),
87+
runContext.render(this.getRegion()).as(String.class).orElse(null),
88+
runContext.render(this.getEndpointOverride()).as(String.class).orElse(null)
9989
);
10090
}
10191
}

src/main/java/io/kestra/plugin/aws/athena/Query.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import io.kestra.core.models.annotations.Plugin;
66
import io.kestra.core.models.annotations.PluginProperty;
77
import io.kestra.core.models.executions.metrics.Counter;
8+
import io.kestra.core.models.property.Property;
89
import io.kestra.core.models.tasks.Output;
910
import io.kestra.core.models.tasks.RunnableTask;
1011
import io.kestra.core.models.tasks.common.FetchType;
@@ -49,7 +50,7 @@
4950
title = "Query an Athena table.",
5051
description = """
5152
The query will wait for completion, except if fetchMode is set to `NONE`, and will output converted rows.
52-
Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html).
53+
Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html).
5354
Complex data types like array, map and struct will be converted to a string."""
5455
)
5556
@Plugin(
@@ -78,26 +79,22 @@
7879
)
7980
public class Query extends AbstractConnection implements RunnableTask<Query.QueryOutput> {
8081
@Schema(title = "Athena catalog.")
81-
@PluginProperty(dynamic = true)
82-
private String catalog;
82+
private Property<String> catalog;
8383

8484
@Schema(title = "Athena database.")
8585
@NotNull
86-
@PluginProperty(dynamic = true)
87-
private String database;
86+
private Property<String> database;
8887

8988
@Schema(
9089
title = "Athena output location.",
9190
description = "The query results will be stored in this output location. Must be an existing S3 bucket."
9291
)
9392
@NotNull
94-
@PluginProperty(dynamic = true)
95-
private String outputLocation;
93+
private Property<String> outputLocation;
9694

9795
@Schema(title = "Athena SQL query.")
9896
@NotNull
99-
@PluginProperty(dynamic = true)
100-
private String query;
97+
private Property<String> query;
10198

10299
@Schema(
103100
title = "The way you want to store the data.",
@@ -107,15 +104,13 @@ public class Query extends AbstractConnection implements RunnableTask<Query.Quer
107104
+ "NONE does nothing — in this case, the task submits the query without waiting for its completion."
108105
)
109106
@NotNull
110-
@PluginProperty
111107
@Builder.Default
112-
private FetchType fetchType = FetchType.STORE;
108+
private Property<FetchType> fetchType = Property.of(FetchType.STORE);
113109

114110
@Schema(title = "Whether to skip the first row which is usually the header.")
115111
@NotNull
116-
@PluginProperty
117112
@Builder.Default
118-
private boolean skipHeader = true;
113+
private Property<Boolean> skipHeader = Property.of(true);
119114

120115

121116
private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
@@ -125,23 +120,24 @@ public class Query extends AbstractConnection implements RunnableTask<Query.Quer
125120
public QueryOutput run(RunContext runContext) throws Exception {
126121
// The QueryExecutionContext allows us to set the database.
127122
var queryExecutionContext = QueryExecutionContext.builder()
128-
.catalog(catalog != null ? runContext.render(catalog) : null)
129-
.database(runContext.render(database))
123+
.catalog(catalog != null ? runContext.render(catalog).as(String.class).orElseThrow() : null)
124+
.database(runContext.render(database).as(String.class).orElseThrow())
130125
.build();
131126

132127
// The result configuration specifies where the results of the query should go.
133128
var resultConfiguration = ResultConfiguration.builder()
134-
.outputLocation(runContext.render(outputLocation))
129+
.outputLocation(runContext.render(outputLocation).as(String.class).orElseThrow())
135130
.build();
136131

137132
var startQueryExecutionRequest = StartQueryExecutionRequest.builder()
138-
.queryString(runContext.render(query))
133+
.queryString(runContext.render(query).as(String.class).orElseThrow())
139134
.queryExecutionContext(queryExecutionContext)
140135
.resultConfiguration(resultConfiguration)
141136
.build();
142137

143138
try (var client = client(runContext)) {
144139
var startQueryExecution = client.startQueryExecution(startQueryExecutionRequest);
140+
var fetchType = runContext.render(this.fetchType).as(FetchType.class).orElseThrow();
145141
runContext.logger().info("Query created with Athena execution identifier {}", startQueryExecution.queryExecutionId());
146142
if (fetchType == FetchType.NONE) {
147143
return QueryOutput.builder().queryExecutionId(startQueryExecution.queryExecutionId()).build();
@@ -179,7 +175,7 @@ public QueryOutput run(RunContext runContext) throws Exception {
179175
.build();
180176
var getQueryResultsResults = client.getQueryResults(getQueryResult);
181177
List<Row> results = getQueryResultsResults.resultSet().rows();
182-
if (skipHeader && results != null && !results.isEmpty()) {
178+
if (runContext.render(skipHeader).as(Boolean.class).orElseThrow() && results != null && !results.isEmpty()) {
183179
// we skip the first row, this is usually needed as by default Athena returns the header as the first row
184180
results = results.subList(1, results.size());
185181
}

src/main/java/io/kestra/plugin/aws/auth/EksToken.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,18 @@ public Output run(RunContext runContext) throws Exception {
7070
if(this.getRegion() == null) {
7171
throw new RuntimeException("Region is required");
7272
}
73-
final Region awsRegion = Region.of(this.getRegion().as(runContext, String.class));
73+
final Region awsRegion = Region.of(runContext.render(this.getRegion()).as(String.class).orElseThrow());
7474

7575
SdkHttpFullRequest requestToSign = SdkHttpFullRequest
7676
.builder()
7777
.method(SdkHttpMethod.GET)
7878
.uri(getStsRegionalEndpointUri(runContext, awsRegion))
79-
.appendHeader("x-k8s-aws-id", this.clusterName.as(runContext, String.class))
79+
.appendHeader("x-k8s-aws-id", runContext.render(this.clusterName).as(String.class).orElseThrow())
8080
.appendRawQueryParameter("Action", "GetCallerIdentity")
8181
.appendRawQueryParameter("Version", "2011-06-15")
8282
.build();
8383

84-
ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(expirationDuration.as(runContext, Long.class));
84+
ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(runContext.render(expirationDuration).as(Long.class).orElseThrow());
8585
Aws4PresignerParams presignerParams = Aws4PresignerParams.builder()
8686
.awsCredentials(ConnectionUtils.credentialsProvider(this.awsClientConfig(runContext)).resolveCredentials())
8787
.signingRegion(awsRegion)

src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import lombok.*;
2121
import lombok.experimental.SuperBuilder;
2222

23+
import java.time.Duration;
2324
import java.util.ArrayList;
2425
import java.util.HashMap;
2526
import java.util.List;
@@ -148,19 +149,19 @@ public ScriptOutput run(RunContext runContext) throws Exception {
148149

149150
// hack for missing env vars supports: https://github.com/aws/aws-cli/issues/5639
150151
if (this.stsRoleArn != null) {
151-
allCommands.add("aws configure set role_arn " + runContext.render(this.stsRoleArn));
152+
allCommands.add("aws configure set role_arn " + runContext.render(this.stsRoleArn).as(String.class).orElseThrow());
152153
}
153154

154155
if (this.stsRoleSessionName != null) {
155-
allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName));
156+
allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName).as(String.class).orElseThrow());
156157
}
157158

158159
if (this.stsRoleExternalId != null) {
159-
allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId));
160+
allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId).as(String.class).orElseThrow());
160161
}
161162

162163
if (this.stsRoleSessionDuration != null) {
163-
allCommands.add("aws configure set duration_seconds " + stsRoleSessionDuration.getSeconds());
164+
allCommands.add("aws configure set duration_seconds " + runContext.render(stsRoleSessionDuration).as(Duration.class).orElseThrow().getSeconds());
164165
}
165166

166167
if (this.stsCredentialSource != null) {
@@ -206,23 +207,23 @@ private Map<String, String> getEnv(RunContext runContext) throws IllegalVariable
206207
Map<String, String> envs = new HashMap<>();
207208

208209
if (this.accessKeyId != null) {
209-
envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId));
210+
envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId).as(String.class).orElseThrow());
210211
}
211212

212213
if (this.secretKeyId != null) {
213-
envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId));
214+
envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId).as(String.class).orElseThrow());
214215
}
215216

216217
if (this.region != null) {
217-
envs.put("AWS_DEFAULT_REGION", this.region.as(runContext, String.class));
218+
envs.put("AWS_DEFAULT_REGION", runContext.render(this.region).as(String.class).orElseThrow());
218219
}
219220

220221
if (this.sessionToken != null) {
221-
envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken));
222+
envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken).as(String.class).orElseThrow());
222223
}
223224

224225
if (this.endpointOverride != null) {
225-
envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride));
226+
envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride).as(String.class).orElseThrow());
226227
}
227228

228229
envs.put("AWS_DEFAULT_OUTPUT", this.outputFormat.toString());

src/main/java/io/kestra/plugin/aws/dynamodb/AbstractDynamoDb.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
44
import io.kestra.core.models.annotations.PluginProperty;
55
import io.kestra.core.models.executions.metrics.Counter;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.common.FetchOutput;
78
import io.kestra.core.models.tasks.common.FetchType;
89
import io.kestra.core.runners.RunContext;
@@ -40,9 +41,8 @@
4041
@NoArgsConstructor
4142
public abstract class AbstractDynamoDb extends AbstractConnection {
4243
@Schema(title = "The DynamoDB table name.")
43-
@PluginProperty(dynamic = true)
4444
@NotNull
45-
protected String tableName;
45+
protected Property<String> tableName;
4646

4747
protected DynamoDbClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
4848
final AwsClientConfig clientConfig = awsClientConfig(runContext);
@@ -109,7 +109,7 @@ protected AttributeValue objectFrom(Object value) {
109109
return AttributeValue.fromS(value.toString());
110110
}
111111

112-
protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, FetchType fetchType, RunContext runContext) throws IOException {
112+
protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, FetchType fetchType, RunContext runContext) throws IOException, IllegalVariableEvaluationException {
113113
var outputBuilder = FetchOutput.builder();
114114
switch (fetchType) {
115115
case FETCH:
@@ -139,7 +139,7 @@ protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, Fetc
139139

140140
runContext.metric(Counter.of(
141141
"records", output.getSize(),
142-
"tableName", getTableName()
142+
"tableName", runContext.render(getTableName()).as(String.class).orElseThrow()
143143
));
144144

145145
return output;

src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import io.kestra.core.models.annotations.Example;
44
import io.kestra.core.models.annotations.Plugin;
55
import io.kestra.core.models.annotations.PluginProperty;
6+
import io.kestra.core.models.property.Property;
67
import io.kestra.core.models.tasks.RunnableTask;
78
import io.kestra.core.models.tasks.VoidOutput;
89
import io.kestra.core.runners.RunContext;
@@ -41,7 +42,7 @@
4142
secretKeyId: "<secret-key>"
4243
region: "eu-central-1"
4344
tableName: "persons"
44-
key:
45+
key:
4546
id: "1"
4647
"""
4748
)
@@ -52,16 +53,16 @@ public class DeleteItem extends AbstractDynamoDb implements RunnableTask<VoidOut
5253
title = "The DynamoDB item key.",
5354
description = "The DynamoDB item identifier."
5455
)
55-
@PluginProperty
56-
private Map<String, Object> key;
56+
private Property<Map<String, Object>> key;
5757

5858
@Override
5959
public VoidOutput run(RunContext runContext) throws Exception {
6060
try (var dynamoDb = client(runContext)) {
61-
Map<String, AttributeValue> key = valueMapFrom(getKey());
61+
var renderedKey = runContext.render(this.key).asMap(String.class, Object.class);
62+
Map<String, AttributeValue> key = valueMapFrom(renderedKey);
6263

6364
var deleteRequest = DeleteItemRequest.builder()
64-
.tableName(runContext.render(this.getTableName()))
65+
.tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow())
6566
.key(key)
6667
.build();
6768

0 commit comments

Comments
 (0)