Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/comparison-dbt-fabricspark.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ Notable differences:

| Feature | dbt-fabric-samdebruyn | microsoft/dbt-fabricspark |
|---|---|---|
| **[High-concurrency Livy](lakehouse.md#high-concurrency-livy)** | Yes (HC-only, instance-based lifecycle) | Yes (default on, `atexit` cleanup) |
| **Session creation** | `FabricApiClient` singleton | `LivySessionManager` with static globals |
| **Session reuse** | By session name | Via `session_id_file` + `reuse_session` flag |
| **Session reuse** | Deterministic session tag (HC) | Via `session_id_file` + `reuse_session` flag (singleton) / deterministic session tag (HC) |
| **HC session cleanup** | Connection manager `close()` path | `atexit` handler (fragile — see [Code quality](#code-quality)) |
| **Polling interval** | Fixed 3 seconds | Adaptive (configurable) |
| **Session idle timeout** | 15 min default | 30 min default, configurable |
| **Local Livy mode** | No | Yes (`livy_mode: local`) |
Expand Down Expand Up @@ -190,7 +192,7 @@ This adapter uses proper instance-based encapsulation: `FabricTokenProvider` (pe

### atexit handler for session cleanup

The upstream registers an `atexit` handler at module import time (`livysession.py` lines 1314-1322) to delete Livy sessions on process exit. This is fragile: `atexit` handlers run in undefined order, logging/network may already be torn down, and merely importing the module registers the handler even if no session was created.
The upstream registers `atexit` handlers at module import time (in both `singleton_livy.py` and `concurrent_livy.py`) to delete Livy sessions and HC sessions on process exit. This is fragile: `atexit` handlers run in undefined order, logging/network may already be torn down, and merely importing the module registers the handler even if no session was created. The HC implementation adds a second `atexit` handler with a global `_active_sessions` set, compounding the global mutable state problem.

This adapter manages session lifecycle through dbt's normal connection manager `close()` path.

Expand Down
65 changes: 46 additions & 19 deletions docs/lakehouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,37 +45,36 @@ The FabricSpark adapter does not use the [`host`](configuration.md#host) option

## How it works

The FabricSpark adapter executes all SQL through Fabric Livy sessions. Here is the execution flow:
The FabricSpark adapter executes all SQL through Fabric's [high-concurrency Livy API](https://learn.microsoft.com/en-us/fabric/data-engineering/high-concurrency-livy?WT.mc_id=MVP_310840). Each dbt thread gets its own REPL inside a shared underlying Livy session. Here is the execution flow:

```mermaid
sequenceDiagram
participant dbt
participant Adapter
participant Livy API
participant HC Livy API
participant Spark Session

dbt->>Adapter: Compiled Spark SQL
Adapter->>Livy API: GET /sessions (find existing session)
alt Session exists
Livy API-->>Adapter: Session ID
else No session
Adapter->>Livy API: POST /sessions (create new)
Livy API-->>Adapter: Session ID
Note over Adapter,Spark Session: Session startup: 1-5 minutes
Adapter->>HC Livy API: POST /highConcurrencySessions (acquire REPL)
alt Underlying session exists (warm)
HC Livy API-->>Adapter: HC session ID + REPL ID
else No underlying session
Note over HC Livy API,Spark Session: Spark startup
HC Livy API-->>Adapter: HC session ID + REPL ID
end
Adapter->>Livy API: POST /sessions/{id}/statements
Livy API->>Spark Session: Execute Spark SQL
Adapter->>HC Livy API: POST /highConcurrencySessions/{id}/repls/{replId}/statements
HC Livy API->>Spark Session: Execute Spark SQL (in REPL)
loop Poll every 3 seconds
Adapter->>Livy API: GET /statements/{id}
Livy API-->>Adapter: Status + results (when done)
Adapter->>HC Livy API: GET /highConcurrencySessions/{id}/repls/{replId}/statements/{stmtId}
HC Livy API-->>Adapter: Status + results (when done)
end
Adapter-->>dbt: Parsed results
```

Key technical details:

- **Session reuse** -- All statements in a dbt run share the same Livy session (named `dbt-fabric-samdebruyn` by default). This avoids the overhead of creating a new Spark session for each model.
- **Session TTL** -- Sessions are created with a TTL of 30 seconds. If the session is idle for longer than that after the dbt run finishes, Fabric will automatically clean it up.
- **One REPL per thread** -- Each dbt thread acquires its own REPL inside a shared underlying Livy session. Statements from different REPLs execute in parallel.
- **Deterministic session tag** -- The adapter computes a session tag from `(workspace_id, lakehouse_id)`. Fabric packs all REPLs with the same tag onto one underlying Livy session, enabling warm session reuse across dbt invocations.
- **Polling interval** -- The adapter polls for statement completion every 3 seconds.
- **Rate limiting** -- The Fabric Livy API enforces rate limits. The adapter handles HTTP 429 responses automatically using the `Retry-After` header.
- **DB-API 2.0 cursor** -- Results are returned as JSON and parsed into a [PEP 249](https://peps.python.org/pep-0249/) compatible cursor, so dbt interacts with the Lakehouse the same way it interacts with any other database.
Expand Down Expand Up @@ -125,17 +124,45 @@ SELECT [my column] FROM [my_schema].[my_table]

---

## High-concurrency Livy

The adapter uses Fabric's [high-concurrency Livy API](https://learn.microsoft.com/en-us/fabric/data-engineering/high-concurrency-livy?WT.mc_id=MVP_310840). Each dbt thread acquires its own HC session -- and therefore its own REPL -- inside a single underlying Livy session shared via a deterministic `sessionTag` derived from `(workspace_id, lakehouse_id)`. Statements from different REPLs execute in **parallel** inside the same Spark application, so increasing `threads` in your profile directly increases throughput.

### Session reuse across runs

The session tag is deterministic: every dbt invocation targeting the same workspace + lakehouse produces the same tag. Fabric snap-attaches new REPLs onto the still-warm underlying Livy session, skipping the Spark cold-start entirely on subsequent runs.

### `threads > 5`

Fabric packs up to **5 REPLs onto one underlying Livy session** (see the [HC Livy key concepts](https://learn.microsoft.com/en-us/fabric/data-engineering/high-concurrency-livy?WT.mc_id=MVP_310840#key-concepts)). With `threads > 5`, dbt still works correctly -- Fabric spins up a second underlying Livy session to host the 6th REPL onwards.

| Property | Shared across underlying sessions? |
| --- | --- |
| OneLake Delta tables (dbt model outputs) | Yes -- same lakehouse storage |
| Catalog / metastore (`SELECT FROM <other_model>`) | Yes -- same Fabric catalog |
| Temp views (`CREATE TEMPORARY VIEW ...`) | No -- REPL/session-local |
| Session-level Spark configs (`SET spark.sql.X = ...`) | No |
| Cached datasets / UDFs / broadcast vars | No |

Because dbt-fabricspark materializations always write permanent Delta / lake view objects, model-to-model `ref`s resolve correctly regardless of which underlying session produced or consumes the table.

!!! note "Cost tradeoff"

Each additional underlying Livy session is a separate Spark cluster billed for the duration of the run plus the idle timeout. Keep `threads ≤ 5` for the cheapest profile; raise it only when the extra parallelism beats the extra compute spend.

---

## Performance considerations

The Livy API architecture has inherent performance characteristics that are important to understand.

### Session startup

Creating a new Spark session can take **1-5 minutes**. The adapter reuses sessions within a run, so this overhead is paid once per `dbt run`. Subsequent runs may reuse an existing session if it is still alive.
Creating a new Spark session takes some time. The adapter reuses sessions within a run, so this overhead is paid once per `dbt run`. Subsequent runs may reuse an existing session if it is still alive. The [high-concurrency Livy](#high-concurrency-livy) session tag is deterministic, so subsequent runs can skip startup entirely by reattaching to a warm session.

### Statement execution

Each SQL statement involves multiple HTTP API calls (submit + poll). This is inherently slower than a direct database connection like the TDS protocol used by the Data Warehouse adapter.
Each SQL statement involves multiple HTTP API calls (submit + poll). This is inherently slower than a direct database connection like the TDS protocol used by the Data Warehouse adapter. Statements from different threads execute in parallel via [high-concurrency Livy](#high-concurrency-livy), significantly improving wall-clock time for multi-model runs.

### Polling overhead

Expand All @@ -147,7 +174,7 @@ Fabric applies rate limits to the Livy API. The adapter handles HTTP 429 respons

### Practical impact

A dbt run with many models will be significantly slower on FabricSpark than on Fabric Data Warehouse. This is inherent to the Livy API architecture, not a limitation of the adapter.
A dbt run with many models will be significantly slower on FabricSpark than on Fabric Data Warehouse. This is inherent to the Livy API architecture, not a limitation of the adapter. [High-concurrency Livy](#high-concurrency-livy) reduces this gap by running statements in parallel.

### Recommendations

Expand Down Expand Up @@ -194,7 +221,7 @@ See the [Python models guide](python-models.md) for writing and debugging Python
- **No Spark SQL views** -- only tables and materialized lake views (Fabric lake views) are supported.
- **No incremental merge strategy** -- the Spark SQL `MERGE` syntax in Fabric Lakehouse is not supported by the adapter. Use `append` or `insert_overwrite` instead.
- **API rate limiting** -- can slow down large runs with many models.
- **Session startup time** -- 1-5 minutes for the first statement in a run.
- **Session startup time** -- creating a new Spark session adds latency to the first statement in a run.
- **Data Warehouse-only features** -- [CLUSTER BY](cluster-by.md), [warehouse snapshots](warehouse-snapshots.md), and [catalog statistics](catalog-stats.md) are not available for Lakehouse.

---
Expand Down
2 changes: 1 addition & 1 deletion src/dbt/adapters/fabric/base_fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from dbt.adapters.contracts.connection import AdapterResponse
from dbt.adapters.events.logging import AdapterLogger
from dbt.adapters.fabric.fabric_livy_helper import FabricLivyHelper
from dbt.adapters.fabric.fabric_livy_session import LivySubmissionResult
from dbt.adapters.fabric.livy_result import LivySubmissionResult
from dbt.adapters.fabric.purview_sync import PurviewSync, extract_syncable_models
from dbt.adapters.sql.impl import SQLAdapter

Expand Down
162 changes: 58 additions & 104 deletions src/dbt/adapters/fabric/fabric_api_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import threading
import time
import urllib.parse
from typing import Any, Self
Expand All @@ -12,8 +11,6 @@

logger = logging.getLogger(__name__)

_livy_session_thread_lock = threading.Lock()


class FabricApiError(dbt_common.exceptions.DbtRuntimeError):
def __init__(self, method: str, url: str, status_code: int, response_text: str) -> None:
Expand All @@ -39,7 +36,6 @@ def __init__(
self._workspace_id: str | None = None
self._cached_warehouses: list[dict] | None = None
self._cached_lakehouses: list[dict] | None = None
self._livy_session_id: str | None = None
self._warehouse_snapshot_operations: dict[str, str] = {}

@classmethod
Expand Down Expand Up @@ -418,123 +414,81 @@ def get_livy_base_api_uri(self) -> str:
f"/lakehouses/{lakehouse_id}/livyapi/versions/{self._LIVY_API_VERSION}"
)

def get_existing_livy_session(self) -> str | None:
"""Find an active Livy session matching the configured name, or return None."""
url = self.get_livy_base_api_uri() + "/sessions"
response = self._api_get(url)
sessions = response.json().get("items", [])
for session in sessions:
if session["name"] == self._credentials.livy_session_name and session["livyState"] in (
"idle",
"starting",
"running",
"busy",
):
return session["id"]
return None

def initialize_livy_session(self) -> str:
"""Create a new Livy session and wait briefly for it to start."""
url = self.get_livy_base_api_uri() + "/sessions"
body = {"name": self._credentials.livy_session_name, "ttl": "30s"}

max_attempts = 3
backoff_seconds = 5
last_exception: Exception | None = None

for attempt in range(1, max_attempts + 1):
try:
response = self._api_post(url, body)
time.sleep(10)
return response.json()["id"]
except FabricApiError as e:
is_transient = e.status_code == 404 or 500 <= e.status_code < 600

if not is_transient or attempt == max_attempts:
raise

last_exception = e
wait_time = backoff_seconds * (2 ** (attempt - 1))
logger.warning(
f"Livy session creation returned a transient error "
f"(attempt {attempt}/{max_attempts}), retrying in {wait_time}s: {e}"
)
time.sleep(wait_time)
def acquire_hc_session(self, session_tag: str) -> dict[str, Any]:
"""POST /highConcurrencySessions to acquire an HC session (= one REPL).

assert last_exception is not None
raise last_exception

def get_livy_session_id(self) -> str:
"""Return the active Livy session ID, reusing an existing session or creating one.
Args:
session_tag: Deterministic tag so Fabric packs all REPLs from
the same process onto one underlying Livy session.

Thread-safe: uses a lock to prevent multiple sessions from being created
concurrently when dbt runs with multiple threads.
Returns:
The JSON response body containing at least ``id`` and ``state``.
"""
if self._livy_session_id is None:
with _livy_session_thread_lock:
self._livy_session_id = (
self.get_existing_livy_session() or self.initialize_livy_session()
)
return self._livy_session_id

def get_livy_session_base_uri(self) -> str:
"""Build the API URI for the current Livy session."""
return self.get_livy_base_api_uri() + f"/sessions/{self.get_livy_session_id()}"

def get_livy_session_state(self) -> str:
"""Query the current state of the Livy session (idle, busy, starting, etc.)."""
response = self._api_get(self.get_livy_session_base_uri())
return response.json().get("state", "unknown")
url = self.get_livy_base_api_uri() + "/highConcurrencySessions"
body: dict[str, Any] = {
"sessionTag": session_tag,
"name": self._credentials.livy_session_name,
}
response = self._api_post(url, body)
return response.json()

def get_livy_statement(self, statement_id: int) -> dict[str, Any]:
"""Fetch the current status and output of a Livy statement.
def get_hc_session(self, hc_id: str) -> dict[str, Any]:
"""Poll the state of an HC session.

Args:
statement_id: The statement ID returned by a submit call.
Returns:
JSON with ``state``, and when idle also ``sessionId`` and ``replId``.
"""
url = self.get_livy_session_base_uri() + f"/statements/{statement_id}"
url = self.get_livy_base_api_uri() + f"/highConcurrencySessions/{hc_id}"
response = self._api_get(url)
return response.json()

def submit_livy_python_statement(self, code: str) -> int:
"""Submit Python code to the Livy session and return the statement ID.
def submit_hc_sql_statement(self, livy_session_id: str, repl_id: str, code: str) -> int:
"""Submit a SQL statement via an HC REPL. Returns the statement ID."""
url = (
self.get_livy_base_api_uri()
+ f"/highConcurrencySessions/{livy_session_id}"
+ f"/repls/{repl_id}/statements"
)
response = self._api_post(url, {"code": code, "kind": "sql"})
return response.json()["id"]

Args:
code: The Python/PySpark code to execute.
"""
url = self.get_livy_session_base_uri() + "/statements"
def submit_hc_python_statement(self, livy_session_id: str, repl_id: str, code: str) -> int:
"""Submit a Python statement via an HC REPL. Returns the statement ID."""
url = (
self.get_livy_base_api_uri()
+ f"/highConcurrencySessions/{livy_session_id}"
+ f"/repls/{repl_id}/statements"
)
response = self._api_post(url, {"code": code, "kind": "pyspark"})
return response.json()["id"]

def submit_livy_sql_statement(self, code: str) -> int:
"""Submit SQL code to the Livy session and return the statement ID.
def get_hc_statement(
self, livy_session_id: str, repl_id: str, statement_id: int
) -> dict[str, Any]:
"""Fetch the status and output of an HC REPL statement."""
url = (
self.get_livy_base_api_uri()
+ f"/highConcurrencySessions/{livy_session_id}"
+ f"/repls/{repl_id}/statements/{statement_id}"
)
response = self._api_get(url)
return response.json()

Args:
code: The Spark SQL code to execute.
"""
url = self.get_livy_session_base_uri() + "/statements"
response = self._api_post(url, {"code": code, "kind": "sql"})
return response.json()["id"]
def cancel_hc_statement(self, livy_session_id: str, repl_id: str, statement_id: int) -> str:
"""Cancel a running HC REPL statement."""
url = (
self.get_livy_base_api_uri()
+ f"/highConcurrencySessions/{livy_session_id}"
+ f"/repls/{repl_id}/statements/{statement_id}/cancel"
)
response = self._api_post(url, {})
return response.json()["msg"]

def delete_livy_session(self) -> None:
"""Delete the current Livy session and clear the cached session ID."""
if self._livy_session_id is None:
return
session_id = self._livy_session_id
url = self.get_livy_base_api_uri() + f"/sessions/{session_id}"
def delete_hc_session(self, hc_id: str) -> None:
"""Release an HC session (REPL slot). Best-effort; ignores 404."""
url = self.get_livy_base_api_uri() + f"/highConcurrencySessions/{hc_id}"
try:
self._api_delete(url)
except FabricApiError as e:
if e.status_code != 404:
raise
self._livy_session_id = None

def cancel_livy_statement(self, statement_id: int) -> str:
"""Cancel a running Livy statement.

Args:
statement_id: The statement ID to cancel.
"""
url = self.get_livy_session_base_uri() + f"/statements/{statement_id}/cancel"
response = self._api_post(url, {})
return response.json()["msg"]
Loading