Skip to content

Commit 4534d04

Browse files
committed
[dagster-airlift] make AirflowDefinitionsData aware of jobs
1 parent 8a6c171 commit 4534d04

File tree

7 files changed

+133
-43
lines changed

7 files changed

+133
-43
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_defs_data.py

+69-14
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,16 @@
33
from functools import cached_property
44
from typing import AbstractSet, Union # noqa: UP035
55

6-
from dagster import AssetKey, AssetsDefinition, AssetSpec
6+
from dagster import (
7+
AssetKey,
8+
AssetsDefinition,
9+
AssetSpec,
10+
_check as check,
11+
)
712
from dagster._annotations import beta, public
13+
from dagster._core.definitions.definitions_class import Definitions
14+
from dagster._core.definitions.job_definition import JobDefinition
15+
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
816
from dagster._record import record
917

1018
from dagster_airlift.core.airflow_instance import AirflowInstance
@@ -15,6 +23,7 @@
1523
from dagster_airlift.core.serialization.serialized_data import DagHandle, TaskHandle
1624
from dagster_airlift.core.utils import (
1725
dag_handles_for_spec,
26+
get_producing_dag_ids,
1827
is_dag_mapped_asset_spec,
1928
is_peered_dag_asset_spec,
2029
is_task_mapped_asset_spec,
@@ -26,6 +35,14 @@
2635
MappedAsset = Union[AssetSpec, AssetsDefinition]
2736

2837

38+
def _is_mapped_asset(asset: MappedAsset) -> bool:
39+
return (
40+
is_task_mapped_asset_spec(asset)
41+
or is_dag_mapped_asset_spec(asset)
42+
or is_peered_dag_asset_spec(asset)
43+
)
44+
45+
2946
@beta
3047
@record
3148
class AirflowDefinitionsData:
@@ -37,25 +54,61 @@ class AirflowDefinitionsData:
3754
"""
3855

3956
airflow_instance: AirflowInstance
40-
airflow_mapped_assets: Sequence[MappedAsset]
57+
defs: Definitions
58+
59+
@property
60+
def airflow_mapped_asset_specs(self) -> Mapping[AssetKey, AssetSpec]:
61+
"""The assets that are mapped to Airflow tasks and dags."""
62+
result = {}
63+
for asset in self.defs.assets:
64+
if not isinstance(asset, (AssetsDefinition, AssetSpec)):
65+
continue
66+
for spec in spec_iterator([asset]):
67+
if _is_mapped_asset(spec):
68+
result[spec.key] = spec
69+
return result
70+
71+
@property
72+
def airflow_mapped_jobs(self) -> Sequence[Union[JobDefinition, UnresolvedAssetJobDefinition]]:
73+
"""Jobs mapping to Airflow dags."""
74+
return [job for job in self.defs.jobs if job.tags.get("dagster/external_job") == "airflow"]
75+
76+
@property
77+
def airflow_mapped_jobs_by_dag_handle(
78+
self,
79+
) -> Mapping[DagHandle, Union[JobDefinition, UnresolvedAssetJobDefinition]]:
80+
"""Jobs mapping to Airflow dags by dag_id."""
81+
return {
82+
DagHandle(dag_id=check.not_none(job.tags)["dagster-airlift/dag_id"]): job
83+
for job in self.airflow_mapped_jobs
84+
}
85+
86+
@property
87+
def assets_per_job(self) -> Mapping[str, AbstractSet[AssetKey]]:
88+
"""Assets per job mapping to Airflow dags."""
89+
return {
90+
job.name: self.assets_produced_by_dags[dag_handle.dag_id]
91+
for dag_handle, job in self.airflow_mapped_jobs_by_dag_handle.items()
92+
}
93+
94+
@property
95+
def assets_produced_by_dags(self) -> Mapping[str, AbstractSet[AssetKey]]:
96+
"""Assets produced by Airflow dags."""
97+
result = defaultdict(set)
98+
for spec in self.airflow_mapped_asset_specs.values():
99+
for dag_id in get_producing_dag_ids(spec):
100+
result[dag_id].add(spec.key)
101+
return result
41102

42103
@public
43104
@property
44105
def instance_name(self) -> str:
45106
"""The name of the Airflow instance."""
46107
return self.airflow_instance.name
47108

48-
@cached_property
49-
def all_asset_specs(self) -> Sequence[AssetSpec]:
50-
return list(spec_iterator(self.airflow_mapped_assets))
51-
52109
@cached_property
53110
def mapping_info(self) -> AirliftMetadataMappingInfo:
54-
return build_airlift_metadata_mapping_info(self.airflow_mapped_assets)
55-
56-
@cached_property
57-
def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]:
58-
return {spec.key: spec for spec in self.all_asset_specs}
111+
return build_airlift_metadata_mapping_info(self.airflow_mapped_asset_specs)
59112

60113
@public
61114
def task_ids_in_dag(self, dag_id: str) -> set[str]:
@@ -80,7 +133,7 @@ def dag_ids_with_mapped_asset_keys(self) -> AbstractSet[str]:
80133
@cached_property
81134
def mapped_asset_keys_by_task_handle(self) -> Mapping[TaskHandle, AbstractSet[AssetKey]]:
82135
asset_keys_per_handle = defaultdict(set)
83-
for spec in self.all_asset_specs:
136+
for spec in self.airflow_mapped_asset_specs.values():
84137
if is_task_mapped_asset_spec(spec):
85138
task_handles = task_handles_for_spec(spec)
86139
for task_handle in task_handles:
@@ -89,8 +142,9 @@ def mapped_asset_keys_by_task_handle(self) -> Mapping[TaskHandle, AbstractSet[As
89142

90143
@cached_property
91144
def mapped_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
145+
"""Assets specifically mapped to each dag."""
92146
asset_keys_per_handle = defaultdict(set)
93-
for spec in self.all_asset_specs:
147+
for spec in self.airflow_mapped_asset_specs.values():
94148
if is_dag_mapped_asset_spec(spec):
95149
dag_handles = dag_handles_for_spec(spec)
96150
for dag_handle in dag_handles:
@@ -99,8 +153,9 @@ def mapped_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[Asse
99153

100154
@cached_property
101155
def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[AssetKey]]:
156+
"""Autogenerated "peered" dag assets."""
102157
asset_keys_per_handle = defaultdict(set)
103-
for spec in self.all_asset_specs:
158+
for spec in self.airflow_mapped_asset_specs.values():
104159
if is_peered_dag_asset_spec(spec):
105160
dag_handles = peered_dag_handles_for_spec(spec)
106161
for dag_handle in dag_handles:

python_modules/libraries/dagster-airlift/dagster_airlift/core/job_builder.py

+8-2
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ def dag_asset_job(
4949
return define_asset_job(
5050
name=convert_to_valid_dagster_name(dag_data.dag_id),
5151
metadata=dag_asset_metadata(dag_data.dag_info),
52-
tags={**airflow_kind_dict(), **{"dagster/external_job": "airflow"}},
52+
tags={
53+
**airflow_kind_dict(),
54+
**{"dagster/external_job": "airflow", "dagster-airlift/dag_id": dag_data.dag_id},
55+
},
5356
selection=[asset.key for asset in specs],
5457
)
5558

@@ -61,7 +64,10 @@ def dummy_op():
6164

6265
@job(
6366
name=convert_to_valid_dagster_name(dag_data.dag_id),
64-
tags={**airflow_kind_dict(), **{"dagster/external_job": "airflow"}},
67+
tags={
68+
**airflow_kind_dict(),
69+
**{"dagster/external_job": "airflow", "dagster-airlift/dag_id": dag_data.dag_id},
70+
},
6571
)
6672
def dummy_job():
6773
dummy_op()

python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py

+6-22
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
from collections import defaultdict
22
from collections.abc import Iterable, Mapping, Sequence
33
from dataclasses import dataclass
4-
from typing import Any, Callable, Optional, Union, cast
4+
from typing import Callable, Optional, Union, cast
55

6-
from dagster import (
7-
AssetsDefinition,
8-
AssetSpec,
9-
Definitions,
10-
_check as check,
11-
)
6+
from dagster import AssetsDefinition, AssetSpec, Definitions
127
from dagster._annotations import beta
138
from dagster._core.definitions.asset_key import AssetKey
149
from dagster._core.definitions.asset_spec import map_asset_specs
@@ -19,7 +14,6 @@
1914
from dagster._core.definitions.sensor_definition import DefaultSensorStatus
2015
from dagster._core.definitions.unresolved_asset_job_definition import UnresolvedAssetJobDefinition
2116

22-
from dagster_airlift.core.airflow_defs_data import MappedAsset
2317
from dagster_airlift.core.airflow_instance import AirflowInstance
2418
from dagster_airlift.core.filter import AirflowFilter
2519
from dagster_airlift.core.job_builder import construct_dag_jobs
@@ -41,12 +35,14 @@
4135
SerializedAirflowDefinitionsData,
4236
)
4337
from dagster_airlift.core.utils import (
38+
MappedAsset,
4439
dag_handles_for_spec,
4540
get_metadata_key,
4641
is_dag_mapped_asset_spec,
4742
is_task_mapped_asset_spec,
4843
spec_iterator,
4944
task_handles_for_spec,
45+
type_narrow_defs_assets,
5046
)
5147

5248

@@ -230,7 +226,7 @@ def only_include_dag(dag_info: DagInfo) -> bool:
230226
231227
"""
232228
defs = defs or Definitions()
233-
mapped_assets = _type_narrow_defs_assets(defs)
229+
mapped_assets = type_narrow_defs_assets(defs)
234230
serialized_airflow_data = AirflowInstanceDefsLoader(
235231
airflow_instance=airflow_instance,
236232
mapped_assets=mapped_assets,
@@ -272,18 +268,6 @@ def only_include_dag(dag_info: DagInfo) -> bool:
272268
)
273269

274270

275-
def _type_check_asset(asset: Any) -> MappedAsset:
276-
return check.inst(
277-
asset,
278-
(AssetSpec, AssetsDefinition),
279-
"Expected passed assets to all be AssetsDefinitions or AssetSpecs.",
280-
)
281-
282-
283-
def _type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
284-
return [_type_check_asset(asset) for asset in defs.assets or []]
285-
286-
287271
def _apply_airflow_data_to_specs(
288272
assets: Sequence[MappedAsset],
289273
serialized_data: SerializedAirflowDefinitionsData,
@@ -456,7 +440,7 @@ def build_job_based_airflow_defs(
456440
mapped_defs: Optional[Definitions] = None,
457441
) -> Definitions:
458442
mapped_defs = mapped_defs or Definitions()
459-
mapped_assets = _type_narrow_defs_assets(mapped_defs)
443+
mapped_assets = type_narrow_defs_assets(mapped_defs)
460444
serialized_airflow_data = AirflowInstanceDefsLoader(
461445
airflow_instance=airflow_instance,
462446
mapped_assets=mapped_assets,

python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/event_translation.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def default_event_transformer(
4040
"""The default event transformer function, which attaches a partition key to materializations which are from time-window partitioned assets."""
4141
cached_partition_calculations = defaultdict(dict)
4242
for mat in materializations:
43-
asset_spec = airflow_data.all_asset_specs_by_key[mat.asset_key]
43+
asset_spec = airflow_data.airflow_mapped_asset_specs[mat.asset_key]
4444
if not asset_spec.partitions_def or not isinstance(
4545
asset_spec.partitions_def, TimeWindowPartitionsDefinition
4646
):

python_modules/libraries/dagster-airlift/dagster_airlift/core/sensor/sensor_builder.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from dagster._annotations import beta
1818
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
1919
from dagster._core.definitions.asset_selection import AssetSelection
20+
from dagster._core.definitions.definitions_class import Definitions
2021
from dagster._core.definitions.events import AssetObservation
2122
from dagster._core.definitions.repository_definition.repository_definition import (
2223
RepositoryDefinition,
@@ -110,7 +111,7 @@ def build_airflow_polling_sensor(
110111
Definitions: A `Definitions` object containing the constructed sensor.
111112
"""
112113
airflow_data = AirflowDefinitionsData(
113-
airflow_instance=airflow_instance, airflow_mapped_assets=mapped_assets
114+
airflow_instance=airflow_instance, defs=Definitions(assets=mapped_assets)
114115
)
115116

116117
@sensor(
@@ -416,7 +417,7 @@ def automapped_tasks_asset_keys(
416417
asset_keys_to_emit = set()
417418
asset_keys = airflow_data.asset_keys_in_task(dag_run.dag_id, task_instance.task_id)
418419
for asset_key in asset_keys:
419-
spec = airflow_data.all_asset_specs_by_key[asset_key]
420+
spec = airflow_data.airflow_mapped_asset_specs[asset_key]
420421
if spec.metadata.get(AUTOMAPPED_TASK_METADATA_KEY):
421422
asset_keys_to_emit.add(asset_key)
422423
return asset_keys_to_emit

python_modules/libraries/dagster-airlift/dagster_airlift/core/utils.py

+27-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
from collections.abc import Iterable, Iterator
2-
from typing import TYPE_CHECKING, Optional, Union
1+
from collections.abc import Iterable, Iterator, Sequence
2+
from typing import TYPE_CHECKING, Any, Optional, Union
33

44
from dagster import (
55
AssetsDefinition,
@@ -8,6 +8,7 @@
88
_check as check,
99
)
1010
from dagster._core.definitions.cacheable_assets import CacheableAssetsDefinition
11+
from dagster._core.definitions.definitions_class import Definitions
1112
from dagster._core.definitions.utils import VALID_NAME_REGEX
1213
from dagster._core.errors import DagsterInvariantViolationError
1314
from dagster._core.storage.tags import KIND_PREFIX
@@ -106,3 +107,27 @@ def peered_dag_handles_for_spec(spec: AssetSpec) -> set["DagHandle"]:
106107
DagHandle(dag_id=dag_handle_dict["dag_id"])
107108
for dag_handle_dict in spec.metadata[PEERED_DAG_MAPPING_METADATA_KEY]
108109
}
110+
111+
112+
def get_producing_dag_ids(spec: AssetSpec) -> set[str]:
113+
if is_dag_mapped_asset_spec(spec):
114+
return {dag_handle.dag_id for dag_handle in dag_handles_for_spec(spec)}
115+
if is_peered_dag_asset_spec(spec):
116+
return {dag_handle.dag_id for dag_handle in peered_dag_handles_for_spec(spec)}
117+
else:
118+
return {task_handle.dag_id for task_handle in task_handles_for_spec(spec)}
119+
120+
121+
MappedAsset = Union[AssetSpec, AssetsDefinition]
122+
123+
124+
def _type_check_asset(asset: Any) -> MappedAsset:
125+
return check.inst(
126+
asset,
127+
(AssetSpec, AssetsDefinition),
128+
"Expected passed assets to all be AssetsDefinitions or AssetSpecs.",
129+
)
130+
131+
132+
def type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
133+
return [_type_check_asset(asset) for asset in defs.assets or []]

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py

+19
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
)
4343
from dagster_airlift.core.serialization.defs_construction import make_default_dag_asset_key
4444
from dagster_airlift.core.serialization.serialized_data import (
45+
DagHandle,
4546
SerializedAirflowDefinitionsData,
4647
TaskHandle,
4748
)
@@ -844,3 +845,21 @@ def test_load_job_defs() -> None:
844845
assert isinstance(get_job("producer2", defs), UnresolvedAssetJobDefinition)
845846
assert isinstance(get_job("consumer1", defs), UnresolvedAssetJobDefinition)
846847
assert isinstance(get_job("consumer2", defs), JobDefinition)
848+
849+
airflow_defs_data = AirflowDefinitionsData(
850+
airflow_instance=af_instance,
851+
defs=defs,
852+
)
853+
854+
assert airflow_defs_data.airflow_mapped_jobs_by_dag_handle == {
855+
DagHandle(dag_id="producer1"): get_job("producer1", defs),
856+
DagHandle(dag_id="producer2"): get_job("producer2", defs),
857+
DagHandle(dag_id="consumer1"): get_job("consumer1", defs),
858+
DagHandle(dag_id="consumer2"): get_job("consumer2", defs),
859+
}
860+
assert airflow_defs_data.assets_per_job == {
861+
"producer1": {AssetKey("example1"), AssetKey("a")},
862+
"producer2": {AssetKey("example1")},
863+
"consumer1": {AssetKey("example2")},
864+
"consumer2": set(),
865+
}

0 commit comments

Comments
 (0)