
Commit ebeb4d2

[dagster-airlift][jobs 5/n] Airflow monitoring job
1 parent: 6595ca7

File tree

17 files changed: +977 -95 lines


python_modules/dagster/dagster/_core/storage/tags.py

+1
@@ -123,6 +123,7 @@
     AUTO_RETRY_RUN_ID_TAG,
     *BACKFILL_TAGS,
 }
+EXTERNAL_JOB_SOURCE_TAG_KEY = f"{SYSTEM_TAG_PREFIX}/external-job-source"


 class TagType(Enum):

python_modules/libraries/dagster-airlift/dagster_airlift/constants.py

+2 -1
@@ -12,6 +12,7 @@
 DAG_RUN_ID_TAG_KEY = "dagster-airlift/airflow-dag-run-id"
 DAG_ID_TAG_KEY = "dagster-airlift/airflow-dag-id"
 TASK_ID_TAG_KEY = "dagster-airlift/airflow-task-id"
-EXTERNAL_JOB_TAG_KEY = "dagster/external-job"

 SOURCE_CODE_METADATA_KEY = "Source Code"
+
+NO_STEP_KEY = "__no_step__"

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_defs_data.py

+12 -2
@@ -9,7 +9,6 @@
 from dagster._core.definitions.repository_definition.repository_definition import (
     RepositoryDefinition,
 )
-from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
 from dagster._record import record

 from dagster_airlift.core.airflow_instance import AirflowInstance
@@ -74,7 +73,7 @@ def airflow_mapped_jobs(self) -> Sequence[JobDefinition]:
     @property
     def airflow_mapped_jobs_by_dag_handle(
         self,
-    ) -> Mapping[DagHandle, Union[JobDefinition, UnresolvedAssetJobDefinition]]:
+    ) -> Mapping[DagHandle, JobDefinition]:
         """Jobs mapping to Airflow dags by dag_id."""
         return {dag_handle_from_job(job): job for job in self.airflow_mapped_jobs}

@@ -135,6 +134,7 @@ def mapped_asset_keys_by_task_handle(self) -> Mapping[TaskHandle, AbstractSet[As
             asset_keys_per_handle[task_handle].add(spec.key)
         return asset_keys_per_handle

+    # these dag handle properties are ripe for consolidation
     @cached_property
     def mapped_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
         """Assets specifically mapped to each dag."""
@@ -157,6 +157,16 @@ def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[
             asset_keys_per_handle[dag_handle].add(spec.key)
         return asset_keys_per_handle

+    @cached_property
+    def all_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
+        """All asset keys mapped to each dag."""
+        res = defaultdict(set)
+        for handle, keys in self.mapped_asset_keys_by_dag_handle.items():
+            res[handle].update(keys)
+        for handle, keys in self.peered_dag_asset_keys_by_dag_handle.items():
+            res[handle].update(keys)
+        return res
+
     @public
     def asset_keys_in_task(self, dag_id: str, task_id: str) -> AbstractSet[AssetKey]:
         """Returns the asset keys that are mapped to the given task.

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py

+59 -11
@@ -128,10 +128,46 @@ def list_variables(self) -> list[dict[str, Any]]:
                 "Failed to fetch variables. Status code: {response.status_code}, Message: {response.text}"
             )

+    def get_task_instance_batch_time_range(
+        self,
+        dag_ids: Sequence[str],
+        states: Sequence[str],
+        end_date_gte: datetime.datetime,
+        end_date_lte: datetime.datetime,
+    ) -> list["TaskInstance"]:
+        """Get all task instances across all dag_ids for a given time range."""
+        response = self.auth_backend.get_session().post(
+            f"{self.get_api_url()}/dags/~/dagRuns/~/taskInstances/list",
+            json={
+                "dag_ids": dag_ids,
+                "end_date_gte": end_date_gte.isoformat(),
+                "end_date_lte": end_date_lte.isoformat(),
+                # Airflow's API refers to this variable in the singular, but it's actually a list. We keep the confusion contained to this one function.
+                "state": states,
+            },
+        )
+
+        if response.status_code != 200:
+            raise DagsterError(
+                f"Failed to fetch task instances for {dag_ids}. Status code: {response.status_code}, Message: {response.text}"
+            )
+        return [
+            TaskInstance(
+                webserver_url=self.auth_backend.get_webserver_url(),
+                dag_id=task_instance_json["dag_id"],
+                task_id=task_instance_json["task_id"],
+                run_id=task_instance_json["dag_run_id"],
+                metadata=task_instance_json,
+            )
+            for task_instance_json in response.json()["task_instances"]
+        ]
+
     def get_task_instance_batch(
         self, dag_id: str, task_ids: Sequence[str], run_id: str, states: Sequence[str]
     ) -> list["TaskInstance"]:
         """Get all task instances for a given dag_id, task_ids, and run_id."""
+        # It's not possible to offset the task instance API on versions of Airflow < 2.7.0, so we need to
+        # chunk the task ids directly.
         task_instances = []
         task_id_chunks = [
             task_ids[i : i + self.batch_task_instance_limit]
@@ -259,25 +295,37 @@ def get_dag_runs(
     def get_dag_runs_batch(
         self,
         dag_ids: Sequence[str],
-        end_date_gte: datetime.datetime,
-        end_date_lte: datetime.datetime,
+        end_date_gte: Optional[datetime.datetime] = None,
+        end_date_lte: Optional[datetime.datetime] = None,
+        start_date_gte: Optional[datetime.datetime] = None,
+        start_date_lte: Optional[datetime.datetime] = None,
         offset: int = 0,
+        states: Optional[Sequence[str]] = None,
     ) -> tuple[list["DagRun"], int]:
         """For the given list of dag_ids, return a tuple containing:
         - A list of dag runs ending within (end_date_gte, end_date_lte). Returns a maximum of batch_dag_runs_limit (which is configurable on the instance).
        - The number of total rows returned.
        """
+        states = states or ["success"]
+        params = {
+            "dag_ids": list(dag_ids),
+            "order_by": "end_date",
+            "states": states,
+            "page_offset": offset,
+            "page_limit": self.batch_dag_runs_limit,
+        }
+        if end_date_gte:
+            params["end_date_gte"] = end_date_gte.isoformat()
+        if end_date_lte:
+            params["end_date_lte"] = end_date_lte.isoformat()
+        if start_date_gte:
+            params["start_date_gte"] = start_date_gte.isoformat()
+        if start_date_lte:
+            params["start_date_lte"] = start_date_lte.isoformat()
+
         response = self.auth_backend.get_session().post(
             f"{self.get_api_url()}/dags/~/dagRuns/list",
-            json={
-                "dag_ids": dag_ids,
-                "end_date_gte": end_date_gte.isoformat(),
-                "end_date_lte": end_date_lte.isoformat(),
-                "order_by": "end_date",
-                "states": ["success"],
-                "page_offset": offset,
-                "page_limit": self.batch_dag_runs_limit,
-            },
+            json=params,
         )
         if response.status_code == 200:
             webserver_url = self.auth_backend.get_webserver_url()
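
A hedged usage sketch of the two batched query paths after this change. The AirflowBasicAuthBackend arguments, webserver URL, and dag_id are illustrative assumptions; the method signatures follow the diff above:

    from datetime import datetime, timedelta, timezone

    from dagster_airlift.core import AirflowBasicAuthBackend, AirflowInstance

    airflow_instance = AirflowInstance(
        auth_backend=AirflowBasicAuthBackend(
            webserver_url="http://localhost:8080", username="admin", password="admin"
        ),
        name="my_airflow",
    )

    window_end = datetime.now(tz=timezone.utc)
    window_start = window_end - timedelta(minutes=5)

    # All date bounds on get_dag_runs_batch are now optional keyword arguments, and a
    # states filter can be supplied; when omitted it still defaults to ["success"].
    dag_runs, total_entries = airflow_instance.get_dag_runs_batch(
        dag_ids=["my_dag"],
        start_date_gte=window_start,
        start_date_lte=window_end,
        states=["running", "success"],
    )

    # Task instances across all requested dags that ended inside the window.
    task_instances = airflow_instance.get_task_instance_batch_time_range(
        dag_ids=["my_dag"],
        states=["success", "failed"],
        end_date_gte=window_start,
        end_date_lte=window_end,
    )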

python_modules/libraries/dagster-airlift/dagster_airlift/core/job_builder.py

+6 -1
@@ -46,13 +46,18 @@ def dag_asset_job(
     # Eventually we'll have to handle fully resolved AssetsDefinition objects here but it's a whole
     # can of worms. For now, we enforce that only assetSpec objects are passed in.
     return define_asset_job(
-        name=convert_to_valid_dagster_name(dag_data.dag_id),
+        name=job_name(dag_data.dag_id),
         metadata=dag_asset_metadata(dag_data.dag_info),
         tags=airflow_job_tags(dag_data.dag_id),
         selection=[asset.key for asset in specs],
     )


+def job_name(dag_id: str) -> str:
+    """Constructs a job name from the DAG ID. The job name is used to power runs."""
+    return convert_to_valid_dagster_name(dag_id)
+
+
 def dag_non_asset_job(dag_data: SerializedDagData) -> JobDefinition:
     @job(
         name=convert_to_valid_dagster_name(dag_data.dag_id),
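
The new job_name helper just funnels the dag_id through convert_to_valid_dagster_name, whose implementation is not part of this diff. As a purely illustrative stand-in (not the actual helper), the idea is to replace characters a Dagster definition name cannot contain:

    import re

    def convert_to_valid_dagster_name_sketch(name: str) -> str:
        # Assumes Dagster names are limited to letters, digits, and underscores;
        # the real convert_to_valid_dagster_name may apply different rules.
        return re.sub(r"[^A-Za-z0-9_]", "_", name)

    assert convert_to_valid_dagster_name_sketch("team.my-dag") == "team_my_dag"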

python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py

+3 -16
@@ -15,6 +15,7 @@
 from dagster_airlift.core.airflow_instance import AirflowInstance
 from dagster_airlift.core.filter import AirflowFilter
 from dagster_airlift.core.job_builder import construct_dag_jobs
+from dagster_airlift.core.monitoring_job.builder import build_airflow_monitoring_defs
 from dagster_airlift.core.sensor.event_translation import (
     DagsterEventTransformerFn,
     default_event_transformer,
@@ -428,22 +429,8 @@ def build_job_based_airflow_defs(
         instance_name=airflow_instance.name,
     )

-    defs_with_airflow_assets = Definitions.merge(
+    return Definitions.merge(
         replace_assets_in_defs(defs=mapped_defs, assets=[full_assets_def]),
         Definitions(jobs=jobs),
-    )
-
-    return Definitions.merge(
-        defs_with_airflow_assets,
-        Definitions(
-            sensors=[
-                build_airflow_polling_sensor(
-                    mapped_assets=[full_assets_def],
-                    airflow_instance=airflow_instance,
-                    minimum_interval_seconds=DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS,
-                    event_transformer_fn=default_event_transformer,
-                    default_sensor_status=DefaultSensorStatus.RUNNING,
-                )
-            ]
-        ),
+        build_airflow_monitoring_defs(airflow_instance=airflow_instance),
     )
python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/builder.py (new file)

+106

@@ -0,0 +1,106 @@
+from typing import Union
+
+from dagster import RunRequest
+from dagster._annotations import beta
+from dagster._core.definitions.decorators.job_decorator import job
+from dagster._core.definitions.decorators.op_decorator import op
+from dagster._core.definitions.decorators.schedule_decorator import schedule
+from dagster._core.definitions.definitions_class import Definitions
+from dagster._core.definitions.op_definition import OpDefinition
+from dagster._core.definitions.run_request import SkipReason
+from dagster._core.definitions.schedule_definition import (
+    DefaultScheduleStatus,
+    ScheduleEvaluationContext,
+)
+from dagster._core.execution.context.op_execution_context import OpExecutionContext
+from dagster._core.storage.dagster_run import RunsFilter
+from dagster._grpc.client import DEFAULT_SENSOR_GRPC_TIMEOUT
+from dagster._time import datetime_from_timestamp, get_current_datetime
+from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData
+from dagster_airlift.core.airflow_instance import AirflowInstance
+from dagster_airlift.core.monitoring_job.event_stream import persist_events
+from dagster_airlift.core.monitoring_job.utils import (
+    augment_monitor_run_with_range_tags,
+    get_range_from_run_history,
+    structured_log,
+)
+from dagster_airlift.core.utils import monitoring_job_name
+
+MAIN_LOOP_TIMEOUT_SECONDS = DEFAULT_SENSOR_GRPC_TIMEOUT - 20
+DEFAULT_AIRFLOW_SENSOR_INTERVAL_SECONDS = 30
+START_LOOKBACK_SECONDS = 60  # Lookback one minute in time for the initial setting of the cursor.
+
+
+# IMPROVEME BCOR-102: We should be able to replace the sensor from the original Airlift functionality with this job.
+@beta
+def build_airflow_monitoring_defs(
+    *,
+    airflow_instance: AirflowInstance,
+) -> Definitions:
+    """The constructed job polls the Airflow instance for activity, and inserts asset events into Dagster's event log."""
+
+    @job(name=monitoring_job_name(airflow_instance.name))
+    def airflow_monitoring_job():
+        _build_monitoring_op(airflow_instance)()
+
+    @schedule(
+        job=airflow_monitoring_job,
+        cron_schedule="* * * * *",
+        name=f"{airflow_instance.name}__airflow_monitoring_job_schedule",
+        default_status=DefaultScheduleStatus.RUNNING,
+    )
+    def airflow_monitoring_job_schedule(
+        context: ScheduleEvaluationContext,
+    ) -> Union[RunRequest, SkipReason]:
+        """The schedule that runs the sensor job."""
+        # Get the last run for this job
+        last_run = next(
+            iter(
+                context.instance.get_runs(
+                    filters=RunsFilter(job_name=airflow_monitoring_job.name),
+                    limit=1,
+                )
+            ),
+            None,
+        )
+        if not last_run or last_run.is_finished:
+            return RunRequest()
+        else:
+            return SkipReason("Monitoring job is already running.")
+
+    return Definitions(
+        jobs=[airflow_monitoring_job],
+        schedules=[airflow_monitoring_job_schedule],
+    )
+
+
+def _build_monitoring_op(
+    airflow_instance: AirflowInstance,
+) -> OpDefinition:
+    @op(
+        name=monitoring_job_op_name(airflow_instance),
+    )
+    def monitor_dags(context: OpExecutionContext) -> None:
+        """The main function that runs the sensor. It polls the Airflow instance for activity and emits asset events."""
+        # This is a hack to get the repository tag for the current run. It's bad because it assumes that the job we're
+        # creating a run for is within the same repository; but I think that we'll have to do a second pass to get "outside of code
+        # location" runs working (if that's even something we want to do).
+        airflow_data = AirflowDefinitionsData(
+            airflow_instance=airflow_instance, resolved_repository=context.repository_def
+        )
+        # get previously processed time range from run tags
+        current_date = get_current_datetime()
+        range_start, range_end = get_range_from_run_history(context, current_date.timestamp())
+        augment_monitor_run_with_range_tags(context, range_start, range_end)
+
+        structured_log(
+            context,
+            f"Processing from {datetime_from_timestamp(range_start)} to {datetime_from_timestamp(range_end)}",
+        )
+        persist_events(context, airflow_data, airflow_instance, range_start, range_end)
+
+    return monitor_dags
+
+
+def monitoring_job_op_name(airflow_instance: AirflowInstance) -> str:
+    return f"core_monitor__{airflow_instance.name}"
