Skip to content

Commit 2718dc7

Browse files
jedcunninghamdauinh
authored andcommitted
Remove dag processor from the scheduler (apache#45729)
We will only support a standalone DAG processor. Doing so removes complexity from the scheduler, and offers better isolation between these two different workloads.
1 parent f35887a commit 2718dc7

File tree

27 files changed

+303
-1258
lines changed

27 files changed

+303
-1258
lines changed

airflow/api/common/airflow_health.py

+17-22
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
from typing import Any
2020

21-
from airflow.configuration import conf
2221
from airflow.jobs.dag_processor_job_runner import DagProcessorJobRunner
2322
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
2423
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
@@ -29,13 +28,14 @@
2928

3029
def get_airflow_health() -> dict[str, Any]:
3130
"""Get the health for Airflow metadatabase, scheduler and triggerer."""
32-
dag_processor_enabled = conf.getboolean("scheduler", "standalone_dag_processor")
3331
metadatabase_status = HEALTHY
3432
latest_scheduler_heartbeat = None
3533
latest_triggerer_heartbeat = None
34+
latest_dag_processor_heartbeat = None
3635

3736
scheduler_status = UNHEALTHY
3837
triggerer_status: str | None = UNHEALTHY
38+
dag_processor_status: str | None = UNHEALTHY
3939

4040
try:
4141
latest_scheduler_job = SchedulerJobRunner.most_recent_job()
@@ -59,6 +59,18 @@ def get_airflow_health() -> dict[str, Any]:
5959
except Exception:
6060
metadatabase_status = UNHEALTHY
6161

62+
try:
63+
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
64+
65+
if latest_dag_processor_job:
66+
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
67+
if latest_dag_processor_job.is_alive():
68+
dag_processor_status = HEALTHY
69+
else:
70+
dag_processor_status = None
71+
except Exception:
72+
metadatabase_status = UNHEALTHY
73+
6274
airflow_health_status = {
6375
"metadatabase": {"status": metadatabase_status},
6476
"scheduler": {
@@ -69,27 +81,10 @@ def get_airflow_health() -> dict[str, Any]:
6981
"status": triggerer_status,
7082
"latest_triggerer_heartbeat": latest_triggerer_heartbeat,
7183
},
72-
}
73-
74-
if dag_processor_enabled:
75-
latest_dag_processor_heartbeat = None
76-
dag_processor_status: str | None = UNHEALTHY
77-
78-
try:
79-
latest_dag_processor_job = DagProcessorJobRunner.most_recent_job()
80-
81-
if latest_dag_processor_job:
82-
latest_dag_processor_heartbeat = latest_dag_processor_job.latest_heartbeat.isoformat()
83-
if latest_dag_processor_job.is_alive():
84-
dag_processor_status = HEALTHY
85-
else:
86-
dag_processor_status = None
87-
except Exception:
88-
metadatabase_status = UNHEALTHY
89-
90-
airflow_health_status["dag_processor"] = {
84+
"dag_processor": {
9185
"status": dag_processor_status,
9286
"latest_dag_processor_heartbeat": latest_dag_processor_heartbeat,
93-
}
87+
},
88+
}
9489

9590
return airflow_health_status

airflow/auth/managers/simple/views/auth.py

-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ def login(self):
4848
"""Start login process."""
4949
state_color_mapping = State.state_color.copy()
5050
state_color_mapping["no_status"] = state_color_mapping.pop(None)
51-
standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
5251
return self.render_template(
5352
"airflow/login.html",
5453
disable_nav_bar=True,
@@ -57,7 +56,6 @@ def login(self):
5756
),
5857
auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
5958
state_color_mapping=state_color_mapping,
60-
standalone_dag_processor=standalone_dag_processor,
6159
)
6260

6361
@expose("/logout", methods=["GET", "POST"])

airflow/cli/cli_config.py

-1
Original file line numberDiff line numberDiff line change
@@ -1902,7 +1902,6 @@ class GroupCommand(NamedTuple):
19021902
help="Start a scheduler instance",
19031903
func=lazy_load_command("airflow.cli.commands.local_commands.scheduler_command.scheduler"),
19041904
args=(
1905-
ARG_SUBDIR,
19061905
ARG_NUM_RUNS,
19071906
ARG_PID,
19081907
ARG_DAEMON,

airflow/cli/commands/local_commands/dag_processor_command.py

-3
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,6 @@ def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
4848
@providers_configuration_loaded
4949
def dag_processor(args):
5050
"""Start Airflow Dag Processor Job."""
51-
if not conf.getboolean("scheduler", "standalone_dag_processor"):
52-
raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.")
53-
5451
job_runner = _create_dag_processor_job_runner(args)
5552

5653
reload_configuration_for_dag_processing()

airflow/cli/commands/local_commands/scheduler_command.py

+1-2
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
from airflow.jobs.job import Job, run_job
3131
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
3232
from airflow.utils import cli as cli_utils
33-
from airflow.utils.cli import process_subdir
3433
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
3534
from airflow.utils.scheduler_health import serve_health_check
3635
from airflow.utils.usage_data_collection import usage_data_collection
@@ -39,7 +38,7 @@
3938

4039

4140
def _run_scheduler_job(args) -> None:
42-
job_runner = SchedulerJobRunner(job=Job(), subdir=process_subdir(args.subdir), num_runs=args.num_runs)
41+
job_runner = SchedulerJobRunner(job=Job(), num_runs=args.num_runs)
4342
enable_health_check = conf.getboolean("scheduler", "ENABLE_HEALTH_CHECK")
4443
with _serve_logs(args.skip_serve_logs), _serve_health_check(enable_health_check):
4544
run_job(job=job_runner.job, execute_callable=job_runner._execute)

airflow/cli/commands/local_commands/standalone_command.py

-1
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,6 @@ def calculate_env(self):
177177
We override some settings as part of being standalone.
178178
"""
179179
env = dict(os.environ)
180-
env["AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR"] = "True"
181180

182181
# Make sure we're using a local executor flavour
183182
executor_class, _ = ExecutorLoader.import_default_executor_cls()

airflow/config_templates/config.yml

+1-11
Original file line numberDiff line numberDiff line change
@@ -2447,25 +2447,15 @@ scheduler:
24472447
type: string
24482448
example: ~
24492449
default: "modified_time"
2450-
standalone_dag_processor:
2451-
description: |
2452-
Whether the dag processor is running as a standalone process or it is a subprocess of a scheduler
2453-
job.
2454-
version_added: 2.3.0
2455-
type: boolean
2456-
example: ~
2457-
default: "False"
24582450
max_callbacks_per_loop:
24592451
description: |
2460-
Only applicable if ``[scheduler] standalone_dag_processor`` is true and callbacks are stored
2461-
in database. Contains maximum number of callbacks that are fetched during a single loop.
2452+
The maximum number of callbacks that are fetched during a single loop.
24622453
version_added: 2.3.0
24632454
type: integer
24642455
example: ~
24652456
default: "20"
24662457
dag_stale_not_seen_duration:
24672458
description: |
2468-
Only applicable if ``[scheduler] standalone_dag_processor`` is true.
24692459
Time in seconds after which dags, which were not updated by Dag Processor are deactivated.
24702460
version_added: 2.4.0
24712461
type: integer

0 commit comments

Comments
 (0)