Skip to content

Commit 3e9bd60

Browse files
committed
allow setting internal freshness policy in asset decorator
1 parent 066d781 commit 3e9bd60

File tree

2 files changed

+50
-8
lines changed

2 files changed

+50
-8
lines changed

python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py

+9
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
CoercibleToAssetKey,
3535
CoercibleToAssetKeyPrefix,
3636
)
37+
from dagster._core.definitions.freshness import INTERNAL_FRESHNESS_POLICY_METADATA_KEY
3738
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3839
from dagster._core.definitions.input import GraphIn
3940
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
@@ -50,6 +51,7 @@
5051
from dagster._core.errors import DagsterInvalidDefinitionError
5152
from dagster._core.storage.tags import KIND_PREFIX
5253
from dagster._core.types.dagster_type import DagsterType
54+
from dagster._serdes import serialize_value
5355
from dagster._utils.tags import normalize_tags
5456
from dagster._utils.warnings import disable_dagster_warnings
5557

@@ -299,6 +301,13 @@ def downstream_asset(conditional_asset):
299301
**{f"{KIND_PREFIX}{kind}": "" for kind in kinds or []},
300302
}
301303

304+
internal_freshness_policy = kwargs.get("internal_freshness_policy")
305+
if internal_freshness_policy:
306+
metadata = {
307+
**(metadata or {}),
308+
INTERNAL_FRESHNESS_POLICY_METADATA_KEY: serialize_value(internal_freshness_policy),
309+
}
310+
302311
only_allow_hidden_params_in_kwargs(asset, kwargs)
303312

304313
args = AssetDecoratorArgs(

python_modules/dagster/dagster_tests/asset_defs_tests/test_internal_freshness.py

+41-8
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
from dagster._core.definitions.asset_key import AssetKey
44
from dagster._core.definitions.asset_spec import AssetSpec, attach_internal_freshness_policy
5+
from dagster._core.definitions.assets import AssetsDefinition
6+
from dagster._core.definitions.decorators.asset_decorator import asset
57
from dagster._core.definitions.definitions_class import Definitions
68
from dagster._core.definitions.freshness import (
79
INTERNAL_FRESHNESS_POLICY_METADATA_KEY,
@@ -16,6 +18,26 @@
1618
)
1719

1820

21+
def test_asset_decorator_with_internal_freshness_policy() -> None:
22+
"""Can we define an asset from decorator with an internal freshness policy?"""
23+
24+
@asset(
25+
internal_freshness_policy=TimeWindowFreshnessPolicy.from_timedeltas(
26+
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
27+
)
28+
)
29+
def asset_with_internal_freshness_policy():
30+
pass
31+
32+
spec = asset_with_internal_freshness_policy.get_asset_spec()
33+
policy = spec.metadata.get(INTERNAL_FRESHNESS_POLICY_METADATA_KEY)
34+
assert policy is not None
35+
deserialized = deserialize_value(policy)
36+
assert isinstance(deserialized, TimeWindowFreshnessPolicy)
37+
assert deserialized.fail_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=10))
38+
assert deserialized.warn_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=5))
39+
40+
1941
def test_asset_spec_with_internal_freshness_policy() -> None:
2042
"""Can we define an asset spec with an internal freshness policy?"""
2143

@@ -98,21 +120,32 @@ def assert_freshness_policy(spec, expected_fail_window, expected_warn_window=Non
98120

99121

100122
def test_map_asset_specs_attach_internal_freshness_policy() -> None:
101-
"""Can we map attach_internal_freshness_policy over a selection of asset specs?"""
102-
asset_specs = [AssetSpec(key="foo"), AssetSpec(key="bar"), AssetSpec(key="baz")]
123+
"""Can we map attach_internal_freshness_policy over a selection of assets and asset specs?"""
124+
125+
@asset
126+
def foo_asset():
127+
pass
128+
129+
asset_specs = [foo_asset, AssetSpec(key="bar"), AssetSpec(key="baz")]
130+
defs: Definitions = Definitions(assets=asset_specs)
131+
103132
freshness_policy = TimeWindowFreshnessPolicy.from_timedeltas(
104133
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
105134
)
106-
defs: Definitions = Definitions(assets=asset_specs)
107135
mapped_defs = defs.map_asset_specs(
108136
func=lambda spec: attach_internal_freshness_policy(spec, freshness_policy)
109137
)
110138

111-
assets = mapped_defs.assets
112-
assert len(assets) == 3
113-
for asset in assets:
114-
assert INTERNAL_FRESHNESS_POLICY_METADATA_KEY in asset.metadata
115-
policy = deserialize_value(asset.metadata[INTERNAL_FRESHNESS_POLICY_METADATA_KEY])
139+
assets_and_specs = mapped_defs.assets
140+
assert len(assets_and_specs) == 3
141+
for asset_or_spec in assets_and_specs:
142+
spec = (
143+
asset_or_spec.get_asset_spec()
144+
if isinstance(asset_or_spec, AssetsDefinition)
145+
else asset_or_spec
146+
)
147+
assert INTERNAL_FRESHNESS_POLICY_METADATA_KEY in spec.metadata
148+
policy = deserialize_value(spec.metadata[INTERNAL_FRESHNESS_POLICY_METADATA_KEY])
116149
assert isinstance(policy, TimeWindowFreshnessPolicy)
117150
assert policy.fail_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=10))
118151
assert policy.warn_window == SerializableTimeDelta.from_timedelta(timedelta(minutes=5))

0 commit comments

Comments
 (0)