21 changes: 19 additions & 2 deletions dagfactory/dagbuilder.py
@@ -35,10 +35,15 @@
try:  # Try Airflow 3
    from airflow.providers.standard.operators.python import BranchPythonOperator, PythonOperator
    from airflow.providers.standard.sensors.python import PythonSensor
    from airflow.timetables.assets import AssetOrTimeSchedule
except ImportError:
    from airflow.operators.python import BranchPythonOperator, PythonOperator
    from airflow.sensors.python import PythonSensor

try:  # Try Airflow 2.9
    from airflow.timetables.datasets import DatasetOrTimeSchedule
except ImportError:  # neither class is available on older Airflow versions
    pass

logger = logging.getLogger(__name__)

@@ -606,6 +611,7 @@ def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) -
    # Only check for file and datasets attributes if schedule is a dict
    has_file_attr = isinstance(schedule, dict) and utils.check_dict_key(schedule, "file")
    has_datasets_attr = isinstance(schedule, dict) and utils.check_dict_key(schedule, "datasets")
    has_timetable_attr = isinstance(schedule, dict) and utils.check_dict_key(schedule, "timetable")

    if has_file_attr and has_datasets_attr:
        file = schedule.get("file")
@@ -616,8 +622,19 @@ def configure_schedule(dag_params: Dict[str, Any], dag_kwargs: Dict[str, Any]) -
    elif has_datasets_attr and is_airflow_version_at_least_2_9:
        datasets = schedule["datasets"]
        datasets_conditions: str = utils.parse_list_datasets(datasets)
        dag_kwargs[schedule_key] = DagBuilder.evaluate_condition_with_datasets(datasets_conditions)

        datasets_schedule = DagBuilder.evaluate_condition_with_datasets(datasets_conditions)
        if has_timetable_attr:
            timetable_schedule = schedule["timetable"]
            if version.parse(AIRFLOW_VERSION) < version.parse("3.0.0"):
                dag_kwargs[schedule_key] = DatasetOrTimeSchedule(
                    timetable=timetable_schedule, datasets=datasets_schedule
                )
            else:
                dag_kwargs[schedule_key] = AssetOrTimeSchedule(
                    timetable=timetable_schedule, assets=datasets_schedule
                )
        else:
            dag_kwargs[schedule_key] = datasets_schedule
    else:
        if isinstance(schedule, str):
            # check if it's "none" (case-insensitive, with whitespace)
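For illustration, here is a minimal sketch of the schedule object this new branch effectively builds on Airflow 2.9.x when the `schedule` block carries both `datasets` and `timetable` keys. The dataset URIs and cron string come from the example YAML below; assuming `evaluate_condition_with_datasets` returns a composed dataset condition like the one written out here (an assumption about its behavior, not a verbatim trace):

# Minimal sketch (Airflow 2.9.x): the kind of object assigned to dag_kwargs[schedule_key].
# Assumes evaluate_condition_with_datasets yields a dataset condition equivalent to (d1 & d2) | d3.
from airflow.datasets import Dataset
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

d1 = Dataset("s3://bucket-cjmm/raw/dataset_custom_1")
d2 = Dataset("s3://bucket-cjmm/raw/dataset_custom_2")
d3 = Dataset("s3://bucket-cjmm/raw/dataset_custom_3")

schedule = DatasetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 1 * * 5", timezone="UTC"),
    datasets=(d1 & d2) | d3,  # trigger when the dataset condition is met OR the cron fires
)

On Airflow 3, the same shape is built with `AssetOrTimeSchedule(timetable=..., assets=...)` instead, as the version check in the diff shows.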
16 changes: 16 additions & 0 deletions dev/dags/datasets/example_dataset_or_timetable_schedule.yml
@@ -0,0 +1,16 @@
consumer_time_schedule_dag:
  catchup: false
  default_args:
    owner: "example_owner"
    start_date: '2024-01-01'
  description: "Example DAG consumer scheduled by datasets or timetable"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
    timetable:
      __type__: airflow.timetables.trigger.CronTriggerTimetable
      cron: "0 1 * * 5"
      timezone: UTC
  tasks:
    - task_id: "task_1"
      operator: airflow.operators.bash.BashOperator
      bash_command: "echo 'consumer datasets'"
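To try the example, the YAML can be loaded the usual dag-factory way from a file in the DAGs folder; a minimal loader sketch, with an illustrative path:

# Loader sketch for the example config above; adjust the path to where the file actually lives.
import dagfactory

dag_factory = dagfactory.DagFactory("dev/dags/datasets/example_dataset_or_timetable_schedule.yml")
dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())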