Skip to content

Commit 3e99fcb

Browse files
Cron Job Schedules (#74)
1 parent 65556d8 commit 3e99fcb

File tree

10 files changed

+69
-36
lines changed

10 files changed

+69
-36
lines changed

dagify/converter/engine.py

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -310,6 +310,53 @@ def get_template_name(self, job_type):
310310
# no match found
311311
return None
312312

313+
def calculate_cron_schedule(self, task):
    """Build a five-field cron expression from a task's scheduling attributes.

    The expression is assembled from three attributes read through
    ``task.get_attribute``:

    * ``TIMEFROM`` -- a four-digit ``HHMM`` string; the first two characters
      become the cron hour field and the last two the minute field.
    * ``WEEKDAYS`` -- optional comma-separated list used as the day-of-week
      field.
    * ``JAN`` .. ``DEC`` -- a month is selected when its attribute equals
      the string ``"1"``; consecutive months are collapsed into ranges.

    Args:
        task: Object exposing ``get_attribute(name)``.

    Returns:
        A string of the form ``"<minute> <hour> * <months> <weekdays>"``, or
        ``None`` when ``TIMEFROM`` is missing or is not a valid ``HHMM`` time
        (the caller then has no schedule to pass to the DAG template).
    """
    timefrom = task.get_attribute("TIMEFROM")
    if not timefrom:
        return None

    # Reject anything that is not exactly HHMM within 00:00-23:59; slicing a
    # malformed value blindly would emit an invalid cron expression.
    timefrom = str(timefrom).strip()
    if len(timefrom) != 4 or not all(ch in "0123456789" for ch in timefrom):
        return None
    hour, minute = timefrom[:2], timefrom[2:]
    if int(hour) > 23 or int(minute) > 59:
        return None

    # NOTE(review): WEEKDAYS entries are passed through verbatim; this assumes
    # they are already valid cron day-of-week tokens -- confirm against the
    # source scheduler's export format.
    weekdays = task.get_attribute("WEEKDAYS")
    day_tokens = [token.strip() for token in weekdays.split(",")] if weekdays else []
    day_of_week = ",".join(token for token in day_tokens if token) or "*"

    month_abbreviations = ("JAN", "FEB", "MAR", "APR", "MAY", "JUN",
                           "JUL", "AUG", "SEP", "OCT", "NOV", "DEC")
    # Month numbers (1-12) whose flag is "1"; already in ascending order
    # because the abbreviations are enumerated in calendar order.
    months = [
        number
        for number, name in enumerate(month_abbreviations, start=1)
        if task.get_attribute(name) == "1"
    ]

    # Collapse consecutive months into [first, last] runs, e.g. 1,2,3,6 -> 1-3,6.
    runs = []
    for number in months:
        if runs and number == runs[-1][1] + 1:
            runs[-1][1] = number
        else:
            runs.append([number, number])
    month_schedule = ",".join(
        str(first) if first == last else f"{first}-{last}"
        for first, last in runs
    ) or "*"

    # Day of month is always "*"; only time, month and weekday are derived.
    return " ".join([minute, hour, "*", month_schedule, day_of_week])
359+
313360
def generate_airflow_dags(self):
314361

315362
if self.uf is None:
@@ -318,11 +365,14 @@ def generate_airflow_dags(self):
318365
for tIdx, dag_divider_value in enumerate(self.get_dag_dividers()):
319366
airflow_task_outputs = []
320367
tasks = []
368+
schedule_interval = None
321369
for tIdx, task in enumerate(self.uf.get_tasks()):
322370
# Capture the airflow tasks for each dag divider
323371
if task.get_attribute(self.dag_divider) == dag_divider_value:
324372
tasks.append(task.get_attribute("JOBNAME_ORIGINAL"))
325373
airflow_task_outputs.append(task.get_airflow_task_output())
374+
if not schedule_interval:
375+
schedule_interval = self.calculate_cron_schedule(task)
326376

327377
# Calculate DAG Specific Python Imports
328378
dag_python_imports = self.uf.calculate_dag_python_imports(
@@ -341,11 +391,11 @@ def generate_airflow_dags(self):
341391
for dep in dependencies[dag_divider_value][task]['external']:
342392
ext_task_uf = self.uf.get_task_by_attr("JOBNAME_ORIGINAL", dep)
343393
dependencies_in_dag_external.append({
344-
'task_name': task,
345-
'ext_dag': ext_task_uf.get_attribute(self.dag_divider),
394+
'task_name': task,
395+
'ext_dag': ext_task_uf.get_attribute(self.dag_divider),
346396
'ext_dep_task': dep,
347397
"marker_name": dep + "_marker_" + ''.join(random.choices('0123456789abcdef', k=4))
348-
})
398+
})
349399

350400
# Calculate external upstream dependencies where a task in the current dag depends on another dag's task
351401
# Such a dependency will require a DAG Sensor
@@ -382,11 +432,12 @@ def generate_airflow_dags(self):
382432
baseline_imports=self.get_baseline_imports(),
383433
custom_imports=dag_python_imports,
384434
dag_id=dag_divider_value,
435+
schedule_interval=schedule_interval,
385436
tasks=airflow_task_outputs,
386437
dependencies_int=dependencies_in_dag_internal,
387438
dependencies_ext=dependencies_in_dag_external,
388439
upstream_dependencies=upstream_dependencies
389-
)
440+
)
390441
with open(filename, mode="w", encoding="utf-8") as dag_file:
391442
dag_file.write(content)
392443

dagify/converter/templates/dag.tmpl

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ default_args = {
1313

1414
with DAG(
1515
dag_id="{{dag_id}}",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
{% if schedule_interval %}schedule_interval="{{ schedule_interval }}"{% else %}schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily{% endif %},
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/009-fast-x/fast_x_reports.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fast_x_reports",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/010-fast-x/fx_fld_001_app_001_subapp_001.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_001_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/011-fast-x/fx_fld_001_app_001_subapp_001.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_001_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

@@ -48,8 +46,8 @@
4846

4947
# Airflow Downstream Task Dependencies (external dags)
5048

51-
fx_fld_001_app_002_subapp_002_job_003_marker
52-
task_id="fx_fld_001_app_002_subapp_002_job_003_marker
49+
fx_fld_001_app_002_subapp_002_job_003_marker_a61c = ExternalTaskMarker(
50+
task_id="fx_fld_001_app_002_subapp_002_job_003_marker_a61c",
5351
external_dag_id='fx_fld_001_app_002_subapp_002',
5452
external_task_id='fx_fld_001_app_002_subapp_002_job_003'
5553
)

dagify/test/integration/test_references/011-fast-x/fx_fld_001_app_002_subapp_001.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_002_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/011-fast-x/fx_fld_001_app_002_subapp_002.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_002_subapp_002",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

@@ -50,12 +48,12 @@
5048

5149
# Airflow Upstream Task Dependencies (external dags)
5250

53-
fx_fld_001_app_002_subapp_002_job_003_sensor
54-
task_id="fx_fld_001_app_002_subapp_002_job_003_sensor
51+
fx_fld_001_app_002_subapp_002_job_003_sensor_2b59 = ExternalTaskSensor(
52+
task_id="fx_fld_001_app_002_subapp_002_job_003_sensor_2b59",
5553
external_dag_id="fx_fld_001_app_001_subapp_001",
5654
external_task_id="fx_fld_001_app_001_subapp_001_job_001",
5755
dag=dag
5856
)
59-
fx_fld_001_app_002_subapp_002_job_003_sensor
57+
fx_fld_001_app_002_subapp_002_job_003_sensor_2b59 >> fx_fld_001_app_002_subapp_002_job_003
6058

6159

dagify/test/integration/test_references/011-fast-x/fx_fld_002_app_001_subapp_001.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_002_app_001_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/012-fast-x/fx_fld_001_app_001_subapp_001.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_001_app_001_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

dagify/test/integration/test_references/012-fast-x/fx_fld_002_app_001_subapp_001.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313

1414
with DAG(
1515
dag_id="fx_fld_002_app_001_subapp_001",
16-
start_date=datetime.datetime(2024, 1, 1),
17-
#schedule="@daily",
18-
schedule_interval='*/5 * * * *',
16+
schedule_interval="@daily", # TIMEFROM not found, default schedule set to @daily,
1917
catchup=False,
2018
) as dag:
2119

0 commit comments

Comments
 (0)