Add remote Spark test orchestration framework #211
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d8fd23fe1b
if not workspace_id and not workspace_name:
    raise ValueError("FABRIC_TEST_WORKSPACE_ID or FABRIC_TEST_WORKSPACE_NAME must be set")
Require both workspace name and ID for orchestration
from_env() currently accepts either FABRIC_TEST_WORKSPACE_ID or FABRIC_TEST_WORKSPACE_NAME, but downstream code needs both: ProjectSync builds OneLake URLs from workspace_name while SparkJobClient and job URLs use workspace_id. If users set only one variable (which this validation allows), remote execution fails later with malformed sync or API paths instead of a clear configuration error.
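A minimal sketch of the stricter validation this comment asks for. The variable names come from the snippet under review; the `require_env` helper and the choice of which variables count as required are assumptions for illustration, not the project's actual code.

```python
import os

# Names taken from the snippet under review; treating all three as required
# is an assumption made for this sketch.
REQUIRED_VARS = (
    "FABRIC_TEST_WORKSPACE_ID",
    "FABRIC_TEST_WORKSPACE_NAME",
    "FABRIC_TEST_LAKEHOUSE_NAME",
)


def require_env(names=REQUIRED_VARS, environ=None):
    """Return the requested variables, or raise one error naming every missing one."""
    environ = os.environ if environ is None else environ
    values = {name: environ.get(name, "").strip() for name in names}
    missing = [name for name, value in values.items() if not value]
    if missing:
        # Report everything at once so the user does not fix one variable per run.
        raise ValueError("Missing required environment variables: " + ", ".join(missing))
    return values
```

Collecting all missing names before raising gives a single configuration error up front instead of a malformed sync or API path later.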
path = (
    f"/v1/workspaces/{self._workspace_id}/items/{item_id}/jobs/instances?jobType=sparkjob"
)
body = {"executionData": {"commandLineArguments": " ".join(command_line_args)}}
Preserve quoting when forwarding remote pytest args
Building commandLineArguments with ' '.join(command_line_args) drops shell quoting, so arguments that contain spaces (for example -k "foo and bar") are split into multiple tokens in the remote job and change test selection semantics. This makes remote runs execute the wrong tests for common -k expressions and can produce mismatched local vs remote results.
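The difference can be seen with the standard library alone. This sketch assumes the remote side splits the argument string with POSIX shell rules (as `shlex.split` does); whether the Spark job runtime actually does so is not confirmed by this thread.

```python
import shlex

# A typical selection expression that contains spaces.
args = ["-k", "foo and bar", "-v"]

naive = " ".join(args)      # quoting is lost: -k foo and bar -v
quoted = shlex.join(args)   # quoting is kept:  -k 'foo and bar' -v

# Splitting the strings again shows what the receiving side would see.
naive_tokens = shlex.split(naive)
quoted_tokens = shlex.split(quoted)

print(naive_tokens)   # the -k expression is broken into separate tokens
print(quoted_tokens)  # identical to the original list
```

`shlex.join` is available from Python 3.8 onward and is the inverse of `shlex.split`, so the round trip returns the original argument list.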
Pull request overview
Adds a pytest-native --remote execution path intended to run FabricSpark integration tests as a Fabric Spark Job Definition, syncing the repo to OneLake and replaying remote junitxml results back into the local pytest session.
Changes:
- Introduces `pytest_runtestloop` interception in `tests/conftest.py` plus remote/mounted artifact path redirection for `project_root`, `profiles_root`, and `logs_dir`.
- Adds a new `tests/spark_remote/` framework: OneLake sync (azcopy), Fabric Spark Job Definition client (REST), Spark entry point, and junitxml result mapping back to pytest reports.
- Updates `.gitignore` and `test.env.sample` for generated remote-run files and configuration.
Reviewed changes
Copilot reviewed 8 out of 10 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| `tests/spark_remote/sync.py` | Implements azcopy-based project upload and artifact download, plus prerequisite checks. |
| `tests/spark_remote/spark_job_client.py` | Fabric REST wrapper to list/create/run/poll Spark Job Definitions. |
| `tests/spark_remote/spark_entry_point.py` | Spark-side runner that installs deps and executes pytest, writing junitxml artifacts. |
| `tests/spark_remote/result_reporter.py` | Parses junitxml and emits pytest TestReports to mirror remote results locally. |
| `tests/spark_remote/orchestrator.py` | Coordinates prerequisite checks, sync, Spark job submission, and artifact download. |
| `tests/spark_remote/conftest_plugin.py` | Implements the remote `pytest_runtestloop` handler and argument forwarding. |
| `tests/spark_remote/__init__.py` | Initializes the new `tests.spark_remote` package. |
| `tests/conftest.py` | Registers `--remote`, adds runtestloop delegation, and redirects artifact/log paths for remote/mounted modes. |
| `test.env.sample` | Documents optional env vars for remote/mounted Spark test execution. |
| `.gitignore` | Ignores generated remote files and artifact/result directories. |
result = subprocess.run(["az", "account", "show"], capture_output=True, text=True)
if result.returncode != 0:
    errors.append("Azure CLI not logged in. Run: az login")

workspace_id = os.environ.get("FABRIC_TEST_WORKSPACE_ID", "")
workspace_name = os.environ.get("FABRIC_TEST_WORKSPACE_NAME", "")
lakehouse_name = os.environ.get("FABRIC_TEST_LAKEHOUSE_NAME", "")
lakehouse_id = os.environ.get("FABRIC_TEST_REMOTE_LAKEHOUSE_ID", "")
job_name = os.environ.get("FABRIC_TEST_SPARK_JOB_NAME", "dbt-fabric-tests")

if not workspace_id and not workspace_name:
    raise ValueError("FABRIC_TEST_WORKSPACE_ID or FABRIC_TEST_WORKSPACE_NAME must be set")
if not lakehouse_name:
    raise ValueError("FABRIC_TEST_LAKEHOUSE_NAME must be set")

project_root = Path(__file__).resolve().parent.parent.parent
return cls(
    workspace_id=workspace_id,
    workspace_name=workspace_name,
    lakehouse_name=lakehouse_name,
    lakehouse_id=lakehouse_id,
    project_root=project_root,
    job_name=job_name,
)

def list_spark_job_definitions(self) -> list[dict]:
    items = []
    path = f"/v1/workspaces/{self._workspace_id}/sparkJobDefinitions"
    while path:
        resp = self._get(path)
        data = resp.json()
        items.extend(data.get("value", []))
        path = data.get("continuationUri")
    return items

def run_on_demand(self, item_id: str, command_line_args: list[str]) -> tuple[str, str]:
    path = (
        f"/v1/workspaces/{self._workspace_id}/items/{item_id}/jobs/instances?jobType=sparkjob"
    )
    body = {"executionData": {"commandLineArguments": " ".join(command_line_args)}}
    resp = self._post(path, json_data=body)

    location = resp.headers.get("Location", "")
    # Location header format: .../items/{item_id}/jobs/instances/{job_instance_id}
    parts = location.rstrip("/").split("/")
    job_instance_id = parts[-1] if parts else ""
    return item_id, job_instance_id

_FORWARDED_OPTIONS = (
    "-k",
    "-v",
    "--verbose",
    "-s",
    "--capture",
    "--with-grants",
    "--with-python",
    "--de",
    "--dw",
    "-x",
    "--exitfirst",
)

def _build_remote_args(session: pytest.Session) -> list[str]:
    args: list[str] = []
    config = session.config

    if config.getoption("-k"):
        args.extend(["-k", config.getoption("-k")])

    if config.getoption("verbose", 0) > 0:
        args.append("-" + "v" * config.getoption("verbose"))

    if config.getoption("--de", default=False):
        args.append("--de")

    if config.getoption("--dw", default=False):
        args.append("--dw")

    if config.getoption("--with-grants", default=False):
        args.append("--with-grants")

    if config.getoption("--with-python", default=False):
        args.append("--with-python")

    if config.getoption("-x", default=False):
        args.append("-x")

    args.append("--junitxml=/lakehouse/default/Files/dbt-test-artifacts/results.xml")

    return args

LAKEHOUSE_ROOT = "/lakehouse/default"
PROJECT_DIR = f"{LAKEHOUSE_ROOT}/Files/dbt-fabric-tests"
ARTIFACTS_DIR = f"{LAKEHOUSE_ROOT}/Files/dbt-test-artifacts"


def main() -> None:
    os.makedirs(ARTIFACTS_DIR, exist_ok=True)

    requirements_file = f"{PROJECT_DIR}/requirements-remote.txt"
    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-r", requirements_file, "--quiet"],
    )

    subprocess.check_call(
        [sys.executable, "-m", "pip", "install", "-e", PROJECT_DIR, "--no-deps", "--quiet"],
    )

    env_file = f"{PROJECT_DIR}/test.env.remote"
    if os.path.exists(env_file):
        from dotenv import load_dotenv

        load_dotenv(env_file, override=True)

    os.environ["FABRIC_TEST_SPARK_EXEC_MODE"] = "remote"

    pytest_args = sys.argv[1:]

    has_junitxml = any(arg.startswith("--junitxml") for arg in pytest_args)
    if not has_junitxml:
        pytest_args.extend(["--junitxml", f"{ARTIFACTS_DIR}/results.xml"])

elif mode == "mounted":
    path = os.getenv("FABRIC_TEST_ONELAKE_PATH")
    if not path:
        raise ValueError("FABRIC_TEST_ONELAKE_PATH required when SPARK_EXEC_MODE=mounted")

if file_attr:
    file_module = file_attr.replace("/", ".").removesuffix(".py")
    remaining = classname.removeprefix(file_module + ".")
    if remaining and remaining != classname:
        return f"{file_attr}::{remaining.replace('.', '::')}::{name}"
    return f"{file_attr}::{name}"
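The fragment above appears to rebuild a pytest node ID from junitxml `testcase` attributes. Below is a self-contained sketch of that logic for experimentation. The function name `reconstruct_nodeid` and the fallback branch for a missing `file` attribute are assumptions, since the surrounding function is not shown in this thread.

```python
def reconstruct_nodeid(file_attr: str, classname: str, name: str) -> str:
    """Rebuild a pytest-style node ID from junitxml testcase attributes."""
    if file_attr:
        # "tests/unit/test_x.py" -> "tests.unit.test_x"
        file_module = file_attr.replace("/", ".").removesuffix(".py")
        # Whatever follows the module path in classname is the class chain.
        remaining = classname.removeprefix(file_module + ".")
        if remaining and remaining != classname:
            return f"{file_attr}::{remaining.replace('.', '::')}::{name}"
        # Module-level test function: no class part.
        return f"{file_attr}::{name}"
    # Hypothetical fallback when no file attribute is present.
    return f"{classname}::{name}"


print(reconstruct_nodeid("tests/unit/test_x.py", "tests.unit.test_x.TestFoo", "test_bar"))
print(reconstruct_nodeid("tests/unit/test_x.py", "tests.unit.test_x", "test_baz"))
```

`str.removeprefix` and `str.removesuffix` require Python 3.9 or later.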
Introduces a pytest-native `--remote` flag that transparently delegates FabricSpark test execution to a Spark Job Definition on Fabric. The developer runs `pytest --de --remote -k "TestFoo"` and gets normal pytest output; the plugin handles sync, job submission, polling, and result reporting via junitxml.

Also adds Mode B (mounted lakehouse) support via the `FABRIC_TEST_SPARK_EXEC_MODE` env var, which redirects test artifacts to OneLake-accessible paths.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
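As an illustration of the result-reporting step, a junitxml file can be reduced to per-test outcomes with the standard library. This is a sketch only: the `parse_outcomes` helper and the sample XML are hypothetical, and the PR's `result_reporter.py` goes further by emitting pytest TestReports.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample shaped like pytest's junitxml output.
SAMPLE = """<testsuites><testsuite name="pytest" tests="3">
<testcase classname="tests.test_foo.TestFoo" name="test_ok" time="0.1"/>
<testcase classname="tests.test_foo.TestFoo" name="test_bad" time="0.2">
  <failure message="boom">traceback text</failure>
</testcase>
<testcase classname="tests.test_foo" name="test_skip" time="0.0">
  <skipped message="not applicable"/>
</testcase>
</testsuite></testsuites>"""


def parse_outcomes(xml_text: str) -> dict[str, str]:
    """Map 'classname::name' to 'passed', 'failed', or 'skipped'."""
    root = ET.fromstring(xml_text)
    outcomes = {}
    for case in root.iter("testcase"):
        key = f"{case.get('classname')}::{case.get('name')}"
        if case.find("failure") is not None or case.find("error") is not None:
            outcomes[key] = "failed"
        elif case.find("skipped") is not None:
            outcomes[key] = "skipped"
        else:
            # A testcase with no child elements is a pass.
            outcomes[key] = "passed"
    return outcomes


for test_id, outcome in parse_outcomes(SAMPLE).items():
    print(test_id, outcome)
```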
d8fd23f to 165b00c
- Require all env vars (workspace_id, workspace_name, lakehouse_id)
- Use shlex.join for proper arg quoting in Spark job submission
- Handle FileNotFoundError when checking for az CLI
- Support absolute continuationUri URLs in pagination
- Remove unused _FORWARDED_OPTIONS constant
- Fix error message to reference correct env var name
- Add unit tests for junitxml parsing and nodeid reconstruction

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
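The az CLI item in the list above can be sketched as follows. The function name `check_az_login`, the injectable `run` parameter, and the wording of the install message are assumptions for illustration; only the `az account show` call and the "not logged in" message come from the reviewed snippet.

```python
import subprocess


def check_az_login(run=subprocess.run) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    errors = []
    try:
        result = run(["az", "account", "show"], capture_output=True, text=True)
    except FileNotFoundError:
        # The az executable is not on PATH at all.
        errors.append("Azure CLI not found. Install it, then run: az login")
        return errors
    if result.returncode != 0:
        errors.append("Azure CLI not logged in. Run: az login")
    return errors
```

Passing `run` as a parameter keeps the check testable without a real Azure CLI installation.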
sdebruyn left a comment
Addressed all review comments in commit 2d0af9c:
Fixed (7 items):
- Require both workspace ID and name (Codex P1 + Copilot): Validation now requires all four env vars with a clear error listing missing ones.
- `shlex.join` for arg quoting (Codex P2 + Copilot): `run_on_demand()` now uses `shlex.join()` to preserve quoting for `-k` expressions with spaces.
- FileNotFoundError for az CLI (Copilot): `check_prerequisites()` now wraps `subprocess.run` in try/except and returns a clear install message.
- Pagination absolute URLs (Copilot): `_get()` and `_post()` now detect absolute URLs and skip prepending `FABRIC_API_BASE`.
- Unused `_FORWARDED_OPTIONS` (Copilot): Removed the dead constant.
- Error message typo (Copilot): Now correctly says `FABRIC_TEST_SPARK_EXEC_MODE=mounted`.
- Unit tests for nodeid reconstruction (Copilot): Added 11 tests in `tests/unit/test_result_reporter.py`.
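For the pagination item, a minimal sketch of detecting an absolute `continuationUri` before prepending the base URL. The helper name `resolve_url` is hypothetical, and the value given to `FABRIC_API_BASE` here is an assumption; the constant's real value is not shown in this thread.

```python
# Assumed value for illustration; the project's constant is not shown here.
FABRIC_API_BASE = "https://api.fabric.microsoft.com"


def resolve_url(path_or_url: str, base: str = FABRIC_API_BASE) -> str:
    """Return absolute URLs unchanged; join relative paths onto the base."""
    if path_or_url.startswith(("http://", "https://")):
        # Already absolute (e.g. a continuation link): use it as-is.
        return path_or_url
    return base.rstrip("/") + "/" + path_or_url.lstrip("/")


print(resolve_url("/v1/workspaces/abc/sparkJobDefinitions"))
print(resolve_url("https://example.test/v1/next?token=1"))
```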
Not changing (3 items):
- Limited arg forwarding: Intentional. Remote Spark execution has different constraints; the explicit allowlist ensures only tested, compatible options are forwarded.
- Hardcoded junitxml path (2 comments): This is single-user test infrastructure. The azcopy sync with `--delete-destination=true` ensures clean state before each run. Per-run UUIDs would complicate download/cleanup without benefit.
Summary
- `--remote` pytest flag that transparently delegates FabricSpark test execution to a Spark Job Definition on Fabric infrastructure
- `pytest --de --remote -k "TestFoo"`: no separate CLI needed

How it works
- `pytest_runtestloop` hook intercepts when `--remote` is set
- `--junitxml`

New files
- `tests/spark_remote/conftest_plugin.py`: pytest_runtestloop hook implementation
- `tests/spark_remote/result_reporter.py`: junitxml → TestReport mapping
- `tests/spark_remote/orchestrator.py`: coordinates sync + job client
- `tests/spark_remote/spark_job_client.py`: Fabric REST API wrapper
- `tests/spark_remote/sync.py`: azcopy sync wrapper
- `tests/spark_remote/spark_entry_point.py`: runs inside Spark job

Test plan
- `pytest --de` still works without `--remote` (no regression)
- `--remote` without `--de` exits with error
- `FABRIC_TEST_SPARK_EXEC_MODE=mounted`

Closes #207
🤖 Generated with Claude Code