Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ uv run pytest tests/unit/test_adapter.py::TestSparkAdapter::test_profile_with_da
- **`fail-on-untracked-files` will fail CI if you leave a regenerated file uncommitted** — this catches stale `uv.lock` after a dep change. Always commit lockfile updates that result from your edits.
- **Devcontainer image is pinned by digest** in `.devcontainer/devcontainer.json` and `.github/workflows/ci.yml`. Don't hand-edit these; rebuild via the flow in `.devcontainer/README.md`.

## Comments in code

- Do **NOT** add flowery comments all over the code. The code is self-descriptive; **ONLY** add comments when the code is doing something unorthodox.
- Do **NOT** refer to GitHub issues in the actual code, even indirectly. The codebase has nothing to do with GitHub; it is self-contained.

## Do not assume — ask first

Adapter behavior is subtle (schema-enabled detection, cross-lakehouse routing, session reuse, MLV semantics, Livy retry policy). Before making non-trivial changes:
Expand Down
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ on:
pull_request:
workflow_dispatch:

concurrency:
group: ci-${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

env:
IS_GH_ACTION: "1"
CLIENT_ID: ${{ secrets.APP_ID }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@

{% materialization materialized_lake_view, adapter='fabricspark' -%}
{%- set identifier = model['alias'] -%}
{%- set workspace_name = config.get('workspace_name') -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(
identifier=identifier,
schema=schema,
database=database,
type='materialized_view') -%}
type='materialized_view',
workspace=workspace_name) -%}

{#-- Config --#}
{%- set partitioned_by = config.get('partitioned_by', none) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,33 @@
{% materialization view, adapter='fabricspark' -%}
  {#-- Creates the model as a view; `workspace_name` from the model config is attached to the target relation. --#}
  {%- set identifier = model['alias'] -%}
  {%- set grant_config = config.get('grants') -%}
  {%- set workspace_name = config.get('workspace_name') -%}

  {#-- Looked up before the view is created: drives the existing-table branch and the grant revocation decision below. --#}
  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

  {%- set target_relation = api.Relation.create(
      identifier=identifier, schema=schema, database=database,
      type='view', workspace=workspace_name) -%}

  {#-- Ensure the database/schema exists before creating the view --#}
  {% do ensure_database_exists(schema, database=database, workspace=workspace_name) %}

  {{ run_hooks(pre_hooks) }}

  {#-- A table already occupying the target name is delegated to fabricspark__handle_existing_table. --#}
  {%- if old_relation is not none and old_relation.is_table -%}
    {{ fabricspark__handle_existing_table(should_full_refresh(), old_relation) }}
  {%- endif -%}

  {% call statement('main') -%}
    {{ get_create_view_as_sql(target_relation, sql) }}
  {%- endcall %}

  {% set should_revoke = should_revoke(exists_as_view, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {{ run_hooks(post_hooks) }}

  {{ return({'relations': [target_relation]}) }}
{%- endmaterialization %}

{% macro fabricspark__handle_existing_table(full_refresh, old_relation) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=target_relation.schema,
database=target_relation.database,
workspace=target_relation.workspace,
type='view') -%}

{% set select = snapshot_staging_table(strategy, sql, target_relation) %}
Expand Down Expand Up @@ -83,13 +84,18 @@
{%- set unique_key = config.get('unique_key') %}
{%- set file_format = config.get('file_format') or 'delta' -%}
{%- set grant_config = config.get('grants') -%}
{%- set workspace_name = config.get('workspace_name') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if workspace_name -%}
{% set target_relation = target_relation.incorporate(workspace=workspace_name) %}
{%- endif -%}

{%- if file_format not in ['delta'] -%}
{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Expand Down
252 changes: 252 additions & 0 deletions tests/functional/adapter/test_cross_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,255 @@ def test_full_refresh_resets_table(self, project, ws2_workspace_name, ws2_lakeho
"expected 4 rows after cross-workspace --full-refresh "
f"(reset to first-run body), got {n}"
)


# ---------------------------------------------------------------------------
# Positive: cross-workspace WRITE via VIEW materialization
# ---------------------------------------------------------------------------


# Model body for the cross-workspace view test: materialized as a view, with
# workspace, database and schema all taken from project vars, selecting four
# literal rows (id, name, price).
CROSS_WS_WRITE_VIEW_SQL = """
{{ config(
materialized='view',
workspace_name=var('ws2_workspace_name'),
database=var('ws2_lakehouse_name'),
schema=var('ws2_write_schema')
) }}

select 1 as id, 'alpha' as name, cast(10.5 as double) as price
union all
select 2 as id, 'beta' as name, cast(20.0 as double) as price
union all
select 3 as id, 'gamma' as name, cast(30.25 as double) as price
union all
select 4 as id, 'delta' as name, cast(40.75 as double) as price
"""


class TestCrossWorkspace4PartWriteView:
    """Exercise a ``view`` model whose target is configured in the WS2 workspace.

    Covers the relation name produced by ``dbt compile``, a ``dbt run`` followed
    by a query of the resulting view, and a repeated ``dbt run``.
    """

    @pytest.fixture(scope="class", autouse=True)
    def _skip_unless_schema_enabled(self, is_schema_enabled):
        """Skip every test in this class when the lakehouse is not schema-enabled."""
        if is_schema_enabled:
            return
        pytest.skip(
            "Cross-workspace 4-part naming is only supported on schema-enabled "
            "lakehouses (Fabric Livy limitation)."
        )

    @pytest.fixture(scope="class")
    def project_config_update(self, ws2_workspace_name, ws2_lakehouse_name):
        """Project name plus the vars consumed by the model's ``config()`` call."""
        project_vars = {
            "ws2_workspace_name": ws2_workspace_name,
            "ws2_lakehouse_name": ws2_lakehouse_name,
            "ws2_write_schema": _WS2_WRITE_SCHEMA,
        }
        return {"name": "cross_workspace_4part_write_view", "vars": project_vars}

    @pytest.fixture(scope="class")
    def models(self):
        """The single view model under test."""
        return {"cross_ws_write_view.sql": CROSS_WS_WRITE_VIEW_SQL}

    def _view_fqn(self, ws2_workspace_name, ws2_lakehouse_name):
        """Return the 4-part name the tests use to query the view."""
        return (
            f"`{ws2_workspace_name}`.`{ws2_lakehouse_name}`."
            f"`{_WS2_WRITE_SCHEMA}`.cross_ws_write_view"
        )

    def test_cross_workspace_view_renders_four_part(
        self, project, ws2_workspace_name, ws2_lakehouse_name
    ):
        """The compiled node's relation name contains workspace, lakehouse and schema."""
        results = run_dbt(["compile", "--select", "cross_ws_write_view"])
        assert len(results) == 1

        rendered = results[0].node.relation_name or ""
        expectations = (
            (
                ws2_workspace_name,
                f"expected WS2 name in rendered relation, got: {rendered}",
            ),
            (
                ws2_lakehouse_name,
                f"expected WS2 lakehouse in rendered relation, got: {rendered}",
            ),
            (
                _WS2_WRITE_SCHEMA,
                f"expected WS2 write schema in rendered relation, got: {rendered}",
            ),
        )
        for part, failure_message in expectations:
            assert f"`{part}`" in rendered, failure_message

    def test_cross_workspace_view_executes(
        self,
        project,
        ws2_workspace_name,
        ws2_lakehouse_name,
    ):
        """After ``dbt run`` the view yields the four rows with the expected price sum."""
        results = run_dbt(["run", "--select", "cross_ws_write_view"])
        assert len(results) == 1, f"expected 1 run result, got {len(results)}"

        view_fqn = self._view_fqn(ws2_workspace_name, ws2_lakehouse_name)
        rows = project.run_sql(
            f"select count(*) as n, sum(price) as total from {view_fqn}",
            fetch="all",
        )
        assert len(rows) == 1

        row_count, price_total = rows[0]
        assert int(row_count) == 4, (
            f"expected 4 rows in WS2 cross-workspace view, got {row_count}"
        )
        expected_total = 10.5 + 20.0 + 30.25 + 40.75
        assert abs(float(price_total) - expected_total) < 1e-6, (
            f"price sum mismatch in WS2 view: {price_total}"
        )

    def test_cross_workspace_view_is_idempotent(
        self,
        project,
        ws2_workspace_name,
        ws2_lakehouse_name,
    ):
        """Running the model again still leaves a view that resolves to four rows."""
        results = run_dbt(["run", "--select", "cross_ws_write_view"])
        assert len(results) == 1

        view_fqn = self._view_fqn(ws2_workspace_name, ws2_lakehouse_name)
        rows = project.run_sql(f"select count(*) as n from {view_fqn}", fetch="all")
        row_count = int(rows[0][0])
        assert row_count == 4, (
            "expected the WS2 cross-workspace view to still resolve to 4 rows "
            "after an idempotent re-run"
        )


# ---------------------------------------------------------------------------
# Positive: cross-workspace WRITE via SNAPSHOT materialization
# ---------------------------------------------------------------------------


# Source model for the snapshot test: a delta table of four literal rows. The
# name of row id=4 embeds the `snap_source_bump` var (default 0), so passing a
# different value changes that row between runs.
CROSS_WS_SNAPSHOT_SOURCE_SQL = """
{{ config(materialized='table', file_format='delta') }}

{% set bump = var('snap_source_bump', 0) %}

select 1 as id, 'alpha' as name, cast(10.5 as double) as price union all
select 2 as id, 'beta' as name, cast(20.0 as double) as price union all
select 3 as id, 'gamma' as name, cast(30.25 as double) as price union all
select 4 as id, 'delta_v{{ bump }}' as name, cast(40.75 as double) as price
"""


# Snapshot under test: `check` strategy on the `name` column keyed by `id`, with
# workspace, target database and target schema taken from project vars; it
# reads from the `cross_ws_snapshot_source` model.
CROSS_WS_SNAPSHOT_SQL = """
{% snapshot cross_ws_write_snapshot %}
{{ config(
strategy='check',
unique_key='id',
check_cols=['name'],
file_format='delta',
workspace_name=var('ws2_workspace_name'),
target_database=var('ws2_lakehouse_name'),
target_schema=var('ws2_write_schema')
) }}
select * from {{ ref('cross_ws_snapshot_source') }}
{% endsnapshot %}
"""


class TestCrossWorkspace4PartWriteSnapshot:
    """Exercise a snapshot whose target is configured in the WS2 workspace.

    The tests run in definition order: the second-run test changes the source
    row for id=4 and expects the snapshot created by the first-run test to
    already exist.
    """

    @pytest.fixture(scope="class", autouse=True)
    def _skip_unless_schema_enabled(self, is_schema_enabled):
        """Skip every test in this class when the lakehouse is not schema-enabled."""
        if is_schema_enabled:
            return
        pytest.skip(
            "Cross-workspace 4-part naming is only supported on schema-enabled "
            "lakehouses (Fabric Livy limitation)."
        )

    @pytest.fixture(scope="class")
    def project_config_update(self, ws2_workspace_name, ws2_lakehouse_name):
        """Project name plus the vars consumed by the snapshot's ``config()`` call."""
        project_vars = {
            "ws2_workspace_name": ws2_workspace_name,
            "ws2_lakehouse_name": ws2_lakehouse_name,
            "ws2_write_schema": _WS2_WRITE_SCHEMA,
        }
        return {"name": "cross_workspace_4part_write_snapshot", "vars": project_vars}

    @pytest.fixture(scope="class")
    def models(self):
        """The source model the snapshot selects from."""
        return {"cross_ws_snapshot_source.sql": CROSS_WS_SNAPSHOT_SOURCE_SQL}

    @pytest.fixture(scope="class")
    def snapshots(self):
        """The snapshot under test."""
        return {"cross_ws_write_snapshot.sql": CROSS_WS_SNAPSHOT_SQL}

    def _snapshot_fqn(self, ws2_workspace_name, ws2_lakehouse_name):
        """Return the 4-part name the tests use to query the snapshot table."""
        return (
            f"`{ws2_workspace_name}`.`{ws2_lakehouse_name}`."
            f"`{_WS2_WRITE_SCHEMA}`.cross_ws_write_snapshot"
        )

    def _count_rows(self, project, ws2_workspace_name, ws2_lakehouse_name) -> int:
        """Return ``count(*)`` of the snapshot table."""
        snapshot_fqn = self._snapshot_fqn(ws2_workspace_name, ws2_lakehouse_name)
        rows = project.run_sql(f"select count(*) as n from {snapshot_fqn}", fetch="all")
        first_row = rows[0]
        return int(first_row[0])

    def test_snapshot_renders_four_part(self, project, ws2_workspace_name, ws2_lakehouse_name):
        """The compiled snapshot's relation name contains workspace, lakehouse and schema."""
        compiled = run_dbt(["compile", "--select", "cross_ws_write_snapshot"])
        matching = []
        for result in compiled:
            if result.node.name == "cross_ws_write_snapshot":
                matching.append(result)
        assert len(matching) == 1, (
            f"expected exactly 1 snapshot compile result, got {len(matching)}"
        )

        rendered = matching[0].node.relation_name or ""
        expectations = (
            (
                ws2_workspace_name,
                f"expected WS2 name in rendered relation, got: {rendered}",
            ),
            (
                ws2_lakehouse_name,
                f"expected WS2 lakehouse in rendered relation, got: {rendered}",
            ),
            (
                _WS2_WRITE_SCHEMA,
                f"expected WS2 write schema in rendered relation, got: {rendered}",
            ),
        )
        for part, failure_message in expectations:
            assert f"`{part}`" in rendered, failure_message

    def test_snapshot_first_run_ctas_into_ws2(
        self, project, ws2_workspace_name, ws2_lakehouse_name
    ):
        """Build the source, take the first snapshot, and expect four rows."""
        source_results = run_dbt(["run", "--select", "cross_ws_snapshot_source"])
        assert len(source_results) == 1

        snapshot_results = run_dbt(["snapshot", "--select", "cross_ws_write_snapshot"])
        assert len(snapshot_results) == 1

        row_count = self._count_rows(project, ws2_workspace_name, ws2_lakehouse_name)
        assert row_count == 4, (
            f"expected 4 rows after initial cross-workspace snapshot CTAS, got {row_count}"
        )

    def test_snapshot_second_run_merges_into_ws2(
        self, project, ws2_workspace_name, ws2_lakehouse_name
    ):
        """Rebuild the source with a changed id=4 row, snapshot again, check the history."""
        rebuild_source_args = [
            "run",
            "--select",
            "cross_ws_snapshot_source",
            "--vars",
            "{snap_source_bump: 1}",
        ]
        source_results = run_dbt(rebuild_source_args)
        assert len(source_results) == 1

        snapshot_results = run_dbt(["snapshot", "--select", "cross_ws_write_snapshot"])
        assert len(snapshot_results) == 1

        row_count = self._count_rows(project, ws2_workspace_name, ws2_lakehouse_name)
        assert row_count == 5, (
            "expected 5 SCD2 rows in WS2 after MERGE INTO "
            f"(4 original incl. 1 closed-out + 1 new current), got {row_count}"
        )

        snapshot_fqn = self._snapshot_fqn(ws2_workspace_name, ws2_lakehouse_name)
        history_sql = (
            "select dbt_valid_to is null as is_current, count(*) as n "
            f"from {snapshot_fqn} "
            "where id = 4 group by dbt_valid_to is null"
        )
        # Maps is_current -> number of id=4 rows in that state.
        counts_by_state = {}
        for row in project.run_sql(history_sql, fetch="all"):
            counts_by_state[bool(row[0])] = int(row[1])

        assert counts_by_state.get(True) == 1, (
            f"expected exactly 1 current row for id=4 in WS2 snapshot, got {counts_by_state}"
        )
        assert counts_by_state.get(False) == 1, (
            f"expected exactly 1 historical (closed-out) row for id=4 in WS2 snapshot, "
            f"got {counts_by_state}"
        )