10 changes: 10 additions & 0 deletions .github/workflows/gci.yaml
@@ -110,3 +110,13 @@ jobs:
run: |
set -euo pipefail
.scripts/run.sh integration-test

- name: "📦 Upload failure logs"
if: failure()
uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4
with:
name: failure-logs
path: |
/tmp/imds-router.log
.logs/
retention-days: 7
82 changes: 80 additions & 2 deletions dbt/adapters/scope/impl.py
@@ -3,14 +3,15 @@
from __future__ import annotations

import logging
from datetime import datetime, timezone
from typing import Any

import agate
from dbt.adapters.base import BaseAdapter, available
from dbt_common.exceptions import DbtRuntimeError

from dbt.adapters.scope.adls_gen1_client import AdlsGen1Client, FileInfo
from dbt.adapters.scope.checkpoint import CheckpointManager
from dbt.adapters.scope.checkpoint import CheckpointManager, Watermark
from dbt.adapters.scope.column import ScopeColumn
from dbt.adapters.scope.connections import ScopeConnectionHandle, ScopeConnectionManager
from dbt.adapters.scope.credentials import ScopeCredentials
@@ -21,6 +22,28 @@
log = logging.getLogger(__name__)


def _parse_starting_timestamp(value: str) -> datetime:
"""Parse an ISO-8601 UTC timestamp string, raising on bad input.

Returns a timezone-aware ``datetime`` in UTC.
"""
try:
dt = datetime.fromisoformat(value)
except (ValueError, TypeError) as exc:
raise DbtRuntimeError(
f"Invalid starting_timestamp '{value}'. "
f"Expected an ISO-8601 UTC string such as '2026-04-07T10:00:00+00:00'."
) from exc

if dt.tzinfo is None:
raise DbtRuntimeError(
f"starting_timestamp '{value}' is missing timezone info. "
f"Use an explicit UTC offset, e.g. '2026-04-07T10:00:00+00:00'."
)

return dt.astimezone(timezone.utc)

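# Illustration (not part of the diff): expected behavior of the parser above,
# following standard datetime.fromisoformat semantics.
#
#   _parse_starting_timestamp("2026-04-07T10:00:00+00:00")
#       -> datetime(2026, 4, 7, 10, 0, tzinfo=timezone.utc)
#   _parse_starting_timestamp("2026-04-07T12:00:00+02:00")
#       -> datetime(2026, 4, 7, 10, 0, tzinfo=timezone.utc)  # offset normalized to UTC
#   _parse_starting_timestamp("2026-04-07T10:00:00")  # naive -> DbtRuntimeError
#   _parse_starting_timestamp("not-a-date")           # unparsable -> DbtRuntimeError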

class ScopeAdapter(BaseAdapter):
"""Adapter for submitting SCOPE scripts to Azure Data Lake Analytics."""

@@ -188,6 +211,7 @@ def discover_files(
max_files_per_trigger: int,
delta_location: str,
safety_buffer_seconds: int = 30,
starting_timestamp: str | None = None,
) -> list[str]:
"""Discover unprocessed source files and return a batch of file paths.

@@ -196,10 +220,33 @@ def discover_files(
1. For each (root, pattern): read watermark, LIST + filter files
2. Union results and deduplicate by file path
3. Return up to *max_files_per_trigger* file paths

If *starting_timestamp* is provided (ISO-8601 UTC) and no checkpoint
exists, only files modified after that timestamp are considered. When
a checkpoint already exists, the parameter is silently ignored.
"""
# Validate starting_timestamp early (fail fast on bad input)
starting_ts_dt = (
_parse_starting_timestamp(starting_timestamp) if starting_timestamp else None
)

tracker = self._get_file_tracker()
watermark = self._get_checkpoint_manager().read_watermark(delta_location)

# Determine effective watermark: checkpoint wins over starting_timestamp
used_starting_timestamp = False
if watermark is not None:
effective_watermark = watermark
elif starting_ts_dt is not None:
effective_watermark = Watermark(modified_time=starting_ts_dt.isoformat())
used_starting_timestamp = True
log.info(
"No checkpoint found — using starting_timestamp=%s as initial offset",
starting_timestamp,
)
else:
effective_watermark = None

seen_paths: set[str] = set()
all_unprocessed: list[FileInfo] = []

@@ -208,14 +255,21 @@
unprocessed = tracker.discover_unprocessed_files(
root=root,
pattern=pattern,
watermark=watermark,
watermark=effective_watermark,
safety_buffer_seconds=safety_buffer_seconds,
)
for f in unprocessed:
if f.path not in seen_paths:
seen_paths.add(f.path)
all_unprocessed.append(f)

# If starting_timestamp was used and yielded nothing, check whether
# there are source files at all — if so, the timestamp is too late.
if used_starting_timestamp and not all_unprocessed:
self._validate_starting_timestamp_has_files(
tracker, source_roots, source_patterns, starting_timestamp
)

# Sort by modification_time to maintain deterministic ordering
all_unprocessed.sort(key=lambda f: f.modification_time)
batch = FileTracker.get_next_batch(all_unprocessed, max_files_per_trigger)
@@ -229,6 +283,28 @@
)
return [f.path for f in batch]

@staticmethod
def _validate_starting_timestamp_has_files(
tracker: FileTracker,
source_roots: list[str],
source_patterns: list[str],
starting_timestamp: str | None,
) -> None:
"""Raise if starting_timestamp is after all available source files."""
for root in source_roots:
for pattern in source_patterns:
files = tracker.discover_unprocessed_files(
root=root, pattern=pattern, watermark=None, safety_buffer_seconds=0
)
if files:
    # The tracker does not guarantee ordering, so find the newest file explicitly.
    latest = max(files, key=lambda f: f.modification_time)
    raise DbtRuntimeError(
        f"starting_timestamp '{starting_timestamp}' is after all available "
        f"source files. The latest file has modificationTime "
        f"'{latest.modification_time.isoformat()}'. "
        f"Use an earlier timestamp or remove starting_timestamp."
    )
# No files exist at all — that's a legitimate empty source, not an error

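# Illustration (not part of the diff): the three outcomes of the guard above.
#   - no files under any (root, pattern)     -> return silently (an empty source is legitimate)
#   - files exist, all before the timestamp  -> DbtRuntimeError naming the newest file
#   - discover_files already found a batch   -> this helper is never called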
@available
def update_checkpoint(
self,
@@ -302,6 +378,7 @@ def has_unprocessed_files(
source_patterns: list[str],
delta_location: str,
safety_buffer_seconds: int = 30,
starting_timestamp: str | None = None,
) -> bool:
"""Are there unprocessed files at the source?"""
files = self.discover_files(
Expand All @@ -310,6 +387,7 @@ def has_unprocessed_files(
max_files_per_trigger=1,
delta_location=delta_location,
safety_buffer_seconds=safety_buffer_seconds,
starting_timestamp=starting_timestamp,
)
return len(files) > 0

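The heart of the change is the precedence rule in discover_files: an existing checkpoint always wins, and starting_timestamp only seeds the very first run. The sketch below isolates that rule so it can be run standalone; the Watermark dataclass here is a hypothetical stand-in for dbt.adapters.scope.checkpoint.Watermark, whose real definition is not part of this diff.

from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class Watermark:  # hypothetical stand-in for dbt.adapters.scope.checkpoint.Watermark
    modified_time: str


def effective_watermark(
    checkpoint: Watermark | None, starting_ts: datetime | None
) -> Watermark | None:
    # Mirrors discover_files: a stored checkpoint beats starting_timestamp.
    if checkpoint is not None:
        return checkpoint
    if starting_ts is not None:
        return Watermark(modified_time=starting_ts.astimezone(timezone.utc).isoformat())
    return None


# First run: no checkpoint yet, so starting_timestamp becomes the initial offset.
first = effective_watermark(None, datetime(2026, 4, 7, 10, 0, tzinfo=timezone.utc))
assert first == Watermark("2026-04-07T10:00:00+00:00")

# Any later run: the checkpoint is used and starting_timestamp is silently ignored.
later = effective_watermark(
    Watermark("2026-05-01T00:00:00+00:00"),
    datetime(2026, 4, 7, 10, 0, tzinfo=timezone.utc),
)
assert later == Watermark("2026-05-01T00:00:00+00:00")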
5 changes: 3 additions & 2 deletions dbt/include/scope/macros/materializations/incremental.sql
@@ -37,6 +37,7 @@
{%- set safety_buffer_seconds = config.get('safety_buffer_seconds', 30) | int -%}
{%- set source_compaction_interval = config.get('source_compaction_interval', 10) | int -%}
{%- set source_retention_files = config.get('source_retention_files', 100) | int -%}
{%- set starting_timestamp = config.get('starting_timestamp', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}
{%- set scope_settings = config.get('scope_settings', {}) -%}
{%- set scope_columns = config.get('scope_columns', []) -%}
@@ -54,7 +55,7 @@
batch_num=0,
total_files=0,
file_batch=adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
)
) -%}

@@ -100,7 +101,7 @@

{# -- Discover next batch (watermark advanced, so new files are eligible) -- #}
{%- set ns.file_batch = adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
) -%}
{%- endif -%}
{%- endfor -%}
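For orientation, a model opts in through its config block; the integration-test models at the end of this diff show the real usage, and the sketch here only isolates the new key (all paths and values are placeholders, not taken from the repo):

{{ config(
    materialized='incremental',
    source_roots=['adl://example/raw/events'],
    source_patterns=['.*\\.ss$'],
    max_files_per_trigger=50,
    starting_timestamp='2026-04-07T10:00:00+00:00',
    safety_buffer_seconds=30
) }}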
5 changes: 3 additions & 2 deletions dbt/include/scope/macros/materializations/table.sql
@@ -22,6 +22,7 @@
{%- set safety_buffer_seconds = config.get('safety_buffer_seconds', 30) | int -%}
{%- set source_compaction_interval = config.get('source_compaction_interval', 10) | int -%}
{%- set source_retention_files = config.get('source_retention_files', 100) | int -%}
{%- set starting_timestamp = config.get('starting_timestamp', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}
{%- set scope_settings = config.get('scope_settings', {}) -%}
{%- set scope_columns = config.get('scope_columns', []) -%}
@@ -36,7 +37,7 @@
batch_num=0,
total_files=0,
file_batch=adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
)
) -%}

@@ -77,7 +78,7 @@

{# -- Discover next batch (watermark advanced) -- #}
{%- set ns.file_batch = adapter.discover_files(
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds
source_roots, source_patterns, max_files_per_trigger, delta_location, safety_buffer_seconds, starting_timestamp
) -%}
{%- endif -%}
{%- endfor -%}
1 change: 1 addition & 0 deletions tests/integration/dbt_project/models/append_no_delete.sql
@@ -7,6 +7,7 @@
source_roots=var('source_roots'),
source_patterns=var('source_patterns', ['.*\\.ss$']),
max_files_per_trigger=var('max_files_per_trigger', 50),
starting_timestamp='1900-01-01T00:00:00+00:00',
safety_buffer_seconds=0,
source_compaction_interval=1,
source_retention_files=100,
1 change: 1 addition & 0 deletions tests/integration/dbt_project/models/filtered_edition.sql
@@ -7,6 +7,7 @@
source_roots=var('source_roots'),
source_patterns=var('source_patterns', ['.*\\.ss$']),
max_files_per_trigger=var('max_files_per_trigger', 50),
starting_timestamp='2026-01-01T00:00:00+00:00',
safety_buffer_seconds=0,
source_compaction_interval=1,
source_retention_files=100,