Skip to content

Commit 6418df4

Browse files
committed
[dagster-airlift] move assets def construction to common code path; other refactors
1 parent 3c88083 commit 6418df4

File tree

2 files changed

+36
-35
lines changed

2 files changed

+36
-35
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/load_defs.py

+33-32
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from collections.abc import Iterable, Iterator, Sequence
1+
from collections.abc import Iterable, Sequence
22
from dataclasses import dataclass
33
from typing import Any, Callable, Optional, Union, cast
44

@@ -10,6 +10,7 @@
1010
)
1111
from dagster._annotations import beta
1212
from dagster._core.definitions.asset_key import AssetKey
13+
from dagster._core.definitions.asset_spec import map_asset_specs
1314
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
1415
from dagster._core.definitions.external_asset import external_asset_from_spec
1516
from dagster._core.definitions.sensor_definition import DefaultSensorStatus
@@ -63,11 +64,8 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData:
6364
def defs_from_state( # pyright: ignore[reportIncompatibleMethodOverride]
6465
self, serialized_airflow_data: SerializedAirflowDefinitionsData
6566
) -> Definitions:
66-
return Definitions(
67-
assets=[
68-
*_apply_airflow_data_to_specs(self.mapped_assets, serialized_airflow_data),
69-
*construct_dag_assets_defs(serialized_airflow_data),
70-
]
67+
raise Exception(
68+
"We use get_or_fetch_state() to build definitions, and leave it up to the callsite how it is used."
7169
)
7270

7371

@@ -230,14 +228,20 @@ def only_include_dag(dag_info: DagInfo) -> bool:
230228
).get_or_fetch_state()
231229
assets_to_apply_airflow_data = [
232230
*mapped_assets,
233-
*construct_dataset_defs(serialized_airflow_data),
231+
*construct_dataset_specs(serialized_airflow_data),
234232
]
235233
mapped_and_constructed_assets = [
236234
*_apply_airflow_data_to_specs(assets_to_apply_airflow_data, serialized_airflow_data),
237235
*construct_dag_assets_defs(serialized_airflow_data),
238236
]
237+
fully_resolved_assets_definitions = [
238+
external_asset_from_spec(asset)
239+
if isinstance(asset, AssetSpec)
240+
else cast(AssetsDefinition, asset)
241+
for asset in mapped_and_constructed_assets
242+
]
239243
defs_with_airflow_assets = replace_assets_in_defs(
240-
defs=defs, assets=mapped_and_constructed_assets
244+
defs=defs, assets=fully_resolved_assets_definitions
241245
)
242246

243247
return Definitions.merge(
@@ -271,16 +275,15 @@ def _type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
271275
def _apply_airflow_data_to_specs(
272276
assets: Sequence[MappedAsset],
273277
serialized_data: SerializedAirflowDefinitionsData,
274-
) -> Iterator[AssetsDefinition]:
275-
"""Apply asset spec transformations to the asset definitions."""
276-
for asset in assets:
277-
narrowed_asset = _type_check_asset(asset)
278-
assets_def = (
279-
narrowed_asset
280-
if isinstance(narrowed_asset, AssetsDefinition)
281-
else external_asset_from_spec(narrowed_asset)
282-
)
283-
yield assets_def.map_asset_specs(get_airflow_data_to_spec_mapper(serialized_data))
278+
) -> Sequence[MappedAsset]:
279+
"""Apply asset spec transformations to the assets."""
280+
return cast(
281+
Sequence[MappedAsset],
282+
map_asset_specs(
283+
func=get_airflow_data_to_spec_mapper(serialized_data),
284+
iterable=assets,
285+
),
286+
)
284287

285288

286289
def replace_assets_in_defs(
@@ -339,31 +342,29 @@ def uri_to_asset_key(uri: str) -> AssetKey:
339342
return AssetKey(with_ext_removed)
340343

341344

342-
def construct_dataset_defs(
345+
def construct_dataset_specs(
343346
serialized_data: SerializedAirflowDefinitionsData,
344-
) -> Sequence[AssetsDefinition]:
347+
) -> Sequence[AssetSpec]:
345348
"""Construct dataset definitions from the serialized Airflow data."""
346349
from dagster_airlift.core.multiple_tasks import assets_with_multiple_task_mappings
347350

348351
return cast(
349-
Sequence[AssetsDefinition],
352+
Sequence[AssetSpec],
350353
[
351354
assets_with_multiple_task_mappings(
352355
task_handles=[
353356
{"dag_id": t.dag_id, "task_id": t.task_id} for t in dataset.producing_tasks
354357
],
355358
assets=[
356-
external_asset_from_spec(
357-
AssetSpec(
358-
key=uri_to_asset_key(dataset.uri),
359-
metadata=dataset.extra,
360-
deps=[
361-
uri_to_asset_key(upstream_uri)
362-
for upstream_uri in serialized_data.upstream_datasets_by_uri.get(
363-
dataset.uri, set()
364-
)
365-
],
366-
)
359+
AssetSpec(
360+
key=uri_to_asset_key(dataset.uri),
361+
metadata=dataset.extra,
362+
deps=[
363+
uri_to_asset_key(upstream_uri)
364+
for upstream_uri in serialized_data.upstream_datasets_by_uri.get(
365+
dataset.uri, set()
366+
)
367+
],
367368
)
368369
],
369370
)[0]

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_load_defs.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -673,10 +673,10 @@ def test_enrich() -> None:
673673
source_code_retrieval_enabled=None,
674674
)
675675
assert len(airflow_assets) == 1
676-
assets_def = next(iter(airflow_assets))
677-
assert assets_def.key == AssetKey("a")
676+
spec = next(iter(airflow_assets))
677+
assert isinstance(spec, AssetSpec)
678+
assert spec.key == AssetKey("a")
678679
# Asset metadata properties have been glommed onto the asset
679-
spec = next(iter(assets_def.specs))
680680
assert spec.metadata["Dag ID"] == "dag"
681681

682682

0 commit comments

Comments
 (0)