Skip to content

Commit 1e05b59

Browse files
authored
feat: replace arguments by commands (#576)
1 parent ac2e18b commit 1e05b59

File tree

5 files changed

+50
-26
lines changed

5 files changed

+50
-26
lines changed

src/main/java/io/kestra/plugin/aws/emr/AddJobFlowsSteps.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,8 @@
4949
- name: Spark_job_test
5050
jar: "command-runner.jar"
5151
actionOnFailure: CONTINUE
52-
arguments:
53-
- "spark-submit"
54-
- "s3://my-bucket/health_violations.py"
55-
- "--data_source"
56-
- "s3://my-bucket/food_establishment_data.csv"
57-
- "--output_uri"
58-
- "s3://my-bucket/test-emr-output"
52+
commands:
53+
- spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output
5954
"""
6055
)
6156
}

src/main/java/io/kestra/plugin/aws/emr/CreateCluster.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,8 @@
5757
- name: Spark_job_test
5858
jar: "command-runner.jar"
5959
actionOnFailure: CONTINUE
60-
arguments:
61-
- "spark-submit"
62-
- "s3://my-bucket/health_violations.py"
63-
- "--data_source"
64-
- "s3://my-bucket/food_establishment_data.csv"
65-
- "--output_uri"
66-
- "s3://my-bucket/test-emr-output"
60+
commands:
61+
- spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output
6762
wait: true
6863
"""
6964
)

src/main/java/io/kestra/plugin/aws/emr/models/StepConfig.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.kestra.plugin.aws.emr.models;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
45
import io.kestra.core.models.property.Property;
56
import io.kestra.core.runners.RunContext;
@@ -10,6 +11,8 @@
1011
import lombok.Getter;
1112
import lombok.extern.jackson.Jacksonized;
1213

14+
import java.util.ArrayList;
15+
import java.util.Arrays;
1316
import java.util.List;
1417

1518
import static io.kestra.core.utils.Rethrow.throwConsumer;
@@ -26,8 +29,8 @@ public class StepConfig {
2629
@Schema(title = "Main class.", description = "The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file.")
2730
private Property<String> mainClass;
2831

29-
@Schema(title = "Arguments." , description = "A list of command line arguments passed to the JAR file's main function when executed.")
30-
private Property<List<String>> arguments;
32+
@Schema(title = "Commands." , description = "A list of commands that will be passed to the JAR file's main function when executed.")
33+
private Property<List<String>> commands;
3134

3235
@Schema(title = "Step configuration name.", description = "Ex: \"Run Spark job\"")
3336
@NotNull
@@ -44,11 +47,21 @@ public software.amazon.awssdk.services.emr.model.StepConfig toStep(RunContext ru
4447
.hadoopJarStep(throwConsumer(hadoopJarStepBuilder ->
4548
hadoopJarStepBuilder.jar(runContext.render(this.jar).as(String.class).orElseThrow())
4649
.mainClass(runContext.render(this.mainClass).as(String.class).orElse(null))
47-
.args(runContext.render(this.arguments).asList(String.class))
50+
.args(commandToAwsArguments(runContext.render(this.commands).asList(String.class)))
4851
.build()))
4952
.build();
5053
}
5154

55+
@VisibleForTesting
56+
static List<String> commandToAwsArguments(List<String> commands) {
57+
return commands.isEmpty() ? null : commands.stream()
58+
.map(command -> Arrays.stream(command.split(" ")).toList())
59+
.reduce(new ArrayList<>(), (acc, command) -> {
60+
acc.addAll(command);
61+
return acc;
62+
});
63+
}
64+
5265
public enum Action {
5366
TERMINATE_CLUSTER,
5467
CANCEL_AND_WAIT,

src/test/java/io/kestra/plugin/aws/emr/EmrIntegrationTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,15 +84,9 @@ void addStepsToCluster() throws Exception {
8484
private StepConfig createPythonSparkJob() {
8585
return StepConfig.builder()
8686
.jar(Property.of("command-runner.jar"))
87-
.arguments(Property.of(
88-
List.of(
89-
"spark-submit",
90-
"s3://" + bucketName + "/health_violations.py",
91-
"--data_source",
92-
"s3://" + bucketName + "/food_establishment_data.csv",
93-
"--output_uri",
94-
"s3://" + bucketName + "/test-emr-output"
95-
)
87+
.commands(Property.of(
88+
List.of("spark-submit s3://" + bucketName + "/health_violations.py --data_source s3://"
89+
+ bucketName + "/food_establishment_data.csv --output_uri s3://" + bucketName + "/test-emr-output")
9690
))
9791
.name(Property.of("TEST SPARK JOB UNIT TEST"))
9892
.actionOnFailure(Property.of(StepConfig.Action.CONTINUE))
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.kestra.plugin.aws.emr.models;
2+
3+
import org.junit.jupiter.api.Test;
4+
5+
import java.util.List;
6+
7+
import static io.kestra.plugin.aws.emr.models.StepConfig.commandToAwsArguments;
8+
import static org.hamcrest.MatcherAssert.assertThat;
9+
import static org.hamcrest.Matchers.is;
10+
11+
class StepConfigTest {
12+
@Test
13+
void createArgumentsFromCommandList() {
14+
var commands = List.of("spark-submit s3://mybucket/health_violations.py --data_source s3://mybucket/food_establishment_data.csv --output_uri s3://mybucket/test-emr-output");
15+
16+
var expected = List.of(
17+
"spark-submit",
18+
"s3://mybucket/health_violations.py",
19+
"--data_source",
20+
"s3://mybucket/food_establishment_data.csv",
21+
"--output_uri",
22+
"s3://mybucket/test-emr-output"
23+
);
24+
25+
assertThat(commandToAwsArguments(commands), is(expected));
26+
}
27+
}

0 commit comments

Comments
 (0)