Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.junit.Test;

public class DataprocAcceptanceTestBase {

private static final long TIMEOUT_IN_SECONDS = 180;
protected static final ClusterProperty DISABLE_CONSCRYPT =
ClusterProperty.of("dataproc:dataproc.conscrypt.provider.enable", "false", "nc");
protected static final ImmutableList<ClusterProperty> DISABLE_CONSCRYPT_LIST =
Expand Down Expand Up @@ -184,8 +184,7 @@ public void testRead() throws Exception {
testName,
"read_shakespeare.py",
null,
Arrays.asList(context.getResultsDirUri(testName)),
120);
Arrays.asList(context.getResultsDirUri(testName)));
assertThat(result.getStatus().getState()).isEqualTo(JobStatus.State.DONE);
String output = AcceptanceTestUtils.getCsv(context.getResultsDirUri(testName));
assertThat(output.trim()).isEqualTo("spark,10");
Expand Down Expand Up @@ -213,8 +212,7 @@ public void writeStream() throws Exception {
context.testBaseGcsDir + "/" + testName + "/json/",
context.bqDataset,
context.bqStreamTable,
AcceptanceTestUtils.BUCKET),
120);
AcceptanceTestUtils.BUCKET));

assertThat(result.getStatus().getState()).isEqualTo(JobStatus.State.DONE);
int numOfRows = getNumOfRowsOfBqTable(context.bqDataset, context.bqStreamTable);
Expand Down Expand Up @@ -245,17 +243,15 @@ public void testBigNumeric() throws Exception {
testName,
"big_numeric.py",
zipFileUri,
Arrays.asList(tableName, context.getResultsDirUri(testName)),
120);
Arrays.asList(tableName, context.getResultsDirUri(testName)));

assertThat(result.getStatus().getState()).isEqualTo(JobStatus.State.DONE);
String output = AcceptanceTestUtils.getCsv(context.getResultsDirUri(testName));
assertThat(output.trim()).isEqualTo(MIN_BIG_NUMERIC + "," + MAX_BIG_NUMERIC);
}

private Job createAndRunPythonJob(
String testName, String pythonFile, String pythonZipUri, List<String> args, long duration)
throws Exception {
String testName, String pythonFile, String pythonZipUri, List<String> args) throws Exception {
AcceptanceTestUtils.uploadToGcs(
getClass().getResourceAsStream("/acceptance/" + pythonFile),
context.getScriptUri(testName),
Expand All @@ -267,7 +263,7 @@ private Job createAndRunPythonJob(
.setPysparkJob(createPySparkJobBuilder(testName, pythonZipUri, args))
.build();

return runAndWait(job, Duration.ofSeconds(duration));
return runAndWait(job, Duration.ofSeconds(TIMEOUT_IN_SECONDS));
}

private PySparkJob.Builder createPySparkJobBuilder(
Expand Down
Loading