Skip to content

Commit ca08dca

Browse files
fix: liveness probes in airflow 2.6.0 (#743)
* fix: scheduler liveness probe Signed-off-by: Mathew Wicks <[email protected]> * fix: triggerer liveness probe Signed-off-by: Mathew Wicks <[email protected]> --------- Signed-off-by: Mathew Wicks <[email protected]>
1 parent f5c2f0c commit ca08dca

File tree

2 files changed

+43
-9
lines changed

2 files changed

+43
-9
lines changed

charts/airflow/templates/scheduler/scheduler-deployment.yaml

+27-6
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,30 @@ spec:
140140
os.environ["AIRFLOW__LOGGING__LOGGING_LEVEL"] = "ERROR"
141141
{{- end }}
142142
143-
from airflow.jobs.scheduler_job import SchedulerJob
143+
# shared imports
144+
try:
145+
from airflow.jobs.job import Job
146+
except ImportError:
147+
# `BaseJob` was renamed to `Job` in airflow 2.6.0
148+
from airflow.jobs.base_job import BaseJob as Job
144149
from airflow.utils.db import create_session
145150
from airflow.utils.net import get_hostname
151+
152+
# heartbeat check imports
153+
try:
154+
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
155+
except ImportError:
156+
# `SchedulerJob` is wrapped by `SchedulerJobRunner` since airflow 2.6.0
157+
from airflow.jobs.scheduler_job import SchedulerJob as SchedulerJobRunner
158+
146159
{{- if .Values.scheduler.livenessProbe.taskCreationCheck.enabled }}
147-
from airflow.jobs.local_task_job import LocalTaskJob
160+
{{ "" }}
161+
# task creation check imports
162+
try:
163+
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
164+
except ImportError:
165+
# `LocalTaskJob` is wrapped by `LocalTaskJobRunner` since airflow 2.6.0
166+
from airflow.jobs.local_task_job import LocalTaskJob as LocalTaskJobRunner
148167
from airflow.utils import timezone
149168
{{- end }}
150169
@@ -155,9 +174,10 @@ spec:
155174
# ensure the SchedulerJob with most recent heartbeat for this `hostname` is alive
156175
hostname = get_hostname()
157176
scheduler_job = session \
158-
.query(SchedulerJob) \
177+
.query(Job) \
178+
.filter_by(job_type=SchedulerJobRunner.job_type) \
159179
.filter_by(hostname=hostname) \
160-
.order_by(SchedulerJob.latest_heartbeat.desc()) \
180+
.order_by(Job.latest_heartbeat.desc()) \
161181
.limit(1) \
162182
.first()
163183
if (scheduler_job is not None) and scheduler_job.is_alive():
@@ -183,8 +203,9 @@ spec:
183203
# ensure the most recent LocalTaskJob had a start_date in the last `task_job_threshold` seconds
184204
task_job_threshold = {{ $task_job_threshold }}
185205
task_job = session \
186-
.query(LocalTaskJob) \
187-
.order_by(LocalTaskJob.id.desc()) \
206+
.query(Job) \
207+
.filter_by(job_type=LocalTaskJobRunner.job_type) \
208+
.order_by(Job.id.desc()) \
188209
.limit(1) \
189210
.first()
190211
if task_job is not None:

charts/airflow/templates/triggerer/triggerer-deployment.yaml

+16-3
Original file line numberDiff line numberDiff line change
@@ -125,17 +125,30 @@ spec:
125125
# suppress logs triggered from importing airflow packages
126126
os.environ["AIRFLOW__LOGGING__LOGGING_LEVEL"] = "ERROR"
127127
128-
from airflow.jobs.triggerer_job import TriggererJob
128+
# shared imports
129+
try:
130+
from airflow.jobs.job import Job
131+
except ImportError:
132+
# `BaseJob` was renamed to `Job` in airflow 2.6.0
133+
from airflow.jobs.base_job import BaseJob as Job
129134
from airflow.utils.db import create_session
130135
from airflow.utils.net import get_hostname
131136
137+
# heartbeat check imports
138+
try:
139+
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
140+
except ImportError:
141+
# `TriggererJob` is wrapped by `TriggererJobRunner` since airflow 2.6.0
142+
from airflow.jobs.triggerer_job import TriggererJob as TriggererJobRunner
143+
132144
with create_session() as session:
133145
# ensure the TriggererJob with most recent heartbeat for this `hostname` is alive
134146
hostname = get_hostname()
135147
triggerer_job = session \
136-
.query(TriggererJob) \
148+
.query(Job) \
149+
.filter_by(job_type=TriggererJobRunner.job_type) \
137150
.filter_by(hostname=hostname) \
138-
.order_by(TriggererJob.latest_heartbeat.desc()) \
151+
.order_by(Job.latest_heartbeat.desc()) \
139152
.limit(1) \
140153
.first()
141154
if (triggerer_job is not None) and triggerer_job.is_alive():

0 commit comments

Comments
 (0)