Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ As a result of this conscious design decision, the adapter **does not** encourag

## How it works

SS files live on ADLS Gen1. The adapter lists files under `source_root`, filters by regex and watermark, and processes them in batches of up to `max_files_per_trigger`. Each batch becomes a single SCOPE job with an explicit file list in the `EXTRACT FROM` clause. After a successful job, the watermark advances, a sources record is written to `_checkpoint/`, and the next batch is discovered — repeating until all files are processed.
SS files live on ADLS Gen1. The adapter lists files under each `source_roots` entry, filters by each regex in `source_patterns` (cross-product), deduplicates by path, and processes them in batches of up to `max_files_per_trigger`. Each batch becomes a single SCOPE job with an explicit file list in the `EXTRACT FROM` clause. After a successful job, the watermark advances, a sources record is written to `_checkpoint/`, and the next batch is discovered — repeating until all files are processed.

### How dbt picks which files to process

Expand Down Expand Up @@ -176,8 +176,8 @@ my_project:
{{ config(
materialized='table',
delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_table',
source_root='/my/cosmos/path/to/MyStream',
source_pattern='.*\\.ss$',
source_roots=['/my/cosmos/path/to/MyStream'],
source_patterns=['.*\\.ss$'],
max_files_per_trigger=100,
partition_by='event_year_date',
scope_columns=[
Expand Down Expand Up @@ -205,8 +205,8 @@ FROM @data
incremental_strategy='append',
partition_by='event_year_date',
delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_model',
source_root='/my/cosmos/path/to/MyStream',
source_pattern='.*\\.ss$',
source_roots=['/my/cosmos/path/to/MyStream'],
source_patterns=['.*\\.ss$'],
max_files_per_trigger=50,
source_compaction_interval=10,
source_retention_files=100,
Expand Down Expand Up @@ -239,8 +239,8 @@ Models can include `WHERE` clauses — the adapter passes through your SQL as-is
incremental_strategy='append',
partition_by=['event_year_date', 'edition'],
delta_location='abfss://ctr@acct.dfs.core.windows.net/delta/my_filtered_model',
source_root='/my/cosmos/path/to/MyStream',
source_pattern='.*\\.ss$',
source_roots=['/my/cosmos/path/to/MyStream'],
source_patterns=['.*\\.ss$'],
max_files_per_trigger=50,
scope_columns=[
{'name': 'server_name', 'type': 'string'},
Expand All @@ -260,8 +260,8 @@ WHERE edition == "Standard"

| Config | Default | Description |
| ---------------------------- | --------- | ----------------------------------------------------------------------------------- |
| `source_root` | — | ADLS Gen1 root path to list source files from |
| `source_pattern` | `.*\.ss$` | Regex to filter files under `source_root` |
| `source_roots` | `[]` | List of ADLS Gen1 root paths to list source files from |
| `source_patterns`           | `['.*\.ss$']` | List of regexes; the adapter discovers files for each root × pattern combo |
| `max_files_per_trigger` | `50` | Max files per SCOPE job. Larger = fewer jobs; smaller = faster feedback |
| `safety_buffer_seconds` | `30` | Skip files modified within the last N seconds (avoids partial writes) |
| `source_compaction_interval` | `10` | Every N batches, write a parquet snapshot of all source history |
Expand Down
84 changes: 54 additions & 30 deletions dbt/adapters/scope/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dbt.adapters.base import BaseAdapter, available
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.scope.adls_gen1_client import AdlsGen1Client
from dbt.adapters.scope.adls_gen1_client import AdlsGen1Client, FileInfo
from dbt.adapters.scope.checkpoint import CheckpointManager
from dbt.adapters.scope.column import ScopeColumn
from dbt.adapters.scope.connections import ScopeConnectionHandle, ScopeConnectionManager
Expand Down Expand Up @@ -183,35 +183,47 @@ def set_next_job_name(self, name: str) -> None:
@available
def discover_files(
self,
source_root: str,
source_pattern: str,
source_roots: list[str],
source_patterns: list[str],
max_files_per_trigger: int,
delta_location: str,
safety_buffer_seconds: int = 30,
) -> list[str]:
"""Discover unprocessed source files and return a batch of file paths.

Orchestrates the file-based processing loop:
1. Read watermark from ``_checkpoint/watermark.json``
2. LIST files on ADLS Gen1 under *source_root*
3. Filter by regex *source_pattern* and watermark
4. Return up to *max_files_per_trigger* file paths
Orchestrates the file-based processing loop across the cross-product
of *source_roots* x *source_patterns*:
1. For each (root, pattern): read watermark, LIST + filter files
2. Union results and deduplicate by file path
3. Return up to *max_files_per_trigger* file paths
"""
tracker = self._get_file_tracker()
watermark = self._get_checkpoint_manager().read_watermark(delta_location)

all_unprocessed = tracker.discover_unprocessed_files(
root=source_root,
pattern=source_pattern,
watermark=watermark,
safety_buffer_seconds=safety_buffer_seconds,
)
seen_paths: set[str] = set()
all_unprocessed: list[FileInfo] = []

for root in source_roots:
for pattern in source_patterns:
unprocessed = tracker.discover_unprocessed_files(
root=root,
pattern=pattern,
watermark=watermark,
safety_buffer_seconds=safety_buffer_seconds,
)
for f in unprocessed:
if f.path not in seen_paths:
seen_paths.add(f.path)
all_unprocessed.append(f)

# Sort by modification_time to maintain deterministic ordering
all_unprocessed.sort(key=lambda f: f.modification_time)
batch = FileTracker.get_next_batch(all_unprocessed, max_files_per_trigger)

log.info(
"discover_files: root=%s, pattern=%s, unprocessed=%d, batch=%d",
source_root,
source_pattern,
"discover_files: roots=%s, patterns=%s, unprocessed=%d, batch=%d",
source_roots,
source_patterns,
len(all_unprocessed),
len(batch),
)
Expand All @@ -221,26 +233,38 @@ def discover_files(
def update_checkpoint(
self,
delta_location: str,
source_root: str,
source_pattern: str,
source_roots: list[str],
source_patterns: list[str],
file_paths: list[str],
source_compaction_interval: int = 10,
source_retention_files: int = 100,
) -> None:
"""Update the watermark checkpoint after a successful SCOPE job.

Also writes per-batch JSONL to ``_checkpoint/sources/{batch_id}``,
triggers compaction at interval boundaries, and enforces retention.
Iterates the cross-product of *source_roots* x *source_patterns* to
reconstruct ``FileInfo`` objects for the processed files, then writes
the checkpoint. Also writes per-batch JSONL to
``_checkpoint/sources/{batch_id}``, triggers compaction at interval
boundaries, and enforces retention.
"""
gen1 = self._get_gen1_client()
checkpoint = self._get_checkpoint_manager()

# Get current watermark
current = checkpoint.read_watermark(delta_location)

# Reconstruct FileInfo objects for the processed files
all_files = gen1.list_files(source_root, pattern=source_pattern)
processed = [f for f in all_files if f.path in set(file_paths)]
        # Reconstruct FileInfo objects across all root × pattern combos
target_paths = set(file_paths)
seen_paths: set[str] = set()
processed: list[FileInfo] = []

for root in source_roots:
for pattern in source_patterns:
all_files = gen1.list_files(root, pattern=pattern)
for f in all_files:
if f.path in target_paths and f.path not in seen_paths:
seen_paths.add(f.path)
processed.append(f)

if not processed:
log.warning("update_checkpoint: no matching files found for paths")
Expand Down Expand Up @@ -274,15 +298,15 @@ def delete_checkpoint(self, delta_location: str) -> None:
@available
def has_unprocessed_files(
self,
source_root: str,
source_pattern: str,
source_roots: list[str],
source_patterns: list[str],
delta_location: str,
safety_buffer_seconds: int = 30,
) -> bool:
"""Are there unprocessed files at the source?"""
files = self.discover_files(
source_root=source_root,
source_pattern=source_pattern,
source_roots=source_roots,
source_patterns=source_patterns,
max_files_per_trigger=1,
delta_location=delta_location,
safety_buffer_seconds=safety_buffer_seconds,
Expand Down Expand Up @@ -336,8 +360,8 @@ def build_script_config(self, model_config: dict[str, Any], table_name: str) ->
delta_base_path=creds.delta_base_path,
table_name=table_name,
partition_by=model_config.get("partition_by"),
source_root=model_config.get("source_root", ""),
source_pattern=model_config.get("source_pattern", ""),
source_roots=model_config.get("source_roots", []),
source_patterns=model_config.get("source_patterns", []),
max_files_per_trigger=model_config.get("max_files_per_trigger", 50),
safety_buffer_seconds=model_config.get("safety_buffer_seconds", 30),
adls_gen1_account=model_config.get("adls_gen1_account", creds.adls_gen1_account),
Expand Down
6 changes: 3 additions & 3 deletions dbt/adapters/scope/script_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ class ScriptConfig:
# Partitioning — single column name or list of column names
partition_by: str | list[str] | None = None

# File-based source configuration
source_root: str = ""
source_pattern: str = ""
# File-based source configuration (cross-product of roots x patterns)
source_roots: list[str] = field(default_factory=list)
source_patterns: list[str] = field(default_factory=list)
max_files_per_trigger: int = 50
safety_buffer_seconds: int = 30
adls_gen1_account: str = ""
Expand Down
10 changes: 5 additions & 5 deletions dbt/include/scope/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@

{# -- Pull config values -- #}
{%- set delta_location = config.get('delta_location', '') -%}
{%- set source_root = config.get('source_root', '') -%}
{%- set source_pattern = config.get('source_pattern', '.*\\.ss$') -%}
{%- set source_roots = config.get('source_roots', []) -%}
{%- set source_patterns = config.get('source_patterns', ['.*\\.ss$']) -%}
{%- set max_files_per_trigger = config.get('max_files_per_trigger', 50) | int -%}
{%- set safety_buffer_seconds = config.get('safety_buffer_seconds', 30) | int -%}
{%- set source_compaction_interval = config.get('source_compaction_interval', 10) | int -%}
Expand All @@ -54,7 +54,7 @@
batch_num=0,
total_files=0,
file_batch=adapter.discover_files(
source_root, source_pattern, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
)
) -%}

Expand Down Expand Up @@ -96,11 +96,11 @@
{{ scope_script }}
{%- endcall -%}

{% do adapter.update_checkpoint(delta_location, source_root, source_pattern, ns.file_batch, source_compaction_interval, source_retention_files) %}
{% do adapter.update_checkpoint(delta_location, source_roots, source_patterns, ns.file_batch, source_compaction_interval, source_retention_files) %}

{# -- Discover next batch (watermark advanced, so new files are eligible) -- #}
{%- set ns.file_batch = adapter.discover_files(
source_root, source_pattern, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
) -%}
{%- endif -%}
{%- endfor -%}
Expand Down
10 changes: 5 additions & 5 deletions dbt/include/scope/macros/materializations/table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

{# -- Pull config values -- #}
{%- set delta_location = config.get('delta_location', '') -%}
{%- set source_root = config.get('source_root', '') -%}
{%- set source_pattern = config.get('source_pattern', '.*\\.ss$') -%}
{%- set source_roots = config.get('source_roots', []) -%}
{%- set source_patterns = config.get('source_patterns', ['.*\\.ss$']) -%}
{%- set max_files_per_trigger = config.get('max_files_per_trigger', 50) | int -%}
{%- set safety_buffer_seconds = config.get('safety_buffer_seconds', 30) | int -%}
{%- set source_compaction_interval = config.get('source_compaction_interval', 10) | int -%}
Expand All @@ -36,7 +36,7 @@
batch_num=0,
total_files=0,
file_batch=adapter.discover_files(
source_root, source_pattern, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
)
) -%}

Expand Down Expand Up @@ -73,11 +73,11 @@
{{ scope_script }}
{%- endcall -%}

{% do adapter.update_checkpoint(delta_location, source_root, source_pattern, ns.file_batch, source_compaction_interval, source_retention_files) %}
{% do adapter.update_checkpoint(delta_location, source_roots, source_patterns, ns.file_batch, source_compaction_interval, source_retention_files) %}

{# -- Discover next batch (watermark advanced) -- #}
{%- set ns.file_batch = adapter.discover_files(
source_root, source_pattern, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
) -%}
{%- endif -%}
{%- endfor -%}
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/dbt_project/models/append_no_delete.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
incremental_strategy='append',
partition_by='event_year_date',
delta_location=var('delta_location'),
source_root=var('source_root'),
source_pattern=var('source_pattern', '.*\\.ss$'),
source_roots=var('source_roots'),
source_patterns=var('source_patterns', ['.*\\.ss$']),
max_files_per_trigger=var('max_files_per_trigger', 50),
safety_buffer_seconds=0,
source_compaction_interval=1,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/dbt_project/models/filtered_edition.sql
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
incremental_strategy='append',
partition_by='event_year_date',
delta_location=var('delta_location_filtered'),
source_root=var('source_root'),
source_pattern=var('source_pattern', '.*\\.ss$'),
source_roots=var('source_roots'),
source_patterns=var('source_patterns', ['.*\\.ss$']),
max_files_per_trigger=var('max_files_per_trigger', 50),
safety_buffer_seconds=0,
source_compaction_interval=1,
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/test_dbt_scope.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ def _dbt_vars(scenario: ScenarioConfig) -> dict:
return {
"delta_location": scenario.delta_location,
"delta_location_filtered": f"{scenario.delta_location}_filtered",
"source_root": scenario.historical.ss_base_path,
"source_pattern": r".*\.ss$",
"source_roots": [scenario.historical.ss_base_path],
"source_patterns": [r".*\.ss$"],
"max_files_per_trigger": 500,
}

Expand Down
8 changes: 4 additions & 4 deletions tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ def sample_config(sample_columns: list[ColumnDef]) -> ScriptConfig:
delta_base_path="delta",
table_name="my_table",
partition_by="event_year_date",
source_root="/shares/test/ss/MyStream",
source_pattern=r".*\.ss$",
source_roots=["/shares/test/ss/MyStream"],
source_patterns=[r".*\.ss$"],
max_files_per_trigger=50,
safety_buffer_seconds=30,
adls_gen1_account="test-adls-gen1",
Expand Down Expand Up @@ -63,8 +63,8 @@ def multi_partition_config(multi_partition_columns: list[ColumnDef]) -> ScriptCo
delta_base_path="delta",
table_name="multi_tbl",
partition_by=["event_year_date", "edition"],
source_root="/shares/test/ss/MyStream",
source_pattern=r".*\.ss$",
source_roots=["/shares/test/ss/MyStream"],
source_patterns=[r".*\.ss$"],
source_files=[
"/shares/test/ss/MyStream/2026/04/01/20260401_010000_0.ss",
],
Expand Down
Loading
Loading