
feat: add high-concurrency Livy support for parallel statement execution #186

Merged: mdrakiburrahman merged 2 commits into main from dev/mdrrahman/concurrecy on May 17, 2026

@mdrakiburrahman (Collaborator) commented May 17, 2026

Implements #185

Why this change is needed

dbt-fabricspark today opens a single Livy session per process and submits every model's SQL via POST /sessions/{id}/statements. Even with Spark's FAIR scheduler enabled, every statement lands in the same default scheduling pool → FIFO inside the Spark scheduler → effectively serial execution when one query saturates executor cores. As @cheyney-w showed in #185, this means threads > 1 in dbt yields no real throughput gain for independent models.

Repro from the issue (https://github.com/cheyney-w/dbt-fabricspark-cross-workspace-demo) — 4 independent cross-join models on a fresh schema-enabled lakehouse with threads: 4:

  • Singleton mode (today's behaviour): model durations 149s / 179s / 195s / 210s. Completion order matches submission order — FIFO fingerprint. ~2.8× effective speedup, not 4×.
  • HC mode (this PR), 1st run: 442s total (208s spent on Spark cold-start), completion order randomised.
  • HC mode (this PR), 2nd run (warm Livy via deterministic sessionTag): 122s total — 3.6× speedup over the 1st run, completion order randomised.

The submission-order ≠ completion-order signal proves statements are now executing concurrently inside the Spark application rather than queued.
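The speedup figures quoted above can be re-derived from the timings in the repro. The sketch below is only a sanity check. It assumes the ~2.8× singleton figure compares four back-to-back runs of the fastest model (4 × 149s) against the 210s wall clock; the PR does not state how that number was computed, so treat that reading as an assumption.

```python
# Timings quoted in the repro above, in seconds.
singleton_durations = [149, 179, 195, 210]
hc_cold_total = 442
hc_warm_total = 122

# Assumption: the serial baseline is four back-to-back runs of the fastest model.
serial_estimate = len(singleton_durations) * min(singleton_durations)
singleton_speedup = serial_estimate / max(singleton_durations)

# Warm HC run relative to the cold first HC run.
warm_vs_cold = hc_cold_total / hc_warm_total

print(f"singleton: ~{singleton_speedup:.1f}x, warm HC vs cold HC: ~{warm_vs_cold:.1f}x")
```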

How

Adds Fabric's high-concurrency Livy API as the default backend, with a small interface refactor so the legacy single-session path stays available as an opt-out.

OOP/SOLID split

  • New src/dbt/adapters/fabricspark/livy_backend.py: LivyBackend ABC defining connect(creds) and disconnect().
  • src/dbt/adapters/fabricspark/singleton_livy.py: existing LivySession / LivyCursor / LivyConnection / LivySessionManager / LivySessionConnectionWrapper moved here verbatim; LivySessionManager now subclasses LivyBackend.
  • src/dbt/adapters/fabricspark/concurrent_livy.py: new HC backend (HighConcurrencySession / HighConcurrencyCursor / HighConcurrencyConnection / HighConcurrencySessionManager / HighConcurrencyConnectionWrapper).
  • src/dbt/adapters/fabricspark/livysession.py: slimmed to module-level helpers (auth, headers, lakehouse-property fetch, file I/O) plus re-exports of the singleton classes for test patches that use the old import path.
  • connections.py picks the backend based on the credentials and is otherwise unchanged. cleanup_all() now disconnects each per-thread manager before clearing the dict, so HC IDs are released promptly instead of waiting for Fabric's idle reaper.
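As a rough illustration of the split above, the interface could look like the sketch below. Only the LivyBackend name and its connect(creds) / disconnect() methods come from this PR; FakeBackend, the type hints, and the shape of the cleanup_all helper are hypothetical and exist only to show the disconnect-then-clear ordering.

```python
from abc import ABC, abstractmethod
from typing import Any


class LivyBackend(ABC):
    """Common surface that both Livy backends expose to connections.py."""

    @abstractmethod
    def connect(self, creds: Any) -> Any:
        """Return a connection-like object for the given credentials."""

    @abstractmethod
    def disconnect(self) -> None:
        """Release whatever server-side resources connect() acquired."""


class FakeBackend(LivyBackend):
    """Hypothetical in-memory backend, used here only to exercise the ABC."""

    def __init__(self) -> None:
        self.connected = False

    def connect(self, creds: Any) -> "FakeBackend":
        self.connected = True
        return self

    def disconnect(self) -> None:
        self.connected = False


def cleanup_all(managers: dict[int, LivyBackend]) -> None:
    # Disconnect every per-thread manager first, then clear the dict,
    # mirroring the ordering described for cleanup_all() above.
    for manager in managers.values():
        manager.disconnect()
    managers.clear()
```

Because both backends sit behind the same two methods, the caller does not need to know which one it holds when it tears connections down.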

HC lifecycle (per dbt thread)

POST   /livyapi/.../highConcurrencySessions          { sessionTag, conf, numExecutors, ... }
       → 202 { id: <hc_id>, state: NotStarted }
GET    /livyapi/.../highConcurrencySessions/{hc_id}  (poll)
       → state in {NotStarted, AcquiringHighConcurrencySession} → keep polling
       → state == Idle → { sessionId, replId }
POST   /livyapi/.../highConcurrencySessions/{sessionId}/repls/{replId}/statements   { code, kind: "sql" }
GET    .../statements/{stmtId}                                                       (poll)
       → state == available → return { output: { data: { application/json: { schema, data } } } }
DELETE /livyapi/.../highConcurrencySessions/{hc_id}                                  (on process exit / cleanup_all)
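
The acquire step of the lifecycle above amounts to a small polling loop. The sketch below is not the adapter's code: the fetch_state callable, the timeout values, and the choice to treat any non-pending state as terminal are assumptions, and HTTP is deliberately left out so the loop can be run on its own.

```python
import time
from typing import Callable

# States listed in the lifecycle above that mean "keep polling".
PENDING_STATES = {"NotStarted", "AcquiringHighConcurrencySession"}


def poll_until_idle(
    fetch_state: Callable[[], dict],
    interval_s: float = 1.0,
    timeout_s: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[str, str]:
    """Poll until the HC session is Idle, then return (sessionId, replId)."""
    deadline = time.monotonic() + timeout_s
    while True:
        body = fetch_state()
        state = body.get("state")
        if state == "Idle":
            return body["sessionId"], body["replId"]
        if state not in PENDING_STATES:
            # Anything else (for example Dead) is treated as terminal here.
            raise RuntimeError(f"HC session entered unexpected state: {state}")
        if time.monotonic() >= deadline:
            raise TimeoutError("HC session did not reach Idle in time")
        sleep(interval_s)
```

In a real client, fetch_state would wrap the GET on the HC session and return the parsed JSON body.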

The HC payload uses the exact same output.data.application/json.{schema,data} JSON envelope as singleton-mode for kind: sql (verified end-to-end against MainLakehouse), so the cursor's result-parsing path is identical.
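
A minimal sketch of reading that envelope is shown below. The parse_sql_output name is hypothetical, and the assumption that schema is a Spark StructType-style dict with a fields list is mine rather than something this PR spells out.

```python
from typing import Any


def parse_sql_output(statement: dict[str, Any]) -> tuple[list[str], list[list[Any]]]:
    """Extract (column_names, rows) from a finished kind: sql statement."""
    payload = statement["output"]["data"]["application/json"]
    # Assumption: schema is a Spark StructType dict with a "fields" list.
    fields = payload.get("schema", {}).get("fields", [])
    columns = [field["name"] for field in fields]
    rows = payload.get("data", [])
    return columns, rows
```

Since both backends return the same envelope, a helper of this shape could serve either cursor.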

Threading model

  • All dbt threads in one process share one deterministic sessionTag computed at module scope from (workspaceid, lakehouseid) when reuse_session: true, or a per-process uuid when false.
  • Fabric server-side packs HC sessions with the same tag onto one underlying Livy session = one Spark cluster = up to 5 REPLs (per MS Learn). Subsequent dbt invocations re-derive the same tag → Fabric snap-attaches new REPLs onto the still-warm underlying session, eliminating Spark cold-start.
  • A process-level _active_sessions registry plus a dedicated atexit handler DELETEs every acquired HC id on exit so REPL slots free up immediately.
  • Shortcut creation is gated by a process-level _shortcuts_done set keyed by (workspaceid, lakehouseid) so OneLake shortcuts are still created exactly once per process even when N threads each open their own HC session.
  • 404 on a statement submit/poll flags only that thread's HighConcurrencySession as dead (no global state mutation), so the next add_query retry transparently re-acquires just that REPL.
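
The tag derivation described in the first bullet could be sketched as follows. The derive_session_tag name appears in the unit tests for this PR, but the signature, the dbt- prefix, and the truncated SHA-256 digest are assumptions; the actual derivation may differ.

```python
import hashlib
import uuid

# Cached once per process so every thread sees the same tag when reuse is off.
_PROCESS_TAG = f"dbt-{uuid.uuid4().hex}"


def derive_session_tag(workspaceid: str, lakehouseid: str, reuse_session: bool) -> str:
    if not reuse_session:
        return _PROCESS_TAG
    # Assumption: a truncated SHA-256 digest; the real derivation may differ.
    digest = hashlib.sha256(f"{workspaceid}:{lakehouseid}".encode("utf-8")).hexdigest()
    return f"dbt-{digest[:16]}"
```

The property that matters is that a later dbt invocation against the same workspace and lakehouse produces the same tag, which is what lets Fabric attach new REPLs to the still-warm session.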

New config

profiles.yml:

high_concurrency: true   # default; set to false to fall back to legacy single-session mode

high_concurrency is silently ignored when livy_mode: local because the HC API is a Fabric-specific construct. Connection wiring is use_hc = creds.high_concurrency and not creds.is_local_mode.
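
The wiring rule above can be exercised in isolation with the sketch below. The Creds dataclass is a hypothetical stand-in for the adapter's credentials object, and "fabric" as the non-local livy_mode value is an assumption; only high_concurrency, livy_mode: local, is_local_mode, and the use_hc expression come from this PR.

```python
from dataclasses import dataclass


@dataclass
class Creds:
    """Hypothetical stand-in for the adapter's credentials object."""

    high_concurrency: bool = True
    livy_mode: str = "fabric"  # assumed name for the non-local mode

    @property
    def is_local_mode(self) -> bool:
        return self.livy_mode == "local"


def select_backend(creds: Creds) -> str:
    # Same expression as the connection wiring described above.
    use_hc = creds.high_concurrency and not creds.is_local_mode
    return "high_concurrency" if use_hc else "singleton"
```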

threads > 5 behaviour

Documented in the new README "High-concurrency Livy" section with a table covering what's shared (OneLake tables, catalog) vs not shared (temp views, session configs, UDFs, cached datasets) across multiple underlying Livy sessions when Fabric needs to spin up a second cluster.

Alternatives considered

  • Set a Spark scheduling pool (spark.scheduler.pool) per statement in singleton mode to get FAIR sharing across queries inside one session. This is less invasive, but the maintainer's preference (per the design discussion thread) was to adopt Fabric's purpose-built HC API rather than tune Spark scheduler internals.
  • Apache Livy session reuse via a livy-session-id.txt file (already used in the singleton path). This works for single-process scenarios but doesn't deliver parallel statement execution.

Test

CI suite run locally (npx nx run dbt-fabricspark:test --output-style=stream) — all 10 tasks pass:

[0] Pre-test workspace nuke (WS1 + WS2)                                 |   0:05 | Pass
[1] Provision no_schema lakehouse (WS1)                                 |   0:04 | Pass
[2] Provision with_schema lakehouse (WS1)                               |   0:03 | Pass
[3] Provision with_schema lakehouse (WS2 cross-workspace read source)   |   0:03 | Pass
[4] Seed cross_ws_fixture into WS2 lakehouse                            |   3:26 | Pass
[5] Create Livy session (no_schema)                                     |   3:17 | Pass
[6] Create Livy session (with_schema)                                   |   3:20 | Pass
[7] Functional tests (no_schema)                                        |  16:50 | Pass
[8] Functional tests (with_schema, includes cross-workspace)            |  19:25 | Pass
[9] Post-test workspace nuke (WS1 + WS2)                                |   0:05 | Pass

Unit suite:              233 passed, 11 skipped (61s)
Local-e2e (jaffle-shop): All dbt commands completed successfully
Lint + build:            Pass

Note on the functional test profile

tests/functional/conftest.py and tests/functional/fixtures/ws2_seed/profiles.yml opt out of HC (high_concurrency: false) even though the user-facing default is true. Reason — MS Learn explicitly warns:

"The sessionTag is a hint for packing. It isn't a strict lock. Rapid concurrent POST requests with the same sessionTag might create multiple Livy sessions."

pytest-xdist spawns many worker processes that all POST /highConcurrencySessions near-simultaneously. On the first attempt with HC defaulted on for CI I observed 31 underlying Livy sessions per lakehouse trying to start in parallel, overwhelming the F64 capacity. The existing test orchestrator's pre-warm pattern (writes singleton Livy session IDs to livy-session-no_schema.{0..3}.txt for xdist workers to attach to by id) has no HC equivalent because the HC API only accepts a packing hint, not a "use this specific session id" parameter.

Re-enabling HC for the functional suite is feasible follow-up work; it would require teaching the orchestrator to pre-warm a single HC session per (lakehouse, sessionTag) before the xdist fleet starts.

New unit tests

tests/unit/test_concurrent_livy.py (17 cases) covers the HC lifecycle with mocked HTTP:

  • derive_session_tag — deterministic when reuse_session: true, cached uuid per process when false, and distinct per lakehouse.
  • HighConcurrencySession.acquire: happy path through NotStarted → AcquiringHighConcurrencySession → Idle, terminal Dead raises, 404-then-success retry.
  • HighConcurrencyCursor.execute — SELECT returns rows + schema, DDL returns empty result set, statement error raises, 404 on submit marks REPL stale for re-acquire.
  • HighConcurrencySession.delete clears state and removes from the active-session registry.
  • HighConcurrencySessionManager satisfies the LivyBackend ABC, reuses a healthy session across connect() calls, deletes the HC id on disconnect().
  • HighConcurrencyConnectionWrapper delegates correctly and strips trailing semicolons.
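
The 404 behaviour these tests exercise can be pictured with the sketch below. Every name in it (StaleReplError, Session, run_with_retry) is hypothetical and the transport is a plain callable rather than mocked HTTP; it only shows a 404 marking one session dead and the retry re-acquiring that same session.

```python
class StaleReplError(Exception):
    """Hypothetical error signalling that this thread's REPL is gone."""


class Session:
    """Hypothetical per-thread session holding only its own liveness flag."""

    def __init__(self, submit):
        self._submit = submit  # callable(code) -> (status_code, body)
        self.dead = False
        self.acquire_count = 0

    def acquire(self):
        self.acquire_count += 1
        self.dead = False

    def execute(self, code):
        # Acquire on first use, or re-acquire after a previous 404.
        if self.dead or self.acquire_count == 0:
            self.acquire()
        status, body = self._submit(code)
        if status == 404:
            # Flag only this session; no shared or global state is touched.
            self.dead = True
            raise StaleReplError(code)
        return body


def run_with_retry(session, code, attempts=2):
    for attempt in range(attempts):
        try:
            return session.execute(code)
        except StaleReplError:
            if attempt == attempts - 1:
                raise
```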

mdrakiburrahman and others added 2 commits May 17, 2026 00:31
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@mdrakiburrahman mdrakiburrahman merged commit 7ed9ec2 into main May 17, 2026
2 checks passed
@mdrakiburrahman mdrakiburrahman deleted the dev/mdrrahman/concurrecy branch May 17, 2026 01:39
sdebruyn added a commit to sdebruyn/dbt-fabric that referenced this pull request May 17, 2026

* Add high-concurrency Livy support for parallel statement execution (#231)

Each dbt thread acquires its own REPL inside a shared underlying Livy
session via Fabric's HC Livy API, enabling true parallel execution
instead of FIFO queuing. Default on via `high_concurrency: true`.

Upstream contribution: microsoft/dbt-fabricspark#186

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Remove non-HC singleton path, make high-concurrency Livy the only mode

HC mode passed all targeted integration tests (basic, validate connection,
concurrency). There is no reason to keep the singleton fallback — HC is
strictly better (parallel execution, warm session reuse). This removes the
`high_concurrency` config flag and simplifies all FabricSpark connection
code to use `HighConcurrencyLivySession` exclusively.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Address PR review: fix cursor cancel test, align transient error handling

- Fix unit test asserting old cancel path (cancel_livy_statement →
  cancel_statement)
- Handle requests transport exceptions (ConnectionError, Timeout,
  ChunkedEncodingError, JSONDecodeError) in HC session acquire and poll,
  matching the resilience of the singleton LivySession
- Remove unused _ACQUIRING_STATES constant

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clean up review findings: rename params, simplify error handling, fix docs

- Rename session_id → livy_session_id in HC API client methods to
  clarify these take the underlying Livy session ID, not the HC session ID
- Merge duplicate TimeoutError/Exception handlers in
  wait_and_get_statement_result (TimeoutError is a subclass of Exception)
- Remove stale "singleton Livy sessions" reference from comparison doc

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Remove specific Spark startup timing from lakehouse docs

The "1-5 minutes" claim was inaccurate — startup can sometimes
take just a few seconds. Replaced with generic phrasing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Switch FabricLivyHelper to HC Livy and remove old LivySession

The Fabric DW adapter's Python model execution now uses
HighConcurrencyLivySession instead of the old LivySession class.
This removes the last consumer of the legacy Livy session API,
so LivySession and all non-HC Livy methods in FabricApiClient
are deleted.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Address PR review comments: thread safety, session cleanup, diagram paths

- FabricLivyHelper: use thread-local storage instead of class-level
  singleton so each thread gets its own HC REPL
- HighConcurrencyLivySession: best-effort delete of HC session when
  _poll_until_idle fails or when re-acquiring after staleness
- Mermaid diagram: update API paths to match actual HC endpoints

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Clarify HC session docstring: close() only frees this REPL slot

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Rename fabric_livy_session.py to livy_result.py, remove dead tests

The file only contains dataclasses (LivySessionResult,
LivySubmissionResult) — the old name was misleading. Also removes
unit tests for the deleted LivySession class and legacy
FabricApiClient session management methods.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Add unit tests for HighConcurrencyLivySession

29 tests covering: session tag derivation, logs URL, acquire with
retry/cleanup, polling (idle/timeout/fatal/transient), ensure-repl
re-acquire, SQL/Python statement dispatch, 404 dead-marking,
statement result parsing, close/cancel, and error resilience.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Force JVM GC after synapsesql write to release JDBC locks

The synapsesql connector keeps JDBC connections to the Data Warehouse
open after df.write completes. These idle connections hold schema-level
locks (LCK_M_SCH_M) that block subsequent DDL in the same schema.

The GC must run as a separate Livy statement (fire-and-forget) after the
model code finishes, because running it in the same statement leaves the
JDBC objects in scope where GC cannot collect them.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Revert "Force JVM GC after synapsesql write to release JDBC locks"

This reverts commit 3444723.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
sdebruyn added a commit to sdebruyn/dbt-fabric that referenced this pull request May 17, 2026

Each dbt thread acquires its own REPL inside a shared underlying Livy
session via Fabric's HC Livy API, enabling true parallel execution
instead of FIFO queuing. Default on via `high_concurrency: true`.

Upstream contribution: microsoft/dbt-fabricspark#186

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Development

Successfully merging this pull request may close these issues.

[Feature] Use Livy high concurrency (HC) to support parallel model execution

1 participant