Skip to content

Commit 85e57af

Browse files
authored
feat: add EMR task (#574)
add CreateCluster task add AddJobFlowsSteps task add DeleteCluster task add unit test with link to tutorial
1 parent 12fdb09 commit 85e57af

File tree

9 files changed

+612
-0
lines changed

9 files changed

+612
-0
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ dependencies {
7272
api 'software.amazon.awssdk:sts'
7373
api 'software.amazon.awssdk:ecr'
7474
api 'software.amazon.awssdk:netty-nio-client'
75+
api 'software.amazon.awssdk:emr'
7576
}
7677

7778

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.runners.RunContext;
5+
import io.kestra.plugin.aws.AbstractConnection;
6+
import io.kestra.plugin.aws.ConnectionUtils;
7+
import lombok.EqualsAndHashCode;
8+
import lombok.Getter;
9+
import lombok.NoArgsConstructor;
10+
import lombok.ToString;
11+
import lombok.experimental.SuperBuilder;
12+
import software.amazon.awssdk.services.emr.EmrClient;
13+
14+
@SuperBuilder
15+
@ToString
16+
@EqualsAndHashCode
17+
@Getter
18+
@NoArgsConstructor
19+
public abstract class AbstractEmrTask extends AbstractConnection {
20+
protected EmrClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
21+
final AwsClientConfig clientConfig = awsClientConfig(runContext);
22+
return ConnectionUtils.configureSyncClient(clientConfig, EmrClient.builder()).build();
23+
}
24+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.Example;
5+
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
7+
import io.kestra.core.models.tasks.RunnableTask;
8+
import io.kestra.core.models.tasks.VoidOutput;
9+
import io.kestra.core.runners.RunContext;
10+
import io.kestra.plugin.aws.emr.models.StepConfig;
11+
import io.swagger.v3.oas.annotations.media.Schema;
12+
import jakarta.validation.constraints.NotNull;
13+
import lombok.EqualsAndHashCode;
14+
import lombok.Getter;
15+
import lombok.NoArgsConstructor;
16+
import lombok.ToString;
17+
import lombok.experimental.SuperBuilder;
18+
19+
import java.util.List;
20+
21+
import static io.kestra.core.utils.Rethrow.throwConsumer;
22+
import static io.kestra.core.utils.Rethrow.throwFunction;
23+
24+
@SuperBuilder
25+
@ToString
26+
@EqualsAndHashCode
27+
@Getter
28+
@NoArgsConstructor
29+
@Schema(
30+
title = "Add steps on an existing EMR cluster."
31+
)
32+
@Plugin(
33+
examples = {
34+
@Example(
35+
title = "",
36+
full = true,
37+
code = """
38+
id: aws_emr_add_emr_job_steps
39+
namespace: company.team
40+
41+
tasks:
42+
- id: add_steps_emr
43+
type: io.kestra.plugin.aws.emr.AddJobFlowsSteps
44+
accessKeyId: "<access-key>"
45+
secretKeyId: "<secret-key>"
46+
region: "eu-west-3"
47+
clusterId: j-XXXXXXXXXXXX
48+
steps:
49+
- name: Spark_job_test
50+
jar: "command-runner.jar"
51+
actionOnFailure: CONTINUE
52+
arguments:
53+
- "spark-submit"
54+
- "s3://my-bucket/health_violations.py"
55+
- "--data_source"
56+
- "s3://my-bucket/food_establishment_data.csv"
57+
- "--output_uri"
58+
- "s3://my-bucket/test-emr-output"
59+
"""
60+
)
61+
}
62+
)
63+
public class AddJobFlowsSteps extends AbstractEmrTask implements RunnableTask<VoidOutput> {
64+
@Schema(title = "Cluster ID.")
65+
@NotNull
66+
private Property<String> clusterId;
67+
68+
@Schema(
69+
title = "Steps",
70+
description = "List of steps to add to the existing cluster."
71+
)
72+
@NotNull
73+
private List<StepConfig> steps;
74+
75+
@Schema(
76+
title = "Execution role ARN.",
77+
description = """
78+
The Amazon Resource Name (ARN) of the runtime role for a step on the cluster. The runtime role can be a cross-account IAM role.
79+
The runtime role ARN is a combination of account ID, role name, and role type using the following format: arn:partition:service:region:account:resource.
80+
"""
81+
)
82+
private Property<String> executionRoleArn;
83+
84+
@Override
85+
public VoidOutput run(RunContext runContext) throws IllegalVariableEvaluationException {
86+
try(var emrClient = this.client(runContext)) {
87+
List<software.amazon.awssdk.services.emr.model.StepConfig> jobSteps = steps.stream()
88+
.map(throwFunction(stepConfig -> stepConfig.toStep(runContext)))
89+
.toList();
90+
91+
emrClient.addJobFlowSteps(throwConsumer(request -> request
92+
.steps(jobSteps)
93+
.jobFlowId(runContext.render(this.clusterId).as(String.class).orElseThrow())
94+
.executionRoleArn(runContext.render(this.executionRoleArn).as(String.class).orElse(null))
95+
));
96+
return null;
97+
}
98+
}
99+
}
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package io.kestra.plugin.aws.emr;
2+
3+
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
4+
import io.kestra.core.models.annotations.Example;
5+
import io.kestra.core.models.annotations.Plugin;
6+
import io.kestra.core.models.property.Property;
7+
import io.kestra.core.models.tasks.RunnableTask;
8+
import io.kestra.core.runners.RunContext;
9+
import io.kestra.core.utils.Await;
10+
import io.kestra.plugin.aws.emr.models.StepConfig;
11+
import io.swagger.v3.oas.annotations.media.Schema;
12+
import jakarta.validation.constraints.NotNull;
13+
import lombok.*;
14+
import lombok.experimental.SuperBuilder;
15+
import software.amazon.awssdk.services.emr.model.*;
16+
17+
import java.time.Duration;
18+
import java.util.List;
19+
import java.util.concurrent.TimeoutException;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import static io.kestra.core.utils.Rethrow.throwFunction;
23+
24+
@SuperBuilder
25+
@ToString
26+
@EqualsAndHashCode
27+
@Getter
28+
@NoArgsConstructor
29+
@Schema(
30+
title = "Create an EMR Cluster, submit steps to be processed, then get the cluster ID as an output."
31+
)
32+
@Plugin(
33+
examples = {
34+
@Example(
35+
title = "Create an EMR Cluster, submit a Spark job, wait until the job is terminated.",
36+
full = true,
37+
code = """
38+
id: aws_emr_create_cluster
39+
namespace: company.team
40+
41+
tasks:
42+
- id: create_cluster
43+
type: io.kestra.plugin.aws.emr.CreateCluster
44+
accessKeyId: <access-key>
45+
secretKeyId: <secret-key>
46+
region: eu-west-3
47+
clusterName: "Spark job cluster"
48+
logUri: "s3://my-bucket/test-emr-logs"
49+
keepJobFlowAliveWhenNoSteps: true
50+
applications:
51+
- Spark
52+
masterInstanceType: m5.xlarge
53+
slaveInstanceType: m5.xlarge
54+
instanceCount: 3
55+
ec2KeyName: my-ec2-ssh-key-pair-name
56+
steps:
57+
- name: Spark_job_test
58+
jar: "command-runner.jar"
59+
actionOnFailure: CONTINUE
60+
arguments:
61+
- "spark-submit"
62+
- "s3://my-bucket/health_violations.py"
63+
- "--data_source"
64+
- "s3://my-bucket/food_establishment_data.csv"
65+
- "--output_uri"
66+
- "s3://my-bucket/test-emr-output"
67+
wait: true
68+
"""
69+
)
70+
}
71+
)
72+
public class CreateCluster extends AbstractEmrTask implements RunnableTask<CreateCluster.Output> {
73+
74+
@Schema(title = "Cluster Name.")
75+
@NotNull
76+
private Property<String> clusterName;
77+
78+
@Schema(title = "Release Label.", description = "It specifies the EMR release version label. Pattern is 'emr-x.x.x'.")
79+
@NotNull
80+
@Builder.Default
81+
private Property<String> releaseLabel = Property.of("emr-5.20.0");
82+
83+
@Schema(
84+
title = "Steps",
85+
description = "List of steps to run."
86+
)
87+
private List<StepConfig> steps;
88+
89+
@Schema(title = "Applications.", description = "List of applications name: Ex: \"Hive\", \"Spark\", \"Ganglia\"")
90+
private Property<List<String>> applications;
91+
92+
@Schema(title = "Log URI.", description = "The location in Amazon S3 to write the log files of the job flow. If a value is not provided, logs are not created.")
93+
private Property<String> logUri;
94+
95+
@Schema(title = "Job flow Role.", description = """
96+
Also called instance profile and Amazon EC2 role. An IAM role for an Amazon EMR cluster.
97+
The Amazon EC2 instances of the cluster assume this role. The default role is EMR_EC2_DefaultRole.
98+
In order to use the default role, you must have already created it using the CLI or console.
99+
""")
100+
@NotNull
101+
@Builder.Default
102+
private Property<String> jobFlowRole = Property.of("EMR_EC2_DefaultRole");
103+
104+
@Schema(title = "Visible to all users.", description = "Set this value to true so that IAM principals in the Amazon Web Services account associated with the cluster can perform Amazon EMR actions on the cluster that their IAM policies allow.")
105+
@Builder.Default
106+
private Property<Boolean> visibleToAllUsers = Property.of(true);
107+
108+
@Schema(title = "Service Role.", description = """
109+
The IAM role that Amazon EMR assumes in order to access Amazon Web Services resources on your behalf.
110+
If you've created a custom service role path, you must specify it for the service role when you launch your cluster.
111+
""")
112+
@NotNull
113+
@Builder.Default
114+
private Property<String> serviceRole = Property.of("EMR_DefaultRole");
115+
116+
@Schema(title = "Master Instance Type.", description = "EC2 instance type for master instances.")
117+
private Property<String> masterInstanceType;
118+
119+
@Schema(title = "Slave Instance Type.", description = "EC2 instance type for slave instances.")
120+
private Property<String> slaveInstanceType;
121+
122+
@Schema(title = "Keep job flow alive.", description = "Specifies whether the cluster should remain available after completing all steps. Defaults to false.")
123+
@Builder.Default
124+
private Property<Boolean> keepJobFlowAliveWhenNoSteps = Property.of(false);
125+
126+
@Schema(title = "Instance count.")
127+
private Property<Integer> instanceCount;
128+
129+
@Schema(title = "EC2 Key name.", description = "The name of the Amazon EC2 key pair that can be used to connect to the master node using SSH as the user called \"hadoop\".")
130+
private Property<String> ec2KeyName;
131+
132+
@Schema(
133+
title = "EC2 Subnet ID.",
134+
description = """
135+
Applies to clusters that use the uniform instance group configuration.
136+
To launch the cluster in Amazon Virtual Private Cloud (Amazon VPC), set this parameter to the identifier of the Amazon VPC subnet where you want the cluster to launch.
137+
If you do not specify this value and your account supports EC2-Classic, the cluster launches in EC2-Classic.
138+
"""
139+
)
140+
private Property<String> ec2SubnetId;
141+
142+
@Schema(
143+
title = "Wait for the end of the run.",
144+
description = "If set to true it will wait until the cluster has status TERMINATED or WAITING."
145+
)
146+
@Builder.Default
147+
private Property<Boolean> wait = Property.of(Boolean.FALSE);
148+
149+
@Schema(
150+
title = "Check interval duration.",
151+
description = "The frequency with which the task checks whether the job is completed."
152+
)
153+
@Builder.Default
154+
private Property<Duration> completionCheckInterval = Property.of(Duration.ofSeconds(10));
155+
156+
@Schema(
157+
title = "Completion timeout."
158+
)
159+
@Builder.Default
160+
private Property<Duration> waitUntilCompletion = Property.of(Duration.ofHours(1));
161+
162+
@Override
163+
public Output run(RunContext runContext) throws IllegalVariableEvaluationException {
164+
try(var emrClient = this.client(runContext)) {
165+
//Create Applications to be loaded
166+
List<Application> applicationsList = runContext.render(this.applications).asList(String.class)
167+
.stream()
168+
.map(name -> Application.builder().name(name).build()).toList();
169+
170+
//Create instances configuration
171+
JobFlowInstancesConfig instancesConfig = JobFlowInstancesConfig.builder()
172+
.ec2SubnetId(runContext.render(this.ec2SubnetId).as(String.class).orElse(null))
173+
.ec2KeyName(runContext.render(this.ec2KeyName).as(String.class).orElse(null))
174+
.instanceCount(runContext.render(this.instanceCount).as(Integer.class).orElseThrow())
175+
.masterInstanceType(runContext.render(this.masterInstanceType).as(String.class).orElse(null))
176+
.slaveInstanceType(runContext.render(this.slaveInstanceType).as(String.class).orElse(null))
177+
.keepJobFlowAliveWhenNoSteps(runContext.render(this.keepJobFlowAliveWhenNoSteps).as(Boolean.class).orElseThrow())
178+
.build();
179+
180+
RunJobFlowRequest jobFlowRequest = RunJobFlowRequest.builder()
181+
.name(runContext.render(this.clusterName).as(String.class).orElseThrow())
182+
.releaseLabel(runContext.render(this.releaseLabel).as(String.class).orElseThrow())
183+
.applications(applicationsList)
184+
.logUri(runContext.render(this.logUri).as(String.class).orElse(null))
185+
.serviceRole(runContext.render(this.serviceRole).as(String.class).orElse(null))
186+
.jobFlowRole(runContext.render(this.jobFlowRole).as(String.class).orElse(null))
187+
.instances(instancesConfig)
188+
.steps(steps == null ? null : steps.stream()
189+
.map(throwFunction(step -> step.toStep(runContext)))
190+
.toList()
191+
)
192+
.visibleToAllUsers(runContext.render(this.visibleToAllUsers).as(Boolean.class).orElseThrow())
193+
.build();
194+
195+
RunJobFlowResponse response = emrClient.runJobFlow(jobFlowRequest);
196+
String jobFlowId = response.jobFlowId();
197+
198+
runContext.logger().info("Cluster created with id : {}", jobFlowId);
199+
200+
if (!Boolean.TRUE.equals(runContext.render(this.wait).as(Boolean.class).orElseThrow())) {
201+
return Output.builder()
202+
.jobFlowId(jobFlowId)
203+
.build();
204+
}
205+
206+
final AtomicReference<DescribeClusterResponse> responseReference = new AtomicReference<>();
207+
try {
208+
Await.until(
209+
() -> {
210+
responseReference.set(emrClient.describeCluster(r -> r.clusterId(jobFlowId)));
211+
ClusterState clusterState = responseReference.get().cluster().status().state();
212+
213+
return ClusterState.TERMINATED.equals(clusterState) ||
214+
ClusterState.TERMINATED_WITH_ERRORS.equals(clusterState) ||
215+
ClusterState.WAITING.equals(clusterState);
216+
},
217+
runContext.render(this.completionCheckInterval).as(Duration.class).orElseThrow(),
218+
runContext.render(this.waitUntilCompletion).as(Duration.class).orElseThrow()
219+
);
220+
} catch (TimeoutException e) {
221+
throw new RuntimeException(responseReference.get().cluster().status().stateAsString());
222+
}
223+
224+
runContext.logger().info("Cluster terminated with status : {}", responseReference.get().cluster().status().stateAsString());
225+
226+
return Output.builder()
227+
.jobFlowId(jobFlowId)
228+
.build();
229+
}
230+
}
231+
232+
@Builder
233+
@Getter
234+
static class Output implements io.kestra.core.models.tasks.Output {
235+
@Schema(title = "Job flow ID.")
236+
private String jobFlowId;
237+
}
238+
}

0 commit comments

Comments
 (0)