Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ $RECYCLE.BIN/

# End of https://www.toptal.com/developers/gitignore/api/windows,macos,linux,visualstudiocode,python
test.env
test.env.remote
requirements-remote.txt
dbt-test-artifacts/
remote-test-results/
docs_build/site
.claude/*
!.claude/skills/
5 changes: 5 additions & 0 deletions test.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ FABRIC_TEST_HOST=your-endpoint.datawarehouse.fabric.microsoft.com
DBT_TEST_USER_1=dbo
DBT_TEST_USER_2=dbo
DBT_TEST_USER_3=dbo
# Remote Spark test execution (optional, use with pytest --remote --de)
#FABRIC_TEST_SPARK_EXEC_MODE= # "remote" or "mounted"; empty = local (default)
#FABRIC_TEST_ONELAKE_PATH= # required for "mounted" mode (local mount path)
#FABRIC_TEST_SPARK_JOB_NAME=dbt-fabric-tests # Spark Job Definition display name
#FABRIC_TEST_REMOTE_LAKEHOUSE_ID=00000000-0000-0000-0000-000000000000
65 changes: 62 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
from pathlib import Path

import py
import pytest
import yaml

Expand Down Expand Up @@ -97,6 +98,12 @@ def pytest_addoption(parser):
parser.addoption(
"--dw", action="store_true", default=False, help="run only Fabric T-SQL tests"
)
parser.addoption(
"--remote",
action="store_true",
default=False,
help="Run FabricSpark tests as a remote Spark job on Fabric infrastructure",
)


def pytest_configure(config):
Expand Down Expand Up @@ -191,6 +198,53 @@ def pytest_collection_modifyitems(config, items):
)


def _on_fabric_project_base() -> Path | None:
mode = os.getenv("FABRIC_TEST_SPARK_EXEC_MODE", "").lower()
if mode == "remote":
return Path("/lakehouse/default/Files/dbt-test-artifacts")
elif mode == "mounted":
path = os.getenv("FABRIC_TEST_ONELAKE_PATH")
if not path:
raise ValueError(
"FABRIC_TEST_ONELAKE_PATH required when FABRIC_TEST_SPARK_EXEC_MODE=mounted"
)
return Path(path) / "dbt-test-artifacts"
return None


def pytest_runtestloop(session):
if not session.config.getoption("--remote", default=False):
return None
if not session.config.getoption("--de", default=False):
pytest.exit("--remote requires --de (FabricSpark tests only)", returncode=4)
from tests.spark_remote.conftest_plugin import remote_runtestloop

return remote_runtestloop(session)


@pytest.fixture(scope="class")
def project_root(tmpdir_factory, prefix):
base = _on_fabric_project_base()
if base is None:
project_root = tmpdir_factory.mktemp("project")
print(f"\n=== Test project_root: {project_root}")
return project_root
path = base / prefix / "project"
path.mkdir(parents=True, exist_ok=True)
print(f"\n=== Test project_root: {path}")
return py.path.local(path)


@pytest.fixture(scope="class")
def profiles_root(tmpdir_factory, prefix):
base = _on_fabric_project_base()
if base is None:
return tmpdir_factory.mktemp("profile")
path = base / prefix / "profile"
path.mkdir(parents=True, exist_ok=True)
return py.path.local(path)


@pytest.fixture(scope="session", autouse=True)
def livy_session_lifecycle():
session_name = os.getenv("FABRIC_TEST_LIVY_SESSION_NAME")
Expand Down Expand Up @@ -229,10 +283,15 @@ def livy_session_lifecycle():

@pytest.fixture(scope="class")
def logs_dir(request, prefix):
dbt_log_dir = os.path.join(request.config.rootdir, "logs", prefix)
base = _on_fabric_project_base()
if base is not None:
dbt_log_dir = str(base / prefix / "logs")
else:
dbt_log_dir = os.path.join(request.config.rootdir, "logs", prefix)
os.makedirs(dbt_log_dir, exist_ok=True)
print(f"\n=== Test logs_dir: {dbt_log_dir}\n")
os.environ["DBT_LOG_PATH"] = str(dbt_log_dir)
yield str(Path(dbt_log_dir))
os.environ["DBT_LOG_PATH"] = dbt_log_dir
yield dbt_log_dir
del os.environ["DBT_LOG_PATH"]


Expand Down
Empty file added tests/spark_remote/__init__.py
Empty file.
59 changes: 59 additions & 0 deletions tests/spark_remote/conftest_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

import pytest

from tests.spark_remote.orchestrator import RemoteTestOrchestrator
from tests.spark_remote.result_reporter import report_remote_results


def remote_runtestloop(session: pytest.Session) -> bool:
if session.config.option.collectonly:
return True

if not session.items:
return True

remote_args = _build_remote_args(session)

print("\nRemote execution: syncing project to lakehouse...")
orchestrator = RemoteTestOrchestrator.from_env()
orchestrator.sync_project()

print("\nRemote execution: submitting Spark job...")
job_result = orchestrator.run_spark_job(remote_args)

print(f"\n {job_result.status} — downloading results...")
results_path = orchestrator.download_results()
report_remote_results(session, results_path, job_result)

return True


def _build_remote_args(session: pytest.Session) -> list[str]:
args: list[str] = []
config = session.config

if config.getoption("-k"):
args.extend(["-k", config.getoption("-k")])

if config.getoption("verbose", 0) > 0:
args.append("-" + "v" * config.getoption("verbose"))

if config.getoption("--de", default=False):
args.append("--de")

if config.getoption("--dw", default=False):
args.append("--dw")

if config.getoption("--with-grants", default=False):
args.append("--with-grants")

if config.getoption("--with-python", default=False):
args.append("--with-python")

Comment on lines +32 to +53
if config.getoption("-x", default=False):
args.append("-x")

args.append("--junitxml=/lakehouse/default/Files/dbt-test-artifacts/results.xml")

return args
Comment on lines +54 to +59
109 changes: 109 additions & 0 deletions tests/spark_remote/orchestrator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

import os
import sys
from pathlib import Path

from tests.spark_remote.spark_job_client import SparkJobClient, SparkJobResult
from tests.spark_remote.sync import ProjectSync, check_prerequisites


class RemoteTestOrchestrator:
def __init__(
self,
workspace_id: str,
workspace_name: str,
lakehouse_name: str,
lakehouse_id: str,
project_root: Path,
job_name: str = "dbt-fabric-tests",
):
self._workspace_id = workspace_id
self._workspace_name = workspace_name
self._lakehouse_name = lakehouse_name
self._lakehouse_id = lakehouse_id
self._project_root = project_root
self._job_name = job_name

self._sync = ProjectSync(workspace_name, lakehouse_name, project_root)
self._job_client = SparkJobClient(workspace_id, self._get_token)
self._local_results_dir = project_root / "remote-test-results"

@classmethod
def from_env(cls) -> RemoteTestOrchestrator:
errors = check_prerequisites()
if errors:
for error in errors:
print(f"ERROR: {error}", file=sys.stderr)
raise SystemExit(1)

workspace_id = os.environ.get("FABRIC_TEST_WORKSPACE_ID", "")
workspace_name = os.environ.get("FABRIC_TEST_WORKSPACE_NAME", "")
lakehouse_name = os.environ.get("FABRIC_TEST_LAKEHOUSE_NAME", "")
lakehouse_id = os.environ.get("FABRIC_TEST_REMOTE_LAKEHOUSE_ID", "")
job_name = os.environ.get("FABRIC_TEST_SPARK_JOB_NAME", "dbt-fabric-tests")

missing = []
if not workspace_id:
missing.append("FABRIC_TEST_WORKSPACE_ID")
if not workspace_name:
missing.append("FABRIC_TEST_WORKSPACE_NAME")
if not lakehouse_name:
missing.append("FABRIC_TEST_LAKEHOUSE_NAME")
if not lakehouse_id:
missing.append("FABRIC_TEST_REMOTE_LAKEHOUSE_ID")
if missing:
raise ValueError(f"Required env vars not set: {', '.join(missing)}")

project_root = Path(__file__).resolve().parent.parent.parent
return cls(
workspace_id=workspace_id,
workspace_name=workspace_name,
lakehouse_name=lakehouse_name,
lakehouse_id=lakehouse_id,
project_root=project_root,
job_name=job_name,
)
Comment on lines +40 to +66

def sync_project(self) -> None:
self._sync.upload()

def run_spark_job(self, pytest_args: list[str]) -> SparkJobResult:
existing = self._job_client.find_by_name(self._job_name)
if existing:
item_id = existing["id"]
print(f" Reusing Spark Job Definition: {self._job_name} ({item_id})")
else:
executable_path = (
f"abfss://{self._workspace_id}@onelake.dfs.fabric.microsoft.com"
f"/{self._lakehouse_id}/Files/dbt-fabric-tests"
f"/tests/spark_remote/spark_entry_point.py"
)
item_id = self._job_client.create_spark_job_definition(
name=self._job_name,
lakehouse_id=self._lakehouse_id,
executable_path=executable_path,
)
print(f" Created Spark Job Definition: {self._job_name} ({item_id})")

print(f" Remote pytest args: {' '.join(pytest_args)}")
item_id, job_instance_id = self._job_client.run_on_demand(item_id, pytest_args)

job_url = (
f"https://app.fabric.microsoft.com/groups/{self._workspace_id}"
f"/sparkJobDefinitions/{item_id}/runs/{job_instance_id}"
)
print(f" Job URL: {job_url}")
print("\nWaiting for Spark job...")

return self._job_client.poll_until_done(item_id, job_instance_id)

def download_results(self) -> Path | None:
return self._sync.download_artifacts(self._local_results_dir)

def _get_token(self) -> str:
from azure.identity import AzureCliCredential

credential = AzureCliCredential()
token = credential.get_token("https://analysis.windows.net/powerbi/api/.default")
return token.token
Loading