|
2 | 2 |
|
3 | 3 | from dagster._core.definitions.asset_key import AssetKey
|
4 | 4 | from dagster._core.definitions.asset_spec import AssetSpec, attach_internal_freshness_policy
|
| 5 | +from dagster._core.definitions.assets import AssetsDefinition |
| 6 | +from dagster._core.definitions.decorators.asset_decorator import asset |
5 | 7 | from dagster._core.definitions.definitions_class import Definitions
|
6 | 8 | from dagster._core.definitions.freshness import (
|
7 | 9 | INTERNAL_FRESHNESS_POLICY_METADATA_KEY,
|
|
16 | 18 | )
|
17 | 19 |
|
18 | 20 |
|
| 21 | +def test_asset_decorator_with_internal_freshness_policy() -> None: |
| 22 | + """Can we define an asset from decorator with an internal freshness policy?""" |
| 23 | + |
| 24 | + @asset( |
| 25 | + internal_freshness_policy=TimeWindowFreshnessPolicy.from_timedeltas( |
| 26 | + fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5) |
| 27 | + ) |
| 28 | + ) |
| 29 | + def asset_with_internal_freshness_policy(): |
| 30 | + pass |
| 31 | + |
| 32 | + spec = asset_with_internal_freshness_policy.get_asset_spec() |
| 33 | + policy = spec.metadata.get(INTERNAL_FRESHNESS_POLICY_METADATA_KEY) |
| 34 | + assert policy is not None |
| 35 | + deserialized = deserialize_value(policy) |
| 36 | + assert isinstance(deserialized, TimeWindowFreshnessPolicy) |
| 37 | + assert deserialized.fail_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=10)) |
| 38 | + assert deserialized.warn_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=5)) |
| 39 | + |
| 40 | + |
19 | 41 | def test_asset_spec_with_internal_freshness_policy() -> None:
|
20 | 42 | """Can we define an asset spec with an internal freshness policy?"""
|
21 | 43 |
|
@@ -98,21 +120,32 @@ def assert_freshness_policy(spec, expected_fail_window, expected_warn_window=Non
|
98 | 120 |
|
99 | 121 |
|
100 | 122 | def test_map_asset_specs_attach_internal_freshness_policy() -> None:
|
101 |
| - """Can we map attach_internal_freshness_policy over a selection of asset specs?""" |
102 |
| - asset_specs = [AssetSpec(key="foo"), AssetSpec(key="bar"), AssetSpec(key="baz")] |
| 123 | + """Can we map attach_internal_freshness_policy over a selection of assets and asset specs?""" |
| 124 | + |
| 125 | + @asset |
| 126 | + def foo_asset(): |
| 127 | + pass |
| 128 | + |
| 129 | + asset_specs = [foo_asset, AssetSpec(key="bar"), AssetSpec(key="baz")] |
| 130 | + defs: Definitions = Definitions(assets=asset_specs) |
| 131 | + |
103 | 132 | freshness_policy = TimeWindowFreshnessPolicy.from_timedeltas(
|
104 | 133 | fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
|
105 | 134 | )
|
106 |
| - defs: Definitions = Definitions(assets=asset_specs) |
107 | 135 | mapped_defs = defs.map_asset_specs(
|
108 | 136 | func=lambda spec: attach_internal_freshness_policy(spec, freshness_policy)
|
109 | 137 | )
|
110 | 138 |
|
111 |
| - assets = mapped_defs.assets |
112 |
| - assert len(assets) == 3 |
113 |
| - for asset in assets: |
114 |
| - assert INTERNAL_FRESHNESS_POLICY_METADATA_KEY in asset.metadata |
115 |
| - policy = deserialize_value(asset.metadata[INTERNAL_FRESHNESS_POLICY_METADATA_KEY]) |
| 139 | + assets_and_specs = mapped_defs.assets |
| 140 | + assert len(assets_and_specs) == 3 |
| 141 | + for asset_or_spec in assets_and_specs: |
| 142 | + spec = ( |
| 143 | + asset_or_spec.get_asset_spec() |
| 144 | + if isinstance(asset_or_spec, AssetsDefinition) |
| 145 | + else asset_or_spec |
| 146 | + ) |
| 147 | + assert INTERNAL_FRESHNESS_POLICY_METADATA_KEY in spec.metadata |
| 148 | + policy = deserialize_value(spec.metadata[INTERNAL_FRESHNESS_POLICY_METADATA_KEY]) |
116 | 149 | assert isinstance(policy, TimeWindowFreshnessPolicy)
|
117 | 150 | assert policy.fail_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=10))
|
118 | 151 | assert policy.warn_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=5))
|
0 commit comments