Skip to content

Commit f26c445

Browse files
authored
Merge pull request #105 from sawaca96/filter-dag-not-in-db
Join with SerializedDagModel to filter dag not in webserver
2 parents 2aee57f + 2749caa commit f26c445

File tree

1 file changed

+41
-21
lines changed

1 file changed

+41
-21
lines changed

airflow_exporter/prometheus_exporter.py

+41-21
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from airflow.plugins_manager import AirflowPlugin
1414
from airflow.settings import Session
1515
from airflow.models import TaskInstance, DagModel, DagRun
16+
from airflow.models.serialized_dag import SerializedDagModel
1617
from airflow.utils.state import State
1718

1819
# Importing base classes that we need to derive
@@ -39,10 +40,15 @@ def get_dag_status_info() -> List[DagStatusInfo]:
3940
DagRun.dag_id, DagRun.state, func.count(DagRun.state).label('cnt')
4041
).group_by(DagRun.dag_id, DagRun.state).subquery()
4142

42-
sql_res = Session.query( # pylint: disable=no-member
43-
dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.cnt,
44-
DagModel.owners
45-
).join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id).all()
43+
sql_res = (
44+
Session.query( # pylint: disable=no-member
45+
dag_status_query.c.dag_id, dag_status_query.c.state, dag_status_query.c.cnt,
46+
DagModel.owners
47+
)
48+
.join(DagModel, DagModel.dag_id == dag_status_query.c.dag_id)
49+
.join(SerializedDagModel, SerializedDagModel.dag_id == dag_status_query.c.dag_id)
50+
.all()
51+
)
4652

4753
res = [
4854
DagStatusInfo(
@@ -69,10 +75,16 @@ def get_last_dagrun_info() -> List[DagStatusInfo]:
6975
order_by=DagRun.execution_date.desc()).label('row_number')
7076
).subquery()
7177

72-
sql_res = Session.query(
73-
last_dagrun_query.c.dag_id, last_dagrun_query.c.state, last_dagrun_query.c.row_number,
74-
DagModel.owners
75-
).filter(last_dagrun_query.c.row_number == 1).join(DagModel, DagModel.dag_id == last_dagrun_query.c.dag_id).all()
78+
sql_res = (
79+
Session.query(
80+
last_dagrun_query.c.dag_id, last_dagrun_query.c.state, last_dagrun_query.c.row_number,
81+
DagModel.owners
82+
)
83+
.filter(last_dagrun_query.c.row_number == 1)
84+
.join(DagModel, DagModel.dag_id == last_dagrun_query.c.dag_id)
85+
.join(SerializedDagModel, SerializedDagModel.dag_id == last_dagrun_query.c.dag_id)
86+
.all()
87+
)
7688

7789
res = [
7890
DagStatusInfo(
@@ -106,10 +118,16 @@ def get_task_status_info() -> List[TaskStatusInfo]:
106118
TaskInstance.state, func.count(TaskInstance.dag_id).label('cnt')
107119
).group_by(TaskInstance.dag_id, TaskInstance.task_id, TaskInstance.state).subquery()
108120

109-
sql_res = Session.query( # pylint: disable=no-member
110-
task_status_query.c.dag_id, task_status_query.c.task_id,
111-
task_status_query.c.state, task_status_query.c.cnt, DagModel.owners
112-
).join(DagModel, DagModel.dag_id == task_status_query.c.dag_id).order_by(task_status_query.c.dag_id).all()
121+
sql_res = (
122+
Session.query( # pylint: disable=no-member
123+
task_status_query.c.dag_id, task_status_query.c.task_id,
124+
task_status_query.c.state, task_status_query.c.cnt, DagModel.owners
125+
)
126+
.join(DagModel, DagModel.dag_id == task_status_query.c.dag_id)
127+
.join(SerializedDagModel, SerializedDagModel.dag_id == task_status_query.c.dag_id)
128+
.order_by(task_status_query.c.dag_id)
129+
.all()
130+
)
113131

114132
res = [
115133
TaskStatusInfo(
@@ -145,14 +163,16 @@ def get_dag_duration_info() -> List[DagDurationInfo]:
145163
}
146164
duration = durations.get(driver, durations['default'])
147165

148-
sql_res = Session.query( # pylint: disable=no-member
149-
DagRun.dag_id,
150-
func.max(duration).label('duration')
151-
).group_by(
152-
DagRun.dag_id
153-
).filter(
154-
DagRun.state == State.RUNNING
155-
).all()
166+
sql_res = (
167+
Session.query( # pylint: disable=no-member
168+
DagRun.dag_id,
169+
func.max(duration).label('duration')
170+
)
171+
.group_by(DagRun.dag_id)
172+
.filter(DagRun.state == State.RUNNING)
173+
.join(SerializedDagModel, SerializedDagModel.dag_id == DagRun.dag_id)
174+
.all()
175+
)
156176

157177
res = []
158178

@@ -325,4 +345,4 @@ class AirflowPrometheusPlugins(AirflowPlugin):
325345
flask_blueprints = [] # type: ignore
326346
menu_links = [] # type: ignore
327347
appbuilder_views = [RBACmetricsView]
328-
appbuilder_menu_items = [] # type: ignore
348+
appbuilder_menu_items = [] # type: ignore

0 commit comments

Comments
 (0)