Skip to content

feat: Add max_lookback_days parameter to Source.read() #632

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 95 additions & 8 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import json
import warnings
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal

Expand Down Expand Up @@ -32,6 +33,7 @@
from airbyte.records import StreamRecord, StreamRecordHandler
from airbyte.results import ReadResult
from airbyte.shared.catalog_providers import CatalogProvider
from airbyte.shared.state_writers import NoOpStateWriter, StateWriterBase
from airbyte.strategies import WriteStrategy


Expand Down Expand Up @@ -602,6 +604,29 @@ def _log_incremental_streams(
)
print(log_message)

def _setup_state_handling(
self,
cache: CacheBase,
*,
force_full_refresh: bool,
max_lookback_days: int | None,
) -> tuple[StateProviderBase | None, StateWriterBase]:
"""Set up state provider and writer based on sync mode."""
if force_full_refresh or max_lookback_days is not None:
state_provider = None
# Use NoOpStateWriter to prevent state changes from being committed
if max_lookback_days is not None:
state_writer: StateWriterBase = NoOpStateWriter()
else:
state_writer = cache.get_state_writer(source_name=self._name)
else:
state_provider = cache.get_state_provider(
source_name=self._name,
)
state_writer = cache.get_state_writer(source_name=self._name)

return state_provider, state_writer

def read(
self,
cache: CacheBase | None = None,
Expand All @@ -610,6 +635,7 @@ def read(
write_strategy: str | WriteStrategy = WriteStrategy.AUTO,
force_full_refresh: bool = False,
skip_validation: bool = False,
max_lookback_days: int | None = None,
) -> ReadResult:
"""Read from the connector and write to the cache.

Expand All @@ -628,6 +654,10 @@ def read(
running the connector. This can be helpful in debugging, when you want to send
configurations to the connector that otherwise might be rejected by JSON Schema
validation rules.
max_lookback_days: If provided, a UTC start_date will be calculated by subtracting this
number of days from the current UTC datetime, and injected into the connector's
configuration at runtime. In this mode, prior incremental state cursors
will be ignored and new state changes will not be committed.
"""
cache = cache or get_default_cache()
progress_tracker = ProgressTracker(
Expand All @@ -637,14 +667,10 @@ def read(
expected_streams=None, # Will be set later
)

# Set up state provider if not in full refresh mode
if force_full_refresh:
state_provider: StateProviderBase | None = None
else:
state_provider = cache.get_state_provider(
source_name=self._name,
)
state_writer = cache.get_state_writer(source_name=self._name)
# Set up state provider and writer based on mode
state_provider, state_writer = self._setup_state_handling(
cache, force_full_refresh=force_full_refresh, max_lookback_days=max_lookback_days
)

if streams:
self.select_streams(streams)
Expand All @@ -655,6 +681,63 @@ def read(
available_streams=self.get_available_streams(),
)

# Helper function to prepare config with lookback days
def _prepare_lookback_config(
days: int | None,
) -> tuple[dict[str, Any] | None, dict[str, Any] | None]:
"""Prepare config with lookback days and return original config for restoration."""
if days is None:
return None, None

# Calculate the start_date by subtracting days from utcnow
start_date_dt = datetime.now(tz=timezone.utc) - timedelta(days=days)
start_date = start_date_dt.isoformat()

# Create a copy of the config
modified = self.get_config().copy()

# Check if connector supports start_date
def _check_start_date_support() -> None:
spec = self.config_spec
properties = spec.get("properties", {})
if "start_date" not in properties:
raise exc.PyAirbyteInputError(
message="Connector does not support start_date configuration.",
context={
"connector_name": self.name,
"max_lookback_days": days,
},
)

try:
_check_start_date_support()
except exc.PyAirbyteInputError:
raise
except Exception:
# If we can't get config spec for other reasons, assume start_date is supported
pass

# Inject start_date into config
modified["start_date"] = start_date

# Print message to user
print(
f"Executing sync with max {days} days lookback "
f'(from "{start_date}" UTC). '
f"Note that other incremental state cursors will be ignored "
f"and state checkpointing will be disabled for this sync."
)

# Return modified config and original for restoration
return modified, self._config_dict

# Prepare config with lookback days if needed
modified_config, original_config = _prepare_lookback_config(max_lookback_days)

# Apply modified config if available
if modified_config is not None:
self._config_dict = modified_config

try:
result = self._read_to_cache(
cache=cache,
Expand All @@ -676,6 +759,10 @@ def read(
except Exception as ex:
progress_tracker.log_failure(exception=ex)
raise
finally:
# Restore the original config if it was modified
if original_config is not None:
self._config_dict = original_config

progress_tracker.log_success()
return result
Expand Down
Loading