Skip to content

Commit 339b824

Browse files
authored
A new asset freshness API (#28828)
## Summary & Motivation

Introduces a new API for defining freshness as a top-level attribute on an asset. This API will replace freshness checks and freshness policies as the way to model asset freshness in Dagster.

For more details on the motivation behind needing a new API, please refer to the [internal product review](https://www.notion.so/dagster/New-Asset-Freshness-System-Product-Review-1c318b92e46280fe859ce86f5d78934a?d=1c718b92e462807ca2e1001c0a331dc0#1c718b92e46280c8973ef9358ec38367).

The implementation here differs slightly from the desired end state, which is to reclaim the deprecated `FreshnessPolicy` construct. Instead, we're writing the new freshness policy to the asset spec metadata. This will unblock internal testing and give us time to implement a longer-term migration plan for `FreshnessPolicy`.

### Auxiliary Changes

- Relocate `SerializableTimeDelta` from `dagster._core.definitions.declarative_automation.utils` to `dagster_shared.serdes.utils`

## How I Tested These Changes

- New and existing unit tests for backward compatibility

## Changelog

> Insert changelog entry or delete this section.
1 parent e8ec890 commit 339b824

File tree

13 files changed

+318
-33
lines changed

13 files changed

+318
-33
lines changed

Diff for: js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: python_modules/dagster/dagster/_core/definitions/asset_spec.py

+29-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
overload,
1212
)
1313

14-
from dagster_shared.serdes import whitelist_for_serdes
14+
from dagster_shared.serdes import serialize_value, whitelist_for_serdes
1515

1616
import dagster._check as check
1717
from dagster._annotations import (
@@ -26,6 +26,10 @@
2626
AutomationCondition,
2727
)
2828
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
29+
from dagster._core.definitions.freshness import (
30+
INTERNAL_FRESHNESS_POLICY_METADATA_KEY,
31+
InternalFreshnessPolicy,
32+
)
2933
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3034
from dagster._core.definitions.partition import PartitionsDefinition
3135
from dagster._core.definitions.partition_mapping import PartitionMapping
@@ -108,6 +112,11 @@ def validate_kind_tags(kinds: Optional[AbstractSet[str]]) -> None:
108112
breaking_version="1.10.0",
109113
additional_warn_text="use `automation_condition` instead",
110114
)
115+
@hidden_param(
116+
param="internal_freshness_policy",
117+
breaking_version="1.10.0",
118+
additional_warn_text="experimental feature, use freshness checks instead",
119+
)
111120
@record_custom
112121
class AssetSpec(IHasInternalInit, IHaveNew, LegacyNamedTupleMixin):
113122
"""Specifies the core attributes of an asset, except for the function that materializes or
@@ -195,6 +204,15 @@ def __new__(
195204
}
196205
validate_kind_tags(kind_tags)
197206

207+
internal_freshness_policy: Optional[InternalFreshnessPolicy] = kwargs.get(
208+
"internal_freshness_policy"
209+
)
210+
if internal_freshness_policy:
211+
metadata = {
212+
**(metadata or {}),
213+
INTERNAL_FRESHNESS_POLICY_METADATA_KEY: serialize_value(internal_freshness_policy),
214+
}
215+
198216
return super().__new__(
199217
cls,
200218
key=key,
@@ -435,3 +453,13 @@ def map_asset_specs(
435453
obj.map_asset_specs(func) if isinstance(obj, AssetsDefinition) else func(obj)
436454
for obj in iterable
437455
]
456+
457+
458+
def attach_internal_freshness_policy(spec: AssetSpec, policy: InternalFreshnessPolicy) -> AssetSpec:
    """Merge a serialized freshness policy into an asset spec's metadata.

    The policy is passed through ``serialize_value`` and stored under
    ``INTERNAL_FRESHNESS_POLICY_METADATA_KEY``; the merge itself is delegated to
    ``AssetSpec.merge_attributes``, whose result is returned.

    Suitable as the mapping function for ``Definitions.map_asset_specs`` when a freshness
    policy should be attached to asset specs.
    """
    freshness_metadata = {INTERNAL_FRESHNESS_POLICY_METADATA_KEY: serialize_value(policy)}
    return spec.merge_attributes(
        metadata=freshness_metadata  # pyright: ignore[reportArgumentType]
    )

Diff for: python_modules/dagster/dagster/_core/definitions/declarative_automation/operands/operands.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import TYPE_CHECKING, Optional
44

55
from dagster_shared.serdes import whitelist_for_serdes
6+
from dagster_shared.serdes.utils import SerializableTimeDelta
67

78
from dagster._core.asset_graph_view.entity_subset import EntitySubset
89
from dagster._core.definitions.asset_key import AssetCheckKey, AssetKey
@@ -14,7 +15,6 @@
1415
from dagster._core.definitions.declarative_automation.operands.subset_automation_condition import (
1516
SubsetAutomationCondition,
1617
)
17-
from dagster._core.definitions.declarative_automation.utils import SerializableTimeDelta
1818
from dagster._record import record
1919
from dagster._utils.schedules import reverse_cron_string_iterator
2020

Original file line numberDiff line numberDiff line change
@@ -1,27 +1 @@
1-
import datetime
2-
from typing import NamedTuple
31

4-
from dagster_shared.serdes import whitelist_for_serdes
5-
6-
7-
@whitelist_for_serdes
class SerializableTimeDelta(NamedTuple):
    """A Dagster-serializable stand-in for ``datetime.timedelta``.

    ``datetime.timedelta`` internally stores its value as an integer number of days, seconds,
    and microseconds. This tuple carries exactly those three components and converts between
    the in-memory and serializable formats.
    """

    days: int
    seconds: int
    microseconds: int

    @staticmethod
    def from_timedelta(timedelta: datetime.timedelta) -> "SerializableTimeDelta":
        """Build the serializable form from the components of ``timedelta``."""
        days, seconds, microseconds = timedelta.days, timedelta.seconds, timedelta.microseconds
        return SerializableTimeDelta(days=days, seconds=seconds, microseconds=microseconds)

    def to_timedelta(self) -> datetime.timedelta:
        """Rebuild a ``datetime.timedelta`` from the stored components."""
        days, seconds, microseconds = self
        return datetime.timedelta(days=days, seconds=seconds, microseconds=microseconds)

Diff for: python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py

+15
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
CoercibleToAssetKey,
3535
CoercibleToAssetKeyPrefix,
3636
)
37+
from dagster._core.definitions.freshness import INTERNAL_FRESHNESS_POLICY_METADATA_KEY
3738
from dagster._core.definitions.freshness_policy import FreshnessPolicy
3839
from dagster._core.definitions.input import GraphIn
3940
from dagster._core.definitions.metadata import ArbitraryMetadataMapping, RawMetadataMapping
@@ -50,6 +51,7 @@
5051
from dagster._core.errors import DagsterInvalidDefinitionError
5152
from dagster._core.storage.tags import KIND_PREFIX
5253
from dagster._core.types.dagster_type import DagsterType
54+
from dagster._serdes import serialize_value
5355
from dagster._utils.tags import normalize_tags
5456
from dagster._utils.warnings import disable_dagster_warnings
5557

@@ -138,6 +140,12 @@ def _validate_hidden_non_argument_dep_param(
138140
emit_runtime_warning=False,
139141
breaking_version="1.10.0",
140142
)
143+
@hidden_param(
144+
param="internal_freshness_policy",
145+
emit_runtime_warning=False,
146+
breaking_version="1.10.0",
147+
additional_warn_text="experimental, use freshness checks instead.",
148+
)
141149
def asset(
142150
compute_fn: Optional[Callable[..., Any]] = None,
143151
*,
@@ -293,6 +301,13 @@ def downstream_asset(conditional_asset):
293301
**{f"{KIND_PREFIX}{kind}": "" for kind in kinds or []},
294302
}
295303

304+
internal_freshness_policy = kwargs.get("internal_freshness_policy")
305+
if internal_freshness_policy:
306+
metadata = {
307+
**(metadata or {}),
308+
INTERNAL_FRESHNESS_POLICY_METADATA_KEY: serialize_value(internal_freshness_policy),
309+
}
310+
296311
only_allow_hidden_params_in_kwargs(asset, kwargs)
297312

298313
args = AssetDecoratorArgs(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import datetime
2+
from abc import ABC
3+
from collections.abc import Mapping
4+
from enum import Enum
5+
from typing import Any, Optional
6+
7+
from dagster_shared.serdes.utils import SerializableTimeDelta
8+
9+
from dagster._core.definitions.asset_key import AssetKey
10+
from dagster._record import IHaveNew, record
11+
from dagster._serdes import deserialize_value, whitelist_for_serdes
12+
from dagster._utils import check
13+
14+
15+
@whitelist_for_serdes
class FreshnessState(str, Enum):
    """String-valued enum of the states a freshness evaluation can report.

    Every member's value is identical to its name.
    """

    PASS = "PASS"
    WARN = "WARN"
    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
21+
22+
23+
@whitelist_for_serdes
@record
class FreshnessStateEvaluation:
    """Serializable record pairing an asset key with a freshness state."""

    # Asset the evaluation refers to.
    key: AssetKey
    # Freshness state reported for that asset.
    freshness_state: FreshnessState
28+
29+
30+
# Metadata key under which a serialized InternalFreshnessPolicy is stored on an asset spec.
INTERNAL_FRESHNESS_POLICY_METADATA_KEY = "dagster/internal_freshness_policy"
31+
32+
33+
class InternalFreshnessPolicy(ABC):
    """Base class for freshness policies that are persisted in asset spec metadata.

    A policy is stored as a serialized value under ``INTERNAL_FRESHNESS_POLICY_METADATA_KEY``
    and recovered with :meth:`from_asset_spec_metadata`.
    """

    @classmethod
    def from_asset_spec_metadata(
        cls, metadata: Mapping[str, Any]
    ) -> Optional["InternalFreshnessPolicy"]:
        """Recover the freshness policy stored in ``metadata``, if there is one.

        Args:
            metadata: Asset metadata mapping. The entry under
                ``INTERNAL_FRESHNESS_POLICY_METADATA_KEY`` may be either the serialized policy
                string itself or an object exposing that string as ``.value``.

        Returns:
            The deserialized policy, or ``None`` when the key is absent.
        """
        stored_policy = metadata.get(INTERNAL_FRESHNESS_POLICY_METADATA_KEY)
        if stored_policy is None:
            return None
        # Policies are written into metadata as the plain ``serialize_value`` string, while
        # some callers hand over metadata whose entries wrap the string in a ``.value``
        # attribute (presumably normalized MetadataValue objects -- confirm). Accept both so
        # that reading from a freshly constructed spec does not raise AttributeError.
        serialized_policy = getattr(stored_policy, "value", stored_policy)
        return deserialize_value(serialized_policy, cls)  # pyright: ignore

    @staticmethod
    def time_window(
        fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
    ) -> "TimeWindowFreshnessPolicy":
        """Convenience constructor for a ``TimeWindowFreshnessPolicy``.

        Delegates to ``TimeWindowFreshnessPolicy.from_timedeltas`` with the same arguments.
        """
        return TimeWindowFreshnessPolicy.from_timedeltas(fail_window, warn_window)
48+
49+
50+
@whitelist_for_serdes
@record
class TimeWindowFreshnessPolicy(InternalFreshnessPolicy, IHaveNew):
    """Freshness policy described by a fail window and an optional, shorter warn window.

    Both windows are held as ``SerializableTimeDelta`` so the policy can be serialized.
    """

    # Window stored as a serializable timedelta; required.
    fail_window: SerializableTimeDelta
    # Optional window; when built via ``from_timedeltas`` it is strictly shorter than
    # ``fail_window``.
    warn_window: Optional[SerializableTimeDelta] = None

    @classmethod
    def from_timedeltas(
        cls, fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
    ) -> "TimeWindowFreshnessPolicy":
        """Build a policy from ``datetime.timedelta`` values.

        Args:
            fail_window: Duration of the fail window.
            warn_window: Optional duration of the warn window; must be strictly less than
                ``fail_window`` when given.

        Raises:
            A check failure (via ``check.invariant``) when ``warn_window`` is not less than
            ``fail_window``.
        """
        serializable_warn_window = None
        # Compare against None rather than relying on truthiness: ``timedelta(0)`` is falsy,
        # so a truthiness test would silently discard an explicitly supplied zero-length
        # window and skip its validation.
        if warn_window is not None:
            check.invariant(warn_window < fail_window, "warn_window must be less than fail_window")
            serializable_warn_window = SerializableTimeDelta.from_timedelta(warn_window)

        return cls(
            fail_window=SerializableTimeDelta.from_timedelta(fail_window),
            warn_window=serializable_warn_window,
        )

Diff for: python_modules/dagster/dagster/_core/events/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767

6868
if TYPE_CHECKING:
6969
from dagster._core.definitions.events import ObjectStoreOperation
70+
from dagster._core.definitions.freshness import FreshnessStateEvaluation
7071
from dagster._core.execution.plan.plan import ExecutionPlan
7172
from dagster._core.execution.plan.step import StepKind
7273

@@ -93,6 +94,7 @@
9394
"AssetCheckEvaluationPlanned",
9495
"AssetFailedToMaterializeData",
9596
"RunEnqueuedData",
97+
"FreshnessStateEvaluation",
9698
]
9799

98100

@@ -169,6 +171,8 @@ class DagsterEventType(str, Enum):
169171

170172
LOGS_CAPTURED = "LOGS_CAPTURED"
171173

174+
FRESHNESS_STATE_EVALUATION = "FRESHNESS_STATE_EVALUATION"
175+
172176

173177
EVENT_TYPE_TO_DISPLAY_STRING = {
174178
DagsterEventType.PIPELINE_ENQUEUED: "RUN_ENQUEUED",

Diff for: python_modules/dagster/dagster/_core/instance/__init__.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
)
3838
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
3939
from dagster._core.definitions.events import AssetKey, AssetObservation
40+
from dagster._core.definitions.freshness import FreshnessStateEvaluation
4041
from dagster._core.definitions.partition_key_range import PartitionKeyRange
4142
from dagster._core.errors import (
4243
DagsterHomeNotSetError,
@@ -3351,7 +3352,12 @@ def get_latest_materialization_code_versions(
33513352
@public
33523353
def report_runless_asset_event(
33533354
self,
3354-
asset_event: Union["AssetMaterialization", "AssetObservation", "AssetCheckEvaluation"],
3355+
asset_event: Union[
3356+
"AssetMaterialization",
3357+
"AssetObservation",
3358+
"AssetCheckEvaluation",
3359+
"FreshnessStateEvaluation",
3360+
],
33553361
):
33563362
"""Record an event log entry related to assets that does not belong to a Dagster run."""
33573363
from dagster._core.events import (
@@ -3371,10 +3377,13 @@ def report_runless_asset_event(
33713377
elif isinstance(asset_event, AssetObservation):
33723378
event_type_value = DagsterEventType.ASSET_OBSERVATION.value
33733379
data_payload = AssetObservationData(asset_event)
3380+
elif isinstance(asset_event, FreshnessStateEvaluation):
3381+
event_type_value = DagsterEventType.FRESHNESS_STATE_EVALUATION.value
3382+
data_payload = asset_event
33743383
else:
33753384
raise DagsterInvariantViolationError(
33763385
f"Received unexpected asset event type {asset_event}, expected"
3377-
" AssetMaterialization, AssetObservation or AssetCheckEvaluation"
3386+
" AssetMaterialization, AssetObservation, AssetCheckEvaluation or FreshnessStateEvaluation"
33783387
)
33793388

33803389
return self.report_dagster_event(

Diff for: python_modules/dagster/dagster/_core/remote_representation/external_data.py

+5
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
OpNode,
6262
)
6363
from dagster._core.definitions.events import AssetKey
64+
from dagster._core.definitions.freshness import InternalFreshnessPolicy
6465
from dagster._core.definitions.freshness_policy import FreshnessPolicy
6566
from dagster._core.definitions.metadata import (
6667
MetadataFieldSerializer,
@@ -1553,6 +1554,10 @@ def automation_condition(self) -> Optional[AutomationCondition]:
15531554
else:
15541555
return None
15551556

1557+
@property
def internal_freshness_policy(self) -> Optional[InternalFreshnessPolicy]:
    """Freshness policy decoded from this object's metadata, or ``None`` if none is stored."""
    policy = InternalFreshnessPolicy.from_asset_spec_metadata(self.metadata)
    return policy
1560+
15561561

15571562
ResourceJobUsageMap: TypeAlias = dict[str, list[ResourceJobUsageEntry]]
15581563

0 commit comments

Comments
 (0)