Skip to content

Commit b38fa41

Browse files
committed
[dagster-airlift] make AirflowDefinitionsData aware of jobs
1 parent 8a6c171 commit b38fa41

File tree

9 files changed

+159
-55
lines changed

9 files changed

+159
-55
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/constants.py

+1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,6 @@
1212
DAG_RUN_ID_TAG_KEY = "dagster-airlift/airflow-dag-run-id"
1313
DAG_ID_TAG_KEY = "dagster-airlift/airflow-dag-id"
1414
TASK_ID_TAG_KEY = "dagster-airlift/airflow-task-id"
15+
EXTERNAL_JOB_TAG_KEY = "dagster/external-job"
1516

1617
SOURCE_CODE_METADATA_KEY = "Source Code"

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_defs_data.py

+63-13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55

66
from dagster import AssetKey, AssetsDefinition, AssetSpec
77
from dagster._annotations import beta, public
8+
from dagster._core.definitions.job_definition import JobDefinition
9+
from dagster._core.definitions.repository_definition.repository_definition import (
10+
RepositoryDefinition,
11+
)
12+
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
813
from dagster._record import record
914

1015
from dagster_airlift.core.airflow_instance import AirflowInstance
@@ -14,7 +19,10 @@
1419
)
1520
from dagster_airlift.core.serialization.serialized_data import DagHandle, TaskHandle
1621
from dagster_airlift.core.utils import (
22+
dag_handle_from_job,
1723
dag_handles_for_spec,
24+
get_producing_dag_ids,
25+
is_airflow_mapped_job,
1826
is_dag_mapped_asset_spec,
1927
is_peered_dag_asset_spec,
2028
is_task_mapped_asset_spec,
@@ -26,6 +34,14 @@
2634
MappedAsset = Union[AssetSpec, AssetsDefinition]
2735

2836

37+
def _is_mapped_asset(asset: MappedAsset) -> bool:
38+
return (
39+
is_task_mapped_asset_spec(asset)
40+
or is_dag_mapped_asset_spec(asset)
41+
or is_peered_dag_asset_spec(asset)
42+
)
43+
44+
2945
@beta
3046
@record
3147
class AirflowDefinitionsData:
@@ -37,25 +53,57 @@ class AirflowDefinitionsData:
3753
"""
3854

3955
airflow_instance: AirflowInstance
40-
airflow_mapped_assets: Sequence[MappedAsset]
56+
resolved_repository: RepositoryDefinition
57+
58+
@property
59+
def airflow_mapped_asset_specs(self) -> Mapping[AssetKey, AssetSpec]:
60+
"""The assets that are mapped to Airflow tasks and dags."""
61+
return {
62+
spec.key: spec
63+
for spec in spec_iterator(self.resolved_repository.assets_defs_by_key.values())
64+
if _is_mapped_asset(spec)
65+
}
66+
67+
@property
68+
def airflow_mapped_jobs(self) -> Sequence[JobDefinition]:
69+
"""Jobs mapping to Airflow dags."""
70+
return [
71+
job for job in self.resolved_repository.get_all_jobs() if is_airflow_mapped_job(job)
72+
]
73+
74+
@property
75+
def airflow_mapped_jobs_by_dag_handle(
76+
self,
77+
) -> Mapping[DagHandle, Union[JobDefinition, UnresolvedAssetJobDefinition]]:
78+
"""Jobs mapping to Airflow dags by dag_id."""
79+
return {dag_handle_from_job(job): job for job in self.airflow_mapped_jobs}
80+
81+
@property
82+
def assets_per_job(self) -> Mapping[str, AbstractSet[AssetKey]]:
83+
"""Assets per job mapping to Airflow dags."""
84+
return {
85+
job.name: self.assets_produced_by_dags[dag_handle.dag_id]
86+
for dag_handle, job in self.airflow_mapped_jobs_by_dag_handle.items()
87+
}
88+
89+
@property
90+
def assets_produced_by_dags(self) -> Mapping[str, AbstractSet[AssetKey]]:
91+
"""Assets produced by Airflow dags."""
92+
result = defaultdict(set)
93+
for spec in self.airflow_mapped_asset_specs.values():
94+
for dag_id in get_producing_dag_ids(spec):
95+
result[dag_id].add(spec.key)
96+
return result
4197

4298
@public
4399
@property
44100
def instance_name(self) -> str:
45101
"""The name of the Airflow instance."""
46102
return self.airflow_instance.name
47103

48-
@cached_property
49-
def all_asset_specs(self) -> Sequence[AssetSpec]:
50-
return list(spec_iterator(self.airflow_mapped_assets))
51-
52104
@cached_property
53105
def mapping_info(self) -> AirliftMetadataMappingInfo:
54-
return build_airlift_metadata_mapping_info(self.airflow_mapped_assets)
55-
56-
@cached_property
57-
def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]:
58-
return {spec.key: spec for spec in self.all_asset_specs}
106+
return build_airlift_metadata_mapping_info(self.airflow_mapped_asset_specs.values())
59107

60108
@public
61109
def task_ids_in_dag(self, dag_id: str) -> set[str]:
@@ -80,7 +128,7 @@ def dag_ids_with_mapped_asset_keys(self) -> AbstractSet[str]:
80128
@cached_property
81129
def mapped_asset_keys_by_task_handle(self) -> Mapping[TaskHandle, AbstractSet[AssetKey]]:
82130
asset_keys_per_handle = defaultdict(set)
83-
for spec in self.all_asset_specs:
131+
for spec in self.airflow_mapped_asset_specs.values():
84132
if is_task_mapped_asset_spec(spec):
85133
task_handles = task_handles_for_spec(spec)
86134
for task_handle in task_handles:
@@ -89,8 +137,9 @@ def mapped_asset_keys_by_task_handle(self) -> Mapping[TaskHandle, AbstractSet[As
89137

90138
@cached_property
91139
def mapped_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
140+
"""Assets specifically mapped to each dag."""
92141
asset_keys_per_handle = defaultdict(set)
93-
for spec in self.all_asset_specs:
142+
for spec in self.airflow_mapped_asset_specs.values():
94143
if is_dag_mapped_asset_spec(spec):
95144
dag_handles = dag_handles_for_spec(spec)
96145
for dag_handle in dag_handles:
@@ -99,8 +148,9 @@ def mapped_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[Asse
99148

100149
@cached_property
101150
def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
151+
"""Autogenerated "peered" dag assets."""
102152
asset_keys_per_handle = defaultdict(set)
103-
for spec in self.all_asset_specs:
153+
for spec in self.airflow_mapped_asset_specs.values():
104154
if is_peered_dag_asset_spec(spec):
105155
dag_handles = peered_dag_handles_for_spec(spec)
106156
for dag_handle in dag_handles:

python_modules/libraries/dagster-airlift/dagster_airlift/core/job_builder.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
SerializedAirflowDefinitionsData,
1717
SerializedDagData,
1818
)
19-
from dagster_airlift.core.utils import airflow_kind_dict, convert_to_valid_dagster_name
19+
from dagster_airlift.core.utils import airflow_job_tags, convert_to_valid_dagster_name
2020

2121

2222
def construct_dag_jobs(
@@ -49,7 +49,7 @@ def dag_asset_job(
4949
return define_asset_job(
5050
name=convert_to_valid_dagster_name(dag_data.dag_id),
5151
metadata=dag_asset_metadata(dag_data.dag_info),
52-
tags={**airflow_kind_dict(), **{"dagster/external_job": "airflow"}},
52+
tags=airflow_job_tags(dag_data.dag_id),
5353
selection=[asset.key for asset in specs],
5454
)
5555

@@ -61,7 +61,7 @@ def dummy_op():
6161

6262
@job(
6363
name=convert_to_valid_dagster_name(dag_data.dag_id),
64-
tags={**airflow_kind_dict(), **{"dagster/external_job": "airflow"}},
64+
tags=airflow_job_tags(dag_data.dag_id),
6565
)
6666
def dummy_job():
6767
dummy_op()

python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py

+6-22
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
from collections import defaultdict
22
from collections.abc import Iterable, Mapping, Sequence
33
from dataclasses import dataclass
4-
from typing import Any, Callable, Optional, Union, cast
4+
from typing import Callable, Optional, Union, cast
55

6-
from dagster import (
7-
AssetsDefinition,
8-
AssetSpec,
9-
Definitions,
10-
_check as check,
11-
)
6+
from dagster import AssetsDefinition, AssetSpec, Definitions
127
from dagster._annotations import beta
138
from dagster._core.definitions.asset_key import AssetKey
149
from dagster._core.definitions.asset_spec import map_asset_specs
@@ -19,7 +14,6 @@
1914
from dagster._core.definitions.sensor_definition import DefaultSensorStatus
2015
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
2116

22-
from dagster_airlift.core.airflow_defs_data import MappedAsset
2317
from dagster_airlift.core.airflow_instance import AirflowInstance
2418
from dagster_airlift.core.filter import AirflowFilter
2519
from dagster_airlift.core.job_builder import construct_dag_jobs
@@ -41,12 +35,14 @@
4135
SerializedAirflowDefinitionsData,
4236
)
4337
from dagster_airlift.core.utils import (
38+
MappedAsset,
4439
dag_handles_for_spec,
4540
get_metadata_key,
4641
is_dag_mapped_asset_spec,
4742
is_task_mapped_asset_spec,
4843
spec_iterator,
4944
task_handles_for_spec,
45+
type_narrow_defs_assets,
5046
)
5147

5248

@@ -230,7 +226,7 @@ def only_include_dag(dag_info: DagInfo) -> bool:
230226
231227
"""
232228
defs = defs or Definitions()
233-
mapped_assets = _type_narrow_defs_assets(defs)
229+
mapped_assets = type_narrow_defs_assets(defs)
234230
serialized_airflow_data = AirflowInstanceDefsLoader(
235231
airflow_instance=airflow_instance,
236232
mapped_assets=mapped_assets,
@@ -272,18 +268,6 @@ def only_include_dag(dag_info: DagInfo) -> bool:
272268
)
273269

274270

275-
def _type_check_asset(asset: Any) -> MappedAsset:
276-
return check.inst(
277-
asset,
278-
(AssetSpec, AssetsDefinition),
279-
"Expected passed assets to all be AssetsDefinitions or AssetSpecs.",
280-
)
281-
282-
283-
def _type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
284-
return [_type_check_asset(asset) for asset in defs.assets or []]
285-
286-
287271
def _apply_airflow_data_to_specs(
288272
assets: Sequence[MappedAsset],
289273
serialized_data: SerializedAirflowDefinitionsData,
@@ -456,7 +440,7 @@ def build_job_based_airflow_defs(
456440
mapped_defs: Optional[Definitions] = None,
457441
) -> Definitions:
458442
mapped_defs = mapped_defs or Definitions()
459-
mapped_assets = _type_narrow_defs_assets(mapped_defs)
443+
mapped_assets = type_narrow_defs_assets(mapped_defs)
460444
serialized_airflow_data = AirflowInstanceDefsLoader(
461445
airflow_instance=airflow_instance,
462446
mapped_assets=mapped_assets,

python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def default_event_transformer(
4040
"""The default event transformer function, which attaches a partition key to materializations which are from time-window partitioned assets."""
4141
cached_partition_calculations = defaultdict(dict)
4242
for mat in materializations:
43-
asset_spec = airflow_data.all_asset_specs_by_key[mat.asset_key]
43+
asset_spec = airflow_data.airflow_mapped_asset_specs[mat.asset_key]
4444
if not asset_spec.partitions_def or not isinstance(
4545
asset_spec.partitions_def, TimeWindowPartitionsDefinition
4646
):

python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py

+5-7
Original file line numberDiff line numberDiff line change
@@ -109,21 +109,19 @@ def build_airflow_polling_sensor(
109109
Returns:
110110
Definitions: A `Definitions` object containing the constructed sensor.
111111
"""
112-
airflow_data = AirflowDefinitionsData(
113-
airflow_instance=airflow_instance, airflow_mapped_assets=mapped_assets
114-
)
115112

116113
@sensor(
117-
name=f"{airflow_data.airflow_instance.name}__airflow_dag_status_sensor",
114+
name=f"{airflow_instance.name}__airflow_dag_status_sensor",
118115
minimum_interval_seconds=minimum_interval_seconds,
119116
default_status=default_sensor_status or DefaultSensorStatus.RUNNING,
120117
# This sensor will only ever execute asset checks and not asset materializations.
121118
asset_selection=AssetSelection.all_asset_checks(),
122119
)
123120
def airflow_dag_sensor(context: SensorEvaluationContext) -> SensorResult:
124121
"""Sensor to report materialization events for each asset as new runs come in."""
125-
context.log.info(
126-
f"************Running sensor for {airflow_data.airflow_instance.name}***********"
122+
context.log.info(f"************Running sensor for {airflow_instance.name}***********")
123+
airflow_data = AirflowDefinitionsData(
124+
airflow_instance=airflow_instance, resolved_repository=context.repository_def
127125
)
128126
try:
129127
cursor = (
@@ -416,7 +414,7 @@ def automapped_tasks_asset_keys(
416414
asset_keys_to_emit = set()
417415
asset_keys = airflow_data.asset_keys_in_task(dag_run.dag_id, task_instance.task_id)
418416
for asset_key in asset_keys:
419-
spec = airflow_data.all_asset_specs_by_key[asset_key]
417+
spec = airflow_data.airflow_mapped_asset_specs[asset_key]
420418
if spec.metadata.get(AUTOMAPPED_TASK_METADATA_KEY):
421419
asset_keys_to_emit.add(asset_key)
422420
return asset_keys_to_emit

python_modules/libraries/dagster-airlift/dagster_airlift/core/utils.py

+52-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from collections.abc import Iterable, Iterator
2-
from typing import TYPE_CHECKING, Optional, Union
1+
from collections.abc import Iterable, Iterator, Mapping, Sequence
2+
from typing import TYPE_CHECKING, Any, Optional, Union
33

44
from dagster import (
55
AssetsDefinition,
@@ -8,13 +8,17 @@
88
_check as check,
99
)
1010
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition
11+
from dagster._core.definitions.definitions_class import Definitions
12+
from dagster._core.definitions.job_definition import JobDefinition
1113
from dagster._core.definitions.utils import VALID_NAME_REGEX
1214
from dagster._core.errors import DagsterInvariantViolationError
1315
from dagster._core.storage.tags import KIND_PREFIX
1416

1517
from dagster_airlift.constants import (
1618
AIRFLOW_SOURCE_METADATA_KEY_PREFIX,
19+
DAG_ID_TAG_KEY,
1720
DAG_MAPPING_METADATA_KEY,
21+
EXTERNAL_JOB_TAG_KEY,
1822
PEERED_DAG_MAPPING_METADATA_KEY,
1923
TASK_MAPPING_METADATA_KEY,
2024
)
@@ -106,3 +110,49 @@ def peered_dag_handles_for_spec(spec: AssetSpec) -> set["DagHandle"]:
106110
DagHandle(dag_id=dag_handle_dict["dag_id"])
107111
for dag_handle_dict in spec.metadata[PEERED_DAG_MAPPING_METADATA_KEY]
108112
}
113+
114+
115+
def get_producing_dag_ids(spec: AssetSpec) -> set[str]:
116+
if is_dag_mapped_asset_spec(spec):
117+
return {dag_handle.dag_id for dag_handle in dag_handles_for_spec(spec)}
118+
if is_peered_dag_asset_spec(spec):
119+
return {dag_handle.dag_id for dag_handle in peered_dag_handles_for_spec(spec)}
120+
else:
121+
return {task_handle.dag_id for task_handle in task_handles_for_spec(spec)}
122+
123+
124+
MappedAsset = Union[AssetSpec, AssetsDefinition]
125+
126+
127+
def _type_check_asset(asset: Any) -> MappedAsset:
128+
return check.inst(
129+
asset,
130+
(AssetSpec, AssetsDefinition),
131+
"Expected passed assets to all be AssetsDefinitions or AssetSpecs.",
132+
)
133+
134+
135+
def type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
136+
return [_type_check_asset(asset) for asset in defs.assets or []]
137+
138+
139+
def is_airflow_mapped_job(job: JobDefinition) -> bool:
140+
return job.tags.get(EXTERNAL_JOB_TAG_KEY) == "airflow"
141+
142+
143+
def dag_handle_from_job(job: JobDefinition) -> "DagHandle":
144+
from dagster_airlift.core.serialization.serialized_data import DagHandle
145+
146+
check.invariant(
147+
is_airflow_mapped_job(job),
148+
"Job is not an Airflow mapped job. Cannot get dag_id.",
149+
)
150+
return DagHandle(dag_id=job.tags[DAG_ID_TAG_KEY])
151+
152+
153+
def airflow_job_tags(dag_id: str) -> Mapping[str, str]:
154+
return {
155+
**airflow_kind_dict(),
156+
EXTERNAL_JOB_TAG_KEY: "airflow",
157+
DAG_ID_TAG_KEY: dag_id,
158+
}

python_modules/libraries/dagster-airlift/dagster_airlift/test/test_utils.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ def asset_spec(asset_str: str, defs: Definitions) -> Optional[AssetSpec]:
6060
)
6161

6262

63-
def get_job(
63+
def get_job_from_defs(
6464
name: str, defs: Definitions
6565
) -> Optional[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
6666
"""Get the job from the definitions by its name."""

0 commit comments

Comments
 (0)