@@ -1,4 +1,4 @@
-from collections.abc import Iterable, Iterator, Sequence
+from collections.abc import Iterable, Sequence
 from dataclasses import dataclass
 from typing import Any, Callable, Optional, Union, cast
 
@@ -10,6 +10,7 @@
 )
 from dagster._annotations import beta
 from dagster._core.definitions.asset_key import AssetKey
+from dagster._core.definitions.asset_spec import map_asset_specs
 from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
 from dagster._core.definitions.external_asset import external_asset_from_spec
 from dagster._core.definitions.sensor_definition import DefaultSensorStatus
@@ -63,11 +64,8 @@ def fetch_state(self) -> SerializedAirflowDefinitionsData:
     def defs_from_state(  # pyright: ignore[reportIncompatibleMethodOverride]
         self, serialized_airflow_data: SerializedAirflowDefinitionsData
     ) -> Definitions:
-        return Definitions(
-            assets=[
-                *_apply_airflow_data_to_specs(self.mapped_assets, serialized_airflow_data),
-                *construct_dag_assets_defs(serialized_airflow_data),
-            ]
+        raise Exception(
+            "We use get_or_fetch_state() to build definitions, and leave it up to the callsite how it is used."
         )
 
 
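With `defs_from_state` now raising, callers are expected to go through `get_or_fetch_state()` and assemble `Definitions` themselves. A minimal sketch of that callsite pattern, mirroring the assembly that used to live in this method (the `loader` and `mapped_assets` names here are illustrative, not from this PR):

    # Hypothetical callsite: fetch (or reuse cached) serialized Airflow state,
    # then assemble Definitions however this particular callsite needs.
    serialized_airflow_data = loader.get_or_fetch_state()
    defs = Definitions(
        assets=[
            *_apply_airflow_data_to_specs(mapped_assets, serialized_airflow_data),
            *construct_dag_assets_defs(serialized_airflow_data),
        ]
    )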
@@ -230,14 +228,20 @@ def only_include_dag(dag_info: DagInfo) -> bool:
     ).get_or_fetch_state()
     assets_to_apply_airflow_data = [
         *mapped_assets,
-        *construct_dataset_defs(serialized_airflow_data),
+        *construct_dataset_specs(serialized_airflow_data),
     ]
     mapped_and_constructed_assets = [
         *_apply_airflow_data_to_specs(assets_to_apply_airflow_data, serialized_airflow_data),
         *construct_dag_assets_defs(serialized_airflow_data),
     ]
+    fully_resolved_assets_definitions = [
+        external_asset_from_spec(asset)
+        if isinstance(asset, AssetSpec)
+        else cast("AssetsDefinition", asset)
+        for asset in mapped_and_constructed_assets
+    ]
     defs_with_airflow_assets = replace_assets_in_defs(
-        defs=defs, assets=mapped_and_constructed_assets
+        defs=defs, assets=fully_resolved_assets_definitions
     )
 
     return Definitions.merge(
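The `fully_resolved_assets_definitions` comprehension in the hunk above normalizes the mixed list before handing it to `replace_assets_in_defs`: bare `AssetSpec`s are wrapped into `AssetsDefinition`s via `external_asset_from_spec`, while existing definitions pass through. Spelled out as a loop for clarity (the `_resolve_to_defs` name is hypothetical; `Union`, `Sequence`, and the asset types are as imported in this file):

    # Equivalent to the comprehension above: wrap bare specs, pass defs through.
    def _resolve_to_defs(
        assets: Sequence[Union[AssetSpec, AssetsDefinition]],
    ) -> list[AssetsDefinition]:
        resolved: list[AssetsDefinition] = []
        for asset in assets:
            if isinstance(asset, AssetSpec):
                # A spec with no backing computation becomes an external asset.
                resolved.append(external_asset_from_spec(asset))
            else:
                resolved.append(asset)
        return resolved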
@@ -271,16 +275,15 @@ def _type_narrow_defs_assets(defs: Definitions) -> Sequence[MappedAsset]:
 def _apply_airflow_data_to_specs(
     assets: Sequence[MappedAsset],
     serialized_data: SerializedAirflowDefinitionsData,
-) -> Iterator[AssetsDefinition]:
-    """Apply asset spec transformations to the asset definitions."""
-    for asset in assets:
-        narrowed_asset = _type_check_asset(asset)
-        assets_def = (
-            narrowed_asset
-            if isinstance(narrowed_asset, AssetsDefinition)
-            else external_asset_from_spec(narrowed_asset)
-        )
-        yield assets_def.map_asset_specs(get_airflow_data_to_spec_mapper(serialized_data))
+) -> Sequence[MappedAsset]:
+    """Apply asset spec transformations to the assets."""
+    return cast(
+        "Sequence[MappedAsset]",
+        map_asset_specs(
+            func=get_airflow_data_to_spec_mapper(serialized_data),
+            iterable=assets,
+        ),
+    )
 
 
 def replace_assets_in_defs(
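`_apply_airflow_data_to_specs` now delegates to the module-level `map_asset_specs` helper rather than converting every spec into an `AssetsDefinition` first, so specs stay specs and definitions stay definitions. Assuming `map_asset_specs` applies a spec-to-spec function uniformly across a mixed iterable (which is what this diff relies on), usage looks roughly like this sketch (`add_team_tag` and the inputs are illustrative, not from this PR):

    # Sketch: apply a spec -> spec transform over specs and defs alike.
    def add_team_tag(spec: AssetSpec) -> AssetSpec:
        return spec.replace_attributes(tags={**spec.tags, "team": "data-platform"})

    transformed = map_asset_specs(
        func=add_team_tag,
        iterable=[some_asset_spec, some_assets_def],  # element types are preserved
    )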
@@ -339,31 +342,29 @@ def uri_to_asset_key(uri: str) -> AssetKey:
     return AssetKey(with_ext_removed)
 
 
-def construct_dataset_defs(
+def construct_dataset_specs(
     serialized_data: SerializedAirflowDefinitionsData,
-) -> Sequence[AssetsDefinition]:
-    """Construct dataset definitions from the serialized Airflow data."""
+) -> Sequence[AssetSpec]:
+    """Construct dataset asset specs from the serialized Airflow data."""
     from dagster_airlift.core.multiple_tasks import assets_with_multiple_task_mappings
 
     return cast(
-        "Sequence[AssetsDefinition]",
+        "Sequence[AssetSpec]",
         [
             assets_with_multiple_task_mappings(
                 task_handles=[
                     {"dag_id": t.dag_id, "task_id": t.task_id} for t in dataset.producing_tasks
                 ],
                 assets=[
-                    external_asset_from_spec(
-                        AssetSpec(
-                            key=uri_to_asset_key(dataset.uri),
-                            metadata=dataset.extra,
-                            deps=[
-                                uri_to_asset_key(upstream_uri)
-                                for upstream_uri in serialized_data.upstream_datasets_by_uri.get(
-                                    dataset.uri, set()
-                                )
-                            ],
-                        )
+                    AssetSpec(
+                        key=uri_to_asset_key(dataset.uri),
+                        metadata=dataset.extra,
+                        deps=[
+                            uri_to_asset_key(upstream_uri)
+                            for upstream_uri in serialized_data.upstream_datasets_by_uri.get(
+                                dataset.uri, set()
+                            )
+                        ],
                     )
                 ],
             )[0]
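`construct_dataset_specs` now stops at the `AssetSpec` layer: each Airflow dataset becomes a spec keyed from its URI, with `deps` derived from the upstream dataset URIs, and conversion to an `AssetsDefinition` is deferred to the callsite shown earlier. With made-up URIs (the field usage matches the diff; the sample values are hypothetical), a produced spec would look roughly like:

    # Illustrative only: a dataset at s3://warehouse/events.json, downstream of
    # s3://warehouse/raw.json.
    AssetSpec(
        key=uri_to_asset_key("s3://warehouse/events.json"),
        metadata={"source": "airflow"},  # dataset.extra, per the diff
        deps=[uri_to_asset_key("s3://warehouse/raw.json")],
    )
    # assets_with_multiple_task_mappings then annotates the spec with its
    # producing task handles, e.g. [{"dag_id": "ingest", "task_id": "load"}].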