18
18
19
19
from typing import Any
20
20
21
- from airflow .configuration import conf
22
21
from airflow .jobs .dag_processor_job_runner import DagProcessorJobRunner
23
22
from airflow .jobs .scheduler_job_runner import SchedulerJobRunner
24
23
from airflow .jobs .triggerer_job_runner import TriggererJobRunner
29
28
30
29
def get_airflow_health () -> dict [str , Any ]:
31
30
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
32
- dag_processor_enabled = conf .getboolean ("scheduler" , "standalone_dag_processor" )
33
31
metadatabase_status = HEALTHY
34
32
latest_scheduler_heartbeat = None
35
33
latest_triggerer_heartbeat = None
34
+ latest_dag_processor_heartbeat = None
36
35
37
36
scheduler_status = UNHEALTHY
38
37
triggerer_status : str | None = UNHEALTHY
38
+ dag_processor_status : str | None = UNHEALTHY
39
39
40
40
try :
41
41
latest_scheduler_job = SchedulerJobRunner .most_recent_job ()
@@ -59,6 +59,18 @@ def get_airflow_health() -> dict[str, Any]:
59
59
except Exception :
60
60
metadatabase_status = UNHEALTHY
61
61
62
+ try :
63
+ latest_dag_processor_job = DagProcessorJobRunner .most_recent_job ()
64
+
65
+ if latest_dag_processor_job :
66
+ latest_dag_processor_heartbeat = latest_dag_processor_job .latest_heartbeat .isoformat ()
67
+ if latest_dag_processor_job .is_alive ():
68
+ dag_processor_status = HEALTHY
69
+ else :
70
+ dag_processor_status = None
71
+ except Exception :
72
+ metadatabase_status = UNHEALTHY
73
+
62
74
airflow_health_status = {
63
75
"metadatabase" : {"status" : metadatabase_status },
64
76
"scheduler" : {
@@ -69,27 +81,10 @@ def get_airflow_health() -> dict[str, Any]:
69
81
"status" : triggerer_status ,
70
82
"latest_triggerer_heartbeat" : latest_triggerer_heartbeat ,
71
83
},
72
- }
73
-
74
- if dag_processor_enabled :
75
- latest_dag_processor_heartbeat = None
76
- dag_processor_status : str | None = UNHEALTHY
77
-
78
- try :
79
- latest_dag_processor_job = DagProcessorJobRunner .most_recent_job ()
80
-
81
- if latest_dag_processor_job :
82
- latest_dag_processor_heartbeat = latest_dag_processor_job .latest_heartbeat .isoformat ()
83
- if latest_dag_processor_job .is_alive ():
84
- dag_processor_status = HEALTHY
85
- else :
86
- dag_processor_status = None
87
- except Exception :
88
- metadatabase_status = UNHEALTHY
89
-
90
- airflow_health_status ["dag_processor" ] = {
84
+ "dag_processor" : {
91
85
"status" : dag_processor_status ,
92
86
"latest_dag_processor_heartbeat" : latest_dag_processor_heartbeat ,
93
- }
87
+ },
88
+ }
94
89
95
90
return airflow_health_status
0 commit comments