Skip to content

Commit 3642d11

Browse files
authored
[asset health] include freshness in asset health (#29268)
## Summary & Motivation Includes the freshness status of each asset in asset health. Follows these rules: - `HEALTHY` - the freshness policy is in a PASS-ing state - `WARNING` - the freshness policy is in a WARN-ing state - `DEGRADED` - the freshness policy is in a FAIL-ing state - `UNKNOWN` - the freshness policy has never been evaluated or is in an UNKNOWN state - `NOT_APPLICABLE` - the asset does not have a freshness policy defined. I need to access the freshness getter function from OSS, so i added an instance method for it. This could also be solved by moving the storage methods to a shared storage class (Run or EventLog probably). I don't have a preference and picked adding the Instance method for no specific reason. Similarly, I needed to move `FreshnessRecord` and `FreshnessRecordBody` to OSS so that I could use them as type hints ## How I Tested These Changes tests in dagster-io/internal#15022
1 parent 79abeb1 commit 3642d11

File tree

8 files changed

+165
-15
lines changed

8 files changed

+165
-15
lines changed

Diff for: js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

+5
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

+27
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py

+43-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from dagster._core.definitions.declarative_automation.serialized_objects import (
1717
AutomationConditionSnapshot,
1818
)
19+
from dagster._core.definitions.freshness import FreshnessState
1920
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
2021
from dagster._core.definitions.partition_mapping import PartitionMapping
2122
from dagster._core.definitions.remote_asset_graph import RemoteAssetNode, RemoteWorkspaceAssetNode
@@ -71,6 +72,7 @@
7172
GrapheneAssetHealthCheckMeta,
7273
GrapheneAssetHealthCheckUnknownMeta,
7374
GrapheneAssetHealthCheckWarningMeta,
75+
GrapheneAssetHealthFreshnessMeta,
7476
GrapheneAssetHealthMaterializationDegradedNotPartitionedMeta,
7577
GrapheneAssetHealthMaterializationDegradedPartitionedMeta,
7678
GrapheneAssetHealthMaterializationMeta,
@@ -803,10 +805,43 @@ async def get_asset_check_status_for_asset_health(
803805
# all checks must have executed and passed
804806
return GrapheneAssetHealthStatus.HEALTHY, None
805807

806-
def get_freshness_status_for_asset_health(self, graphene_info: ResolveInfo) -> tuple[str, None]:
807-
# if SLA is met, healthy
808-
# if SLA violated with warning, warning
809-
# if SLA violated with error, degraded
808+
async def get_freshness_status_for_asset_health(
809+
self, graphene_info: ResolveInfo
810+
) -> tuple[str, Optional[GrapheneAssetHealthFreshnessMeta]]:
811+
"""Computes the health indicator for the freshness for an asset. Follows these rules:
812+
HEALTHY - the freshness policy is in a PASS-ing state
813+
WARNING - the freshness policy is in a WARN-ing state
814+
DEGRADED - the freshness policy is in a FAIL-ing state
815+
UNKNOWN - the freshness policy has never been evaluated or is in an UNKNOWN state
816+
NOT_APPLICABLE - the asset does not have a freshness policy defined.
817+
"""
818+
if self._asset_node_snap.internal_freshness_policy is None:
819+
return GrapheneAssetHealthStatus.NOT_APPLICABLE, None
820+
821+
freshness_state_record = graphene_info.context.instance.get_entity_freshness_state(
822+
self._asset_node_snap.asset_key
823+
)
824+
if freshness_state_record is None:
825+
return GrapheneAssetHealthStatus.UNKNOWN, None
826+
state = freshness_state_record.freshness_state
827+
if state == FreshnessState.PASS:
828+
return GrapheneAssetHealthStatus.HEALTHY, None
829+
830+
asset_record = await AssetRecord.gen(graphene_info.context, self._asset_node_snap.asset_key)
831+
last_materialization = (
832+
asset_record.asset_entry.last_materialization.timestamp
833+
if asset_record and asset_record.asset_entry.last_materialization
834+
else None
835+
)
836+
if state == FreshnessState.WARN:
837+
return GrapheneAssetHealthStatus.WARNING, GrapheneAssetHealthFreshnessMeta(
838+
lastMaterializedTimestamp=last_materialization,
839+
)
840+
if state == FreshnessState.FAIL:
841+
return GrapheneAssetHealthStatus.DEGRADED, GrapheneAssetHealthFreshnessMeta(
842+
lastMaterializedTimestamp=last_materialization,
843+
)
844+
810845
return GrapheneAssetHealthStatus.UNKNOWN, None
811846

812847
async def resolve_assetHealth(
@@ -819,13 +854,16 @@ async def resolve_assetHealth(
819854
materialization_status,
820855
materialization_meta,
821856
) = await self.get_materialization_status_for_asset_health(graphene_info)
822-
freshness_status, freshness_meta = self.get_freshness_status_for_asset_health(graphene_info)
857+
freshness_status, freshness_meta = await self.get_freshness_status_for_asset_health(
858+
graphene_info
859+
)
823860
return GrapheneAssetHealth(
824861
assetChecksStatus=check_status,
825862
assetChecksStatusMetadata=check_meta,
826863
materializationStatus=materialization_status,
827864
materializationStatusMetadata=materialization_meta,
828865
freshnessStatus=freshness_status,
866+
freshnessStatusMetadata=freshness_meta,
829867
)
830868

831869
def resolve_hasMaterializePermission(

Diff for: python_modules/dagster-graphql/dagster_graphql/schema/asset_health.py

+8
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,21 @@ class Meta:
8383
name = "AssetHealthMaterializationMeta"
8484

8585

86+
class GrapheneAssetHealthFreshnessMeta(graphene.ObjectType):
87+
lastMaterializedTimestamp = graphene.Field(graphene.Float)
88+
89+
class Meta:
90+
name = "AssetHealthFreshnessMeta"
91+
92+
8693
class GrapheneAssetHealth(graphene.ObjectType):
8794
assetHealth = graphene.NonNull(GrapheneAssetHealthStatus)
8895
materializationStatus = graphene.NonNull(GrapheneAssetHealthStatus)
8996
materializationStatusMetadata = graphene.Field(GrapheneAssetHealthMaterializationMeta)
9097
assetChecksStatus = graphene.NonNull(GrapheneAssetHealthStatus)
9198
assetChecksStatusMetadata = graphene.Field(GrapheneAssetHealthCheckMeta)
9299
freshnessStatus = graphene.NonNull(GrapheneAssetHealthStatus)
100+
freshnessStatusMetadata = graphene.Field(GrapheneAssetHealthFreshnessMeta)
93101

94102
class Meta:
95103
name = "AssetHealth"

Diff for: python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_all_snapshot_ids.ambr

+36-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py

+8-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from collections.abc import Iterator, Mapping, Sequence
1010
from contextlib import contextmanager
1111
from copy import deepcopy
12+
from datetime import timedelta
1213
from typing import Optional, TypeVar, Union
1314

1415
from dagster import (
@@ -96,6 +97,7 @@
9697
from dagster._core.definitions.events import Failure
9798
from dagster._core.definitions.executor_definition import in_process_executor
9899
from dagster._core.definitions.external_asset import external_asset_from_spec
100+
from dagster._core.definitions.freshness import InternalFreshnessPolicy
99101
from dagster._core.definitions.freshness_policy import FreshnessPolicy
100102
from dagster._core.definitions.job_definition import JobDefinition
101103
from dagster._core.definitions.metadata import MetadataValue
@@ -1677,7 +1679,12 @@ def the_op():
16771679
the_op()
16781680

16791681

1680-
@asset(owners=["[email protected]", "team:team1"])
1682+
@asset(
1683+
owners=["[email protected]", "team:team1"],
1684+
internal_freshness_policy=InternalFreshnessPolicy.time_window(
1685+
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
1686+
),
1687+
)
16811688
def asset_1():
16821689
yield Output(3)
16831690

Diff for: python_modules/dagster/dagster/_core/definitions/freshness.py

+33-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import datetime
21
from abc import ABC
32
from collections.abc import Mapping
3+
from datetime import datetime, timedelta
44
from enum import Enum
55
from typing import Any, Optional
66

@@ -42,7 +42,7 @@ def from_asset_spec_metadata(
4242

4343
@staticmethod
4444
def time_window(
45-
fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
45+
fail_window: timedelta, warn_window: Optional[timedelta] = None
4646
) -> "TimeWindowFreshnessPolicy":
4747
return TimeWindowFreshnessPolicy.from_timedeltas(fail_window, warn_window)
4848

@@ -54,13 +54,41 @@ class TimeWindowFreshnessPolicy(InternalFreshnessPolicy, IHaveNew):
5454
warn_window: Optional[SerializableTimeDelta] = None
5555

5656
@classmethod
57-
def from_timedeltas(
58-
cls, fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
59-
):
57+
def from_timedeltas(cls, fail_window: timedelta, warn_window: Optional[timedelta] = None):
6058
if warn_window:
6159
check.invariant(warn_window < fail_window, "warn_window must be less than fail_window")
6260

6361
return cls(
6462
fail_window=SerializableTimeDelta.from_timedelta(fail_window),
6563
warn_window=SerializableTimeDelta.from_timedelta(warn_window) if warn_window else None,
6664
)
65+
66+
67+
@whitelist_for_serdes
68+
@record
69+
class FreshnessStateRecordBody:
70+
"""Store serialized metadata about the freshness state for an entity.
71+
72+
Left blank for now, a few examples of what we might want to store here:
73+
- Source timestamp for external assets / freshness checks
74+
- Snapshot of the freshness policy at the time of record creation
75+
"""
76+
77+
metadata: Optional[dict[str, Any]]
78+
79+
80+
@record
81+
class FreshnessStateRecord:
82+
entity_key: AssetKey
83+
freshness_state: FreshnessState
84+
updated_at: datetime
85+
record_body: FreshnessStateRecordBody
86+
87+
@staticmethod
88+
def from_db_row(db_row):
89+
return FreshnessStateRecord(
90+
entity_key=check.not_none(AssetKey.from_db_string(db_row[0])),
91+
freshness_state=FreshnessState(db_row[3]),
92+
record_body=deserialize_value(db_row[4], FreshnessStateRecordBody),
93+
updated_at=db_row[5],
94+
)

Diff for: python_modules/dagster/dagster/_core/instance/__init__.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
)
3838
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
3939
from dagster._core.definitions.events import AssetKey, AssetObservation
40-
from dagster._core.definitions.freshness import FreshnessStateEvaluation
40+
from dagster._core.definitions.freshness import FreshnessStateEvaluation, FreshnessStateRecord
4141
from dagster._core.definitions.partition_key_range import PartitionKeyRange
4242
from dagster._core.errors import (
4343
DagsterHomeNotSetError,
@@ -3395,6 +3395,10 @@ def report_runless_asset_event(
33953395
),
33963396
)
33973397

3398+
def get_entity_freshness_state(self, entity_key: AssetKey) -> Optional[FreshnessStateRecord]:
3399+
warnings.warn("`get_entity_freshness_state` is not yet implemented for OSS.")
3400+
return None
3401+
33983402
def get_asset_check_support(self) -> "AssetCheckInstanceSupport":
33993403
from dagster._core.storage.asset_check_execution_record import AssetCheckInstanceSupport
34003404

0 commit comments

Comments
 (0)