Skip to content
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ $RECYCLE.BIN/

# End of https://www.toptal.com/developers/gitignore/api/windows,macos,linux,visualstudiocode,python
test.env
test.env.remote
requirements-remote.txt
dbt-test-artifacts/
remote-test-results/
docs_build/site
.claude/*
!.claude/skills/
Expand Down
27 changes: 27 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,33 @@ uv run pytest --with-grants # Include GRANT/authorization tests
uv run pytest -k "TestClassName" # A specific test class
```

### Remote Spark test execution

FabricSpark tests normally run via a local Livy session. The `--remote` flag delegates test execution to a Spark Job Definition on Fabric infrastructure instead. Locally, pytest collects tests as usual, but instead of executing them it syncs the project to a lakehouse via azcopy and submits a Spark job that runs the tests remotely. Results are downloaded as junitxml and reported back in the local pytest session.

```shell
uv run pytest --de --remote -k "TestClassName" # Run a specific test remotely
uv run pytest --de --remote # Run all DE tests remotely
```

`--remote` requires `--de` — it only applies to FabricSpark tests.

#### Prerequisites

1. [azcopy](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azcopy-v10?WT.mc_id=MVP_310840) installed and on your `PATH`
2. Azure credentials (`az login` or service principal configured in `test.env`)
3. A Spark Job Definition in the Fabric workspace (the name is configurable via `FABRIC_TEST_SPARK_JOB_NAME`, default: `dbt-fabric-tests`)

#### Optional environment variables

| Variable | Description |
|---|---|
| `FABRIC_TEST_SPARK_EXEC_MODE` | `remote` or `mounted` (default: local execution when omitted) |
| `FABRIC_TEST_ONELAKE_PATH` | Local mount path for OneLake (required for `mounted` mode) |
| `FABRIC_TEST_SPARK_JOB_NAME` | Spark Job Definition display name (default: `dbt-fabric-tests`) |

The remote orchestrator reuses `FABRIC_TEST_WORKSPACE_NAME` and `FABRIC_TEST_LAKEHOUSE_NAME` from `test.env` and resolves their IDs via the Fabric API — no separate ID variables are needed.

### Test architecture

Tests use [dbt-tests-adapter](https://github.com/dbt-labs/dbt-adapters), dbt's official adapter test harness. It provides base test classes for standard adapter behavior. Our tests inherit from these and override fixtures where Fabric's SQL dialect differs:
Expand Down
29 changes: 25 additions & 4 deletions src/dbt/adapters/fabric/fabric_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,39 @@ def _api_request(
raise FabricApiError(method, url, response.status_code, response.text)
return response

def _api_get(self, url: str) -> requests.Response:
@property
def base_url(self) -> str:
"""The Fabric REST API base URL from credentials."""
return self._credentials.fabric_base_api_uri

def api_get(self, url: str) -> requests.Response:
"""Send an authenticated GET request with 429 retry."""
return self._api_request(url, method="get")

def _api_post(self, url: str, body: dict) -> requests.Response:
def api_post(self, url: str, body: dict) -> requests.Response:
"""Send an authenticated POST request with 429 retry."""
return self._api_request(url, method="post", body=body)

def _api_patch(self, url: str, body: dict) -> requests.Response:
def api_patch(self, url: str, body: dict) -> requests.Response:
"""Send an authenticated PATCH request with 429 retry."""
return self._api_request(url, method="patch", body=body)

def _api_delete(self, url: str) -> requests.Response:
def api_delete(self, url: str) -> requests.Response:
"""Send an authenticated DELETE request with 429 retry."""
return self._api_request(url, method="delete")

def _api_get(self, url: str) -> requests.Response:
return self.api_get(url)

def _api_post(self, url: str, body: dict) -> requests.Response:
return self.api_post(url, body)

def _api_patch(self, url: str, body: dict) -> requests.Response:
return self.api_patch(url, body)

def _api_delete(self, url: str) -> requests.Response:
return self.api_delete(url)

def get_workspace_id(self) -> str:
"""Resolve the Fabric workspace ID from config or by looking up the workspace name.

Expand Down
5 changes: 5 additions & 0 deletions test.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,8 @@ FABRIC_TEST_HOST=your-endpoint.datawarehouse.fabric.microsoft.com
DBT_TEST_USER_1=dbo
DBT_TEST_USER_2=dbo
DBT_TEST_USER_3=dbo
# Remote Spark test execution (optional, use with pytest --remote --de)
# Uses FABRIC_TEST_WORKSPACE_NAME/ID and FABRIC_TEST_LAKEHOUSE_NAME from above
#FABRIC_TEST_SPARK_EXEC_MODE= # "remote" or "mounted"; empty = local (default)
#FABRIC_TEST_ONELAKE_PATH= # required for "mounted" mode (local mount path)
#FABRIC_TEST_SPARK_JOB_NAME=dbt-fabric-tests # Spark Job Definition display name
114 changes: 108 additions & 6 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import pytest
import yaml
from py.path import local as LocalPath

from dbt.adapters.fabric.fabric_api_client import FabricApiClient
from dbt.adapters.fabric.fabric_credentials import FabricCredentials
Expand Down Expand Up @@ -102,6 +103,12 @@ def pytest_addoption(parser):
parser.addoption(
"--dw", action="store_true", default=False, help="run only Fabric T-SQL tests"
)
parser.addoption(
"--remote",
action="store_true",
default=False,
help="Run FabricSpark tests as a remote Spark job on Fabric infrastructure",
)


def pytest_configure(config):
Expand Down Expand Up @@ -145,14 +152,17 @@ def pytest_ignore_collect(collection_path, config):
if config.getoption("--de", default=False) and top_dir == "fabric":
return True

if _requires_spark(collection_path, tests_root) and not _spark_extra_available(): # noqa: SIM102
if config.getoption("--de", default=False):
if _requires_spark(collection_path, tests_root) and not _spark_extra_available():
if config.getoption("--remote", default=False):
pass
elif config.getoption("--de", default=False):
pytest.exit(
"The spark extra is required for FabricSpark tests. "
"Install with: uv sync --extra spark",
returncode=4,
)
return True
else:
return True

return None

Expand Down Expand Up @@ -193,12 +203,104 @@ def pytest_collection_modifyitems(config, items):
)


def _on_fabric_project_base() -> Path | None:
mode = os.getenv("FABRIC_TEST_SPARK_EXEC_MODE", "").lower()
if mode == "remote":
return Path("/lakehouse/default/Files/dbt-test-artifacts")
elif mode == "mounted":
path = os.getenv("FABRIC_TEST_ONELAKE_PATH")
if not path:
raise ValueError(
"FABRIC_TEST_ONELAKE_PATH required when FABRIC_TEST_SPARK_EXEC_MODE=mounted"
)
return Path(path) / "dbt-test-artifacts"
return None


def pytest_runtestloop(session):
if not session.config.getoption("--remote", default=False):
return None
if not session.config.getoption("--de", default=False):
pytest.exit("--remote requires --de (FabricSpark tests only)", returncode=4)
from tests.spark_remote.conftest_plugin import remote_runtestloop

return remote_runtestloop(session)


@pytest.fixture(scope="class")
def project_root(tmpdir_factory, prefix):
base = _on_fabric_project_base()
if base is None:
project_root = tmpdir_factory.mktemp("project")
print(f"\n=== Test project_root: {project_root}")
return project_root
path = base / prefix / "project"
path.mkdir(parents=True, exist_ok=True)
print(f"\n=== Test project_root: {path}")
return LocalPath(path)


@pytest.fixture(scope="class")
def profiles_root(tmpdir_factory, prefix):
base = _on_fabric_project_base()
if base is None:
return tmpdir_factory.mktemp("profile")
path = base / prefix / "profile"
path.mkdir(parents=True, exist_ok=True)
return LocalPath(path)


@pytest.fixture(scope="session", autouse=True)
def livy_session_lifecycle(request):
if request.config.getoption("--remote", default=False):
yield
return

session_name = os.getenv("FABRIC_TEST_LIVY_SESSION_NAME")
lakehouse_name = os.getenv("FABRIC_TEST_LAKEHOUSE_NAME")
workspace_name = os.getenv("FABRIC_TEST_WORKSPACE_NAME")
workspace_id = os.getenv("FABRIC_TEST_WORKSPACE_ID")

if not session_name or not lakehouse_name or not (workspace_name or workspace_id):
yield
return

from dbt.adapters.fabric.fabric_livy_session import LivySession

creds = FabricCredentials(
database=lakehouse_name,
schema="dbo",
lakehouse=lakehouse_name,
workspace_name=workspace_name,
workspace_id=workspace_id,
livy_session_name=session_name,
**_auth_kwargs_from_env(),
)
token_provider = FabricTokenProvider(creds)
client = FabricApiClient(creds, token_provider)

client.get_livy_session_id()
LivySession(client).wait_for_session_ready()

yield

try:
client.delete_livy_session()
except Exception as e:
print(f"\nWarning: failed to delete Livy session: {e}")


@pytest.fixture(scope="class")
def logs_dir(request, prefix):
dbt_log_dir = os.path.join(request.config.rootdir, "logs", prefix)
base = _on_fabric_project_base()
if base is not None:
dbt_log_dir = str(base / prefix / "logs")
else:
dbt_log_dir = os.path.join(request.config.rootdir, "logs", prefix)
os.makedirs(dbt_log_dir, exist_ok=True)
print(f"\n=== Test logs_dir: {dbt_log_dir}\n")
os.environ["DBT_LOG_PATH"] = str(dbt_log_dir)
yield str(Path(dbt_log_dir))
os.environ["DBT_LOG_PATH"] = dbt_log_dir
yield dbt_log_dir
del os.environ["DBT_LOG_PATH"]


Expand Down
Empty file added tests/spark_remote/__init__.py
Empty file.
82 changes: 82 additions & 0 deletions tests/spark_remote/conftest_plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from __future__ import annotations

from uuid import uuid4

import pytest

from tests.spark_remote.orchestrator import RemoteTestOrchestrator
from tests.spark_remote.result_reporter import report_remote_results


def remote_runtestloop(session: pytest.Session) -> bool:
"""Replace pytest's default test loop with remote Spark execution.

Generates a unique run ID per invocation (for artifact isolation) and
derives a stable worktree key from the project root (for incremental
project sync). Multiple agents in separate worktrees can run concurrently
without interference.

Args:
session: The pytest Session (already collected).

Returns:
True to signal that the test loop has been handled.
"""
if session.config.option.collectonly:
return True

if not session.items:
return True

run_id = uuid4().hex[:8]
remote_args = _build_remote_args(session)

orchestrator = RemoteTestOrchestrator.from_env(run_id)
print(
f"\nRemote execution [run={run_id}, worktree={orchestrator._worktree_key}]:"
f" syncing project to lakehouse..."
)
orchestrator.sync_project()

print(f"\nRemote execution [run={run_id}]: submitting Spark job...")
job_result = orchestrator.run_spark_job(remote_args)

print(f"\n {job_result.status} — downloading results...")
results_path = orchestrator.download_results()
report_remote_results(session, results_path, job_result)

return True


def _build_remote_args(session: pytest.Session) -> list[str]:
"""Extract relevant pytest options from the local session for remote forwarding.

Forwards -k, -v, --de, -x, and positional test paths. The --junitxml path
is set by the remote entry point based on the run ID.

Args:
session: The local pytest Session to extract options from.

Returns:
List of CLI arguments to pass to the remote pytest invocation.
"""
args: list[str] = []
config = session.config

if config.getoption("-k"):
args.extend(["-k", config.getoption("-k")])

if config.getoption("verbose", 0) > 0:
args.append("-" + "v" * config.getoption("verbose"))

if config.getoption("--de", default=False):
args.append("--de")
Comment thread
sdebruyn marked this conversation as resolved.

if config.getoption("-x", default=False):
args.append("-x")
Comment thread
sdebruyn marked this conversation as resolved.

Comment thread
sdebruyn marked this conversation as resolved.
positional = config.args
if positional:
args.extend(positional)

return args
Loading