Skip to content

Commit 6484a47

Browse files
authored
Fix TestPipeline.runWithAdditionalOptionArgs ignore additionalArgs (#25937)
* Fix TestPipeline.runWithAdditionalOptionArgs ignore additionalArgs when system property PROPERTY_BEAM_TEST_PIPELINE_OPTIONS is empty
1 parent 14a7782 commit 6484a47

File tree

2 files changed

+37
-12
lines changed

2 files changed

+37
-12
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java

+11-12
Original file line numberDiff line numberDiff line change
@@ -347,29 +347,28 @@ public PipelineResult runWithAdditionalOptionArgs(List<String> additionalArgs) {
347347
try {
348348
@Nullable
349349
String beamTestPipelineOptions = System.getProperty(PROPERTY_BEAM_TEST_PIPELINE_OPTIONS);
350-
PipelineOptions options;
351-
if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
352-
options = PipelineOptionsFactory.create();
353-
} else {
354-
List<String> args = MAPPER.readValue(beamTestPipelineOptions, List.class);
355-
args.addAll(additionalArgs);
356-
String[] newArgs = new String[args.size()];
357-
newArgs = args.toArray(newArgs);
358-
options = PipelineOptionsFactory.fromArgs(newArgs).as(TestPipelineOptions.class);
350+
List<String> args = new ArrayList<>();
351+
if (!Strings.isNullOrEmpty(beamTestPipelineOptions)) {
352+
args.addAll(MAPPER.readValue(beamTestPipelineOptions, List.class));
359353
}
354+
args.addAll(additionalArgs);
355+
String[] newArgs = new String[args.size()];
356+
newArgs = args.toArray(newArgs);
357+
PipelineOptions newOptions =
358+
PipelineOptionsFactory.fromArgs(newArgs).as(TestPipelineOptions.class);
360359

361360
// If no options were specified, set some reasonable defaults
362361
if (Strings.isNullOrEmpty(beamTestPipelineOptions)) {
363362
// If there are no provided options, check to see if a dummy runner should be used.
364363
String useDefaultDummy = System.getProperty(PROPERTY_USE_DEFAULT_DUMMY_RUNNER);
365364
if (!Strings.isNullOrEmpty(useDefaultDummy) && Boolean.valueOf(useDefaultDummy)) {
366-
options.setRunner(CrashingRunner.class);
365+
newOptions.setRunner(CrashingRunner.class);
367366
}
368367
}
369-
options.setStableUniqueNames(CheckEnabled.ERROR);
368+
newOptions.setStableUniqueNames(CheckEnabled.ERROR);
370369

371370
FileSystems.setDefaultPipelineOptions(options);
372-
return run(options);
371+
return run(newOptions);
373372
} catch (IOException e) {
374373
throw new RuntimeException(
375374
"Unable to instantiate test options from system property "

sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java

+26
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import com.fasterxml.jackson.databind.ObjectMapper;
2727
import java.io.Serializable;
28+
import java.util.ArrayList;
2829
import java.util.Collections;
2930
import java.util.Date;
3031
import java.util.List;
@@ -37,9 +38,11 @@
3738
import org.apache.beam.sdk.options.ValueProvider;
3839
import org.apache.beam.sdk.transforms.Create;
3940
import org.apache.beam.sdk.transforms.MapElements;
41+
import org.apache.beam.sdk.transforms.PTransform;
4042
import org.apache.beam.sdk.transforms.SimpleFunction;
4143
import org.apache.beam.sdk.util.common.ReflectHelpers;
4244
import org.apache.beam.sdk.values.PCollection;
45+
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
4346
import org.checkerframework.checker.nullness.qual.Nullable;
4447
import org.hamcrest.BaseMatcher;
4548
import org.hamcrest.Description;
@@ -270,6 +273,29 @@ public void testDanglingPAssertValidatesRunner() throws Exception {
270273
PAssert.that(pCollection).containsInAnyOrder(WHATEVER);
271274
}
272275

276+
@Category(NeedsRunner.class)
277+
@Test
278+
public void testRunWithAdditionalArgsEffective() {
279+
ArrayList<String> pipelineArgs = new ArrayList<String>();
280+
pipelineArgs.add("--tempLocation=gs://some-location/");
281+
pipeline.apply(Create.of("")).apply(new ValidateTempLocation<>());
282+
PipelineResult.State result =
283+
pipeline.runWithAdditionalOptionArgs(pipelineArgs).waitUntilFinish();
284+
assert (result == PipelineResult.State.DONE);
285+
}
286+
287+
static class ValidateTempLocation<T> extends PTransform<PCollection<T>, PCollection<T>> {
288+
@Override
289+
public void validate(PipelineOptions pipelineOptions) {
290+
assert (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation()));
291+
}
292+
293+
@Override
294+
public PCollection<T> expand(PCollection<T> input) {
295+
return input;
296+
}
297+
}
298+
273299
/**
274300
* Tests that a {@link TestPipeline} rule behaves as expected when there is no pipeline usage
275301
* within a test that has a {@link ValidatesRunner} annotation.

0 commit comments

Comments
 (0)