Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,14 @@ my_project:
source_patterns=['.*\\.ss$'],
max_files_per_trigger=100,
partition_by='event_year_date',
scope_columns=[
delta_table_columns=[
{'name': 'server_name', 'type': 'string'},
{'name': 'source_file_uri', 'type': 'string'},
{'name': 'event_year_date', 'type': 'string', 'extract': false}
{'name': 'event_year_date', 'type': 'string'}
],
extract_columns=[
{'name': 'logical_server_name_DT_String', 'type': 'string'},
{'name': 'source_file_uri', 'type': 'string'}
],
scope_settings={
'microsoft.scope.compression': 'vorder:zstd#11',
Expand All @@ -210,10 +214,14 @@ FROM @data
max_files_per_trigger=50,
source_compaction_interval=10,
source_retention_files=100,
scope_columns=[
delta_table_columns=[
{'name': 'server_name', 'type': 'string'},
{'name': 'source_file_uri', 'type': 'string'},
{'name': 'event_year_date', 'type': 'string', 'extract': false}
{'name': 'event_year_date', 'type': 'string'}
],
extract_columns=[
{'name': 'logical_server_name_DT_String', 'type': 'string'},
{'name': 'source_file_uri', 'type': 'string'}
]
) }}

Expand Down Expand Up @@ -242,10 +250,14 @@ Models can include `WHERE` clauses — the adapter passes through your SQL as-is
source_roots=['/my/cosmos/path/to/MyStream'],
source_patterns=['.*\\.ss$'],
max_files_per_trigger=50,
scope_columns=[
delta_table_columns=[
{'name': 'server_name', 'type': 'string'},
{'name': 'edition', 'type': 'string'},
{'name': 'event_year_date', 'type': 'string', 'extract': false}
{'name': 'event_year_date', 'type': 'string'}
],
extract_columns=[
{'name': 'logical_server_name_DT_String', 'type': 'string'},
{'name': 'edition', 'type': 'string'}
]
) }}

Expand All @@ -266,7 +278,9 @@ WHERE edition == "Standard"
| `safety_buffer_seconds` | `30` | Skip files modified within the last N seconds (avoids partial writes) |
| `source_compaction_interval` | `10` | Every N batches, write a parquet snapshot of all source history |
| `source_retention_files` | `100` | Max files in `_checkpoint/sources/` — oldest are deleted first |
| `partition_by` | — | Single column name or list of columns. Columns with `'extract': false` are computed |
| `delta_table_columns` | `[]` | Delta table schema (CREATE TABLE). List of `{name, type}` dicts |
| `extract_columns` | `[]` | Source file columns (EXTRACT). List of `{name, type}` dicts |
| `partition_by` | — | Single column name or list of columns |

`dbt retry` re-runs failed batches. `dbt run --full-refresh` resets the checkpoint and reprocesses all files.

Expand Down
13 changes: 7 additions & 6 deletions dbt/adapters/scope/_file_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from __future__ import annotations

import logging
import random
import sys
import tempfile
Expand All @@ -11,7 +10,9 @@
from types import TracebackType
from typing import TypeVar

log = logging.getLogger(__name__)
from dbt.adapters.events.logging import AdapterLogger

log = AdapterLogger("scope")

T = TypeVar("T")

Expand Down Expand Up @@ -49,14 +50,14 @@ def __init__(self, lock_file: str, timeout: float = _DEFAULT_TIMEOUT) -> None:
def __enter__(self) -> FileLock:
    """Open the lock file and acquire an exclusive lock on it.

    The parent directory of ``self.lock_path`` is created if missing. On
    Windows the lock is taken via ``self._lock_win32()``; on other
    platforms ``fcntl.flock`` is called with ``LOCK_EX``.

    Returns:
        This ``FileLock`` instance, so it can be bound with ``with ... as``.

    Raises:
        Whatever the platform locking call raises. The lock file handle is
        closed before the exception propagates.
    """
    self.lock_path.parent.mkdir(parents=True, exist_ok=True)
    self._lock_file = self.lock_path.open("w")
    log.debug(f"Acquiring file lock: {self.lock_path!s}")
    try:
        if sys.platform == "win32":
            self._lock_win32()
        else:
            # Imported lazily: fcntl does not exist on Windows.
            import fcntl

            fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_EX)  # type: ignore[union-attr]
    except BaseException:
        # __exit__ is not invoked when __enter__ raises, so release the
        # handle here instead of leaking it.
        self._lock_file.close()  # type: ignore[union-attr]
        raise
    log.debug(f"Acquired file lock: {self.lock_path!s}")
    return self

def __exit__(
Expand All @@ -75,7 +76,7 @@ def __exit__(

fcntl.flock(self._lock_file.fileno(), fcntl.LOCK_UN) # type: ignore[union-attr]
self._lock_file.close() # type: ignore[union-attr]
log.debug("Released file lock: %s", self.lock_path)
log.debug(f"Released file lock: {self.lock_path!s}")
return False

# -- Windows retry logic --------------------------------------------------
Expand Down Expand Up @@ -109,7 +110,7 @@ def _lock_win32(self) -> None:
if attempt % 20 == 0:
log.debug(
"File lock contention on %s, attempt %d, sleeping %.2fs",
self.lock_path,
str(self.lock_path),
attempt,
sleep_time,
)
Expand Down
120 changes: 93 additions & 27 deletions dbt/adapters/scope/adls_gen1_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,19 @@

from __future__ import annotations

import logging
import re
from dataclasses import dataclass
import time
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from dataclasses import dataclass, field
from datetime import datetime, timezone

from azure.datalake.store import core as adls_core
from azure.identity import AzureCliCredential
from dbt.adapters.events.logging import AdapterLogger

from dbt.adapters.scope._file_lock import AZ_CLI_TOKEN_LOCK, FileLock

log = logging.getLogger(__name__)
log = AdapterLogger("scope")


@dataclass(frozen=True)
Expand All @@ -28,6 +30,7 @@ class FileInfo:
name: str
length: int
modification_time: datetime
raw: dict = field(default_factory=dict, repr=False, compare=False, hash=False)

@classmethod
def from_adls_entry(cls, entry: dict) -> FileInfo | None:
Expand All @@ -48,9 +51,32 @@ def from_adls_entry(cls, entry: dict) -> FileInfo | None:
name=raw_path.rsplit("/", 1)[-1],
length=entry.get("length", 0),
modification_time=datetime.fromtimestamp(mod_ms / 1000, tz=timezone.utc),
raw=entry,
)


def _list_one_dir(
fs: adls_core.AzureDLFileSystem,
dir_path: str,
depth: int,
) -> tuple[list[dict], list[dict], str, int, float]:
"""List a single directory. Returns (files, subdirs, path, depth, elapsed_ms)."""
t0 = time.monotonic()
try:
entries = fs.ls(dir_path, detail=True)
except FileNotFoundError:
log.warning(f"Path not found (skipping): {dir_path}")
return [], [], dir_path, depth, (time.monotonic() - t0) * 1000
except Exception:
log.warning(f"Failed to list {dir_path} (skipping)")
return [], [], dir_path, depth, (time.monotonic() - t0) * 1000

elapsed_ms = (time.monotonic() - t0) * 1000
files = [e for e in entries if e.get("type") != "DIRECTORY"]
dirs = [e for e in entries if e.get("type") == "DIRECTORY"]
return files, dirs, dir_path, depth, elapsed_ms


class AdlsGen1Client:
"""Client for listing files on ADLS Gen1 with watermark support."""

Expand Down Expand Up @@ -81,66 +107,106 @@ def list_files(
*,
pattern: str | None = None,
recursive: bool = True,
max_workers: int = 8,
) -> list[FileInfo]:
"""List all files under *root*, optionally filtering by regex *pattern*.

Args:
root: ADLS Gen1 path (e.g. ``/shares/SQLDB.Prod/local/...``).
pattern: Regex pattern to match against the file name (not full path).
Only files whose name matches are returned.
recursive: If True, walk subdirectories.
recursive: If True, walk subdirectories in parallel.
max_workers: Max threads for parallel recursive listing.

Returns:
Sorted list of ``FileInfo`` objects (sorted by modification_time ASC).
"""
fs = self._get_fs()
compiled = re.compile(pattern) if pattern else None
log.info("Listing files: account=%s, root=%s, pattern=%s", self._account, root, pattern)
log.debug(f"Listing files: account={self._account}, root={root}, pattern={pattern}")

walk_start = time.monotonic()
if recursive:
raw_entries = self._walk(fs, root)
raw_entries = self._walk(fs, root, max_workers)
else:
t0 = time.monotonic()
try:
raw_entries = fs.ls(root, detail=True)
except FileNotFoundError:
log.warning("Path not found: %s", root)
log.warning(f"Path not found: {root}")
return []
except Exception:
log.warning("Failed to list %s", root, exc_info=True)
log.warning(f"Failed to list {root}")
return []
finally:
elapsed_ms = (time.monotonic() - t0) * 1000
log.debug(f"ls {root} completed in {elapsed_ms:.1f} ms")
walk_elapsed_ms = (time.monotonic() - walk_start) * 1000
log.debug(f"Total walk of {root} completed in {walk_elapsed_ms:.1f} ms")

files: list[FileInfo] = []
skipped_empty = 0
for entry in raw_entries:
info = FileInfo.from_adls_entry(entry)
if info is None:
continue
if info.length == 0:
skipped_empty += 1
continue
if compiled and not compiled.search(info.name):
continue
files.append(info)

if skipped_empty:
log.debug(f"Skipped {skipped_empty} zero-length files under {root}")
files.sort(key=lambda f: f.modification_time)
log.info("Found %d files matching pattern under %s", len(files), root)
log.debug(f"Found {len(files)} files matching pattern under {root}")
return files

@staticmethod
def _walk(
    fs: adls_core.AzureDLFileSystem,
    root: str,
    max_workers: int = 8,
) -> list[dict]:
    """Walk the directory tree under *root* in parallel.

    Each directory is listed by ``_list_one_dir`` on a thread pool. As a
    listing completes, its files are collected and its subdirectories are
    submitted as new listing tasks, until nothing is in flight.

    Args:
        fs: Filesystem handle passed through to ``_list_one_dir``.
        root: Directory at which the walk starts (depth 0).
        max_workers: Maximum number of threads in the pool.

    Returns:
        All non-directory entries found. Entries are appended in the order
        listings complete, so the order is not stable across runs.
    """
    all_files: list[dict] = []
    dirs_done = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each in-flight listing is keyed to the (path, depth) it was
        # submitted for, so a failed future can still be attributed.
        futures: dict[Future, tuple[str, int]] = {
            executor.submit(_list_one_dir, fs, root, 0): (root, 0),
        }

        while futures:
            done, _ = wait(futures, return_when=FIRST_COMPLETED)

            for completed in done:
                submitted_path, _submitted_depth = futures.pop(completed)
                dirs_done += 1
                try:
                    files, dirs, dir_path, depth, elapsed_ms = completed.result()
                except Exception as exc:
                    # Best-effort walk: report the directory and move on.
                    log.warning(f"Failed to list {submitted_path} (skipping): {exc!r}")
                    continue

                short = dir_path.rsplit("/", 1)[-1] or dir_path
                log.debug(
                    f"Depth {depth} | {short} → "
                    f"{len(dirs)} dirs, {len(files)} files "
                    f"({elapsed_ms:.0f} ms) | "
                    f"done: {dirs_done}, in-flight: {len(futures)}"
                )

                all_files.extend(files)

                # Submit children in name order.
                for d in sorted(dirs, key=lambda e: e.get("name", "")):
                    new_f = executor.submit(_list_one_dir, fs, d["name"], depth + 1)
                    futures[new_f] = (d["name"], depth + 1)

            if futures:
                log.debug(f"Queue: {len(futures)} directories pending")

    log.debug(f"Walk complete: {dirs_done} directories scanned")
    return all_files
Loading
Loading