Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#28187] Add gradle targets to execute python tests with prism. #32637

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,24 @@ def test_expand_kafka_write(self):
def test_sql(self):
raise unittest.SkipTest("Requires an expansion service to execute.")

# The following tests require additional implementation in Prism.

def test_custom_merging_window(self):
raise unittest.SkipTest(
"Requires Prism to support Custom Window " +
"Coders, and Merging Custom Windows. " +
"https://github.com/apache/beam/issues/31921")

def test_custom_window_type(self):
raise unittest.SkipTest(
"Requires Prism to support Custom Window Coders." +
" https://github.com/apache/beam/issues/31921")

def test_pack_combiners(self):
raise unittest.SkipTest(
"Requires Prism to support coder:" +
" 'beam:coder:tuple:v1'. https://github.com/apache/beam/issues/32636")


# Inherits all other tests.

Expand Down
1 change: 1 addition & 0 deletions sdks/python/pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ markers =
sickbay_direct: run without sickbay-direct
sickbay_spark: run without sickbay-spark
sickbay_flink: run without sickbay-flink
sickbay_prism: run without sickbay-prism
sickbay_dataflow: run without sickbay-dataflow
# Tests using this marker conflict with the xdist plugin in some way, such
# as enabling save_main_session.
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/test-suites/gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,10 @@ samza_validates_runner_postcommit_py_versions=3.9,3.12
# spark runner test-suites
spark_examples_postcommit_py_versions=3.9,3.12

# prism runner test-suites
prism_validates_runner_precommit_py_versions=3.12
prism_validates_runner_postcommit_py_versions=3.9,3.12
prism_examples_postcommit_py_versions=3.9,3.12

# cross language postcommit python test suites
cross_language_validates_py_versions=3.9,3.12
25 changes: 25 additions & 0 deletions sdks/python/test-suites/portable/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,31 @@ tasks.register("samzaValidatesRunner") {
}
}

tasks.register("prismValidatesRunner") {
getVersionsAsList('prism_validates_runner_postcommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner")
}
}

tasks.register("flinkValidatesRunnerPrecommit") {
getVersionsAsList('flink_validates_runner_precommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkValidatesRunner")
}
}

tasks.register("prismValidatesRunnerPrecommit") {
getVersionsAsList('prism_validates_runner_precommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismValidatesRunner")
}
}

// TODO merge with above once passing. Currently for convenience.
tasks.register("prismTriggerTranscript") {
getVersionsAsList('prism_validates_runner_precommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismTriggerTranscript")
}
}

tasks.register("flinkExamplesPostCommit") {
getVersionsAsList('flink_examples_postcommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:flinkExamples")
Expand All @@ -48,3 +67,9 @@ tasks.register("sparkExamplesPostCommit") {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:sparkExamples")
}
}

tasks.register("prismExamplesPostCommit") {
getVersionsAsList('prism_examples_postcommit_py_versions').each {
dependsOn.add(":sdks:python:test-suites:portable:py${getVersionSuffix(it)}:prismExamples")
}
}
81 changes: 81 additions & 0 deletions sdks/python/test-suites/portable/common.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,56 @@ tasks.register("sparkValidatesRunner") {
dependsOn 'sparkCompatibilityMatrixLOOPBACK'
}

def createPrismRunnerTestTask(String workerType) {
def taskName = "prismCompatibilityMatrix${workerType}"

def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
def options = "--prism_bin=${prismBin} --environment_type=${workerType}"
if (workerType == 'PROCESS') {
options += " --environment_options=process_command=${buildDir.absolutePath}/sdk_worker.sh"
}
def task = toxTask(taskName, 'prism-runner-test', options)
task.configure {
dependsOn ":runners:prism:build"
// The Java SDK worker is required to execute external transforms.
def suffix = getSupportedJavaVersion()
dependsOn ":sdks:java:container:${suffix}:docker"
if (workerType == 'DOCKER') {
dependsOn pythonContainerTask
} else if (workerType == 'PROCESS') {
dependsOn createProcessWorker
}
}
return task
}

createPrismRunnerTestTask('DOCKER')
createPrismRunnerTestTask('PROCESS')
createPrismRunnerTestTask('LOOPBACK')

tasks.register("prismValidatesRunner") {
dependsOn 'prismCompatibilityMatrixLOOPBACK'
}

tasks.register("prismTriggerTranscript") {
dependsOn 'setupVirtualenv'
dependsOn ':runners:prism:build'
def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
doLast {
exec {
executable 'sh'
args '-c', """
. ${envdir}/bin/activate \\
&& cd ${pythonRootDir} \\
&& pip install -e .[test] \\
&& pytest \\
apache_beam/transforms/trigger_test.py::WeakTestStreamTranscriptTest \\
--test-pipeline-options='--runner=PrismRunner --environment_type=LOOPBACK --prism_location=${prismBin}'
"""
}
}
}

project.tasks.register("preCommitPy${pythonVersionSuffix}") {
dependsOn = [":sdks:python:container:py${pythonVersionSuffix}:docker",
":runners:flink:${latestFlinkVersion}:job-server:shadowJar",
Expand Down Expand Up @@ -283,6 +333,37 @@ project.tasks.register("sparkExamples") {
}
}

project.tasks.register("prismExamples") {
dependsOn = [
'setupVirtualenv',
'installGcpTest',
':runners:prism:build',
]
def prismBin = "${rootDir}/runners/prism/build/tmp/prism"
doLast {
def testOpts = [
"--log-cli-level=INFO",
]
def pipelineOpts = [
"--runner=PrismRunner",
"--project=apache-beam-testing",
"--environment_type=LOOPBACK",
"--temp_location=gs://temp-storage-for-end-to-end-tests/temp-it",
"--prism_location=${prismBin}",
]
def cmdArgs = mapToArgString([
"test_opts": testOpts,
"suite": "postCommitExamples-prism-py${pythonVersionSuffix}",
"pipeline_opts": pipelineOpts.join(" "),
"collect": "examples_postcommit and not sickbay_prism"
])
exec {
executable 'sh'
args '-c', ". ${envdir}/bin/activate && ${pythonRootDir}/scripts/run_integration_test.sh $cmdArgs"
}
}
}

project.tasks.register("postCommitPy${pythonVersionSuffix}IT") {
dependsOn = [
'setupVirtualenv',
Expand Down
4 changes: 4 additions & 0 deletions sdks/python/tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,10 @@ extras = test
commands =
bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/spark_runner_test.py {posargs}

[testenv:prism-runner-test]
extras = test
commands =
bash {toxinidir}/scripts/pytest_validates_runner.sh {envname} {toxinidir}/apache_beam/runners/portability/prism_runner_test.py {posargs}

[testenv:py{39,310}-pyarrow-{3,9,10,11,12,13,14,15,16}]
deps =
Expand Down
Loading