Skip to content

[asset health] include freshness in asset health #29268

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Apr 15, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from dagster._core.definitions.declarative_automation.serialized_objects import (
AutomationConditionSnapshot,
)
from dagster._core.definitions.freshness import FreshnessState
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.remote_asset_graph import RemoteAssetNode, RemoteWorkspaceAssetNode
Expand Down Expand Up @@ -71,6 +72,7 @@
GrapheneAssetHealthCheckMeta,
GrapheneAssetHealthCheckUnknownMeta,
GrapheneAssetHealthCheckWarningMeta,
GrapheneAssetHealthFreshnessMeta,
GrapheneAssetHealthMaterializationDegradedNotPartitionedMeta,
GrapheneAssetHealthMaterializationDegradedPartitionedMeta,
GrapheneAssetHealthMaterializationMeta,
Expand Down Expand Up @@ -803,10 +805,43 @@ async def get_asset_check_status_for_asset_health(
# all checks must have executed and passed
return GrapheneAssetHealthStatus.HEALTHY, None

def get_freshness_status_for_asset_health(self, graphene_info: ResolveInfo) -> tuple[str, None]:
# if SLA is met, healthy
# if SLA violated with warning, warning
# if SLA violated with error, degraded
async def get_freshness_status_for_asset_health(
self, graphene_info: ResolveInfo
) -> tuple[str, Optional[GrapheneAssetHealthFreshnessMeta]]:
"""Computes the health indicator for the freshness for an asset. Follows these rules:
HEALTHY - the freshness policy is in a PASS-ing state
WARNING - the freshness policy is in a WARN-ing state
DEGRADED - the freshness policy is in a FAIL-ing state
UNKNOWN - the freshness policy has never been evaluated or is in an UNKNOWN state
NOT_APPLICABLE - the asset does not have a freshness policy defined.
"""
if self._asset_node_snap.internal_freshness_policy is None:
return GrapheneAssetHealthStatus.NOT_APPLICABLE, None

freshness_state_record = graphene_info.context.instance.get_entity_freshness_state(
self._asset_node_snap.asset_key
)
if freshness_state_record is None:
return GrapheneAssetHealthStatus.UNKNOWN, None
state = freshness_state_record.freshness_state
if state == FreshnessState.PASS:
return GrapheneAssetHealthStatus.HEALTHY, None

asset_record = await AssetRecord.gen(graphene_info.context, self._asset_node_snap.asset_key)
last_materialization = (
asset_record.asset_entry.last_materialization.timestamp
if asset_record and asset_record.asset_entry.last_materialization
else None
)
if state == FreshnessState.WARN:
return GrapheneAssetHealthStatus.WARNING, GrapheneAssetHealthFreshnessMeta(
lastMaterializedTimestamp=last_materialization,
)
if state == FreshnessState.FAIL:
return GrapheneAssetHealthStatus.DEGRADED, GrapheneAssetHealthFreshnessMeta(
lastMaterializedTimestamp=last_materialization,
)

return GrapheneAssetHealthStatus.UNKNOWN, None

async def resolve_assetHealth(
Expand All @@ -819,13 +854,16 @@ async def resolve_assetHealth(
materialization_status,
materialization_meta,
) = await self.get_materialization_status_for_asset_health(graphene_info)
freshness_status, freshness_meta = self.get_freshness_status_for_asset_health(graphene_info)
freshness_status, freshness_meta = await self.get_freshness_status_for_asset_health(
graphene_info
)
return GrapheneAssetHealth(
assetChecksStatus=check_status,
assetChecksStatusMetadata=check_meta,
materializationStatus=materialization_status,
materializationStatusMetadata=materialization_meta,
freshnessStatus=freshness_status,
freshnessStatusMetadata=freshness_meta,
)

def resolve_hasMaterializePermission(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,21 @@ class Meta:
name = "AssetHealthMaterializationMeta"


class GrapheneAssetHealthFreshnessMeta(graphene.ObjectType):
lastMaterializedTimestamp = graphene.Field(graphene.Float)

class Meta:
name = "AssetHealthFreshnessMeta"


class GrapheneAssetHealth(graphene.ObjectType):
assetHealth = graphene.NonNull(GrapheneAssetHealthStatus)
materializationStatus = graphene.NonNull(GrapheneAssetHealthStatus)
materializationStatusMetadata = graphene.Field(GrapheneAssetHealthMaterializationMeta)
assetChecksStatus = graphene.NonNull(GrapheneAssetHealthStatus)
assetChecksStatusMetadata = graphene.Field(GrapheneAssetHealthCheckMeta)
freshnessStatus = graphene.NonNull(GrapheneAssetHealthStatus)
freshnessStatusMetadata = graphene.Field(GrapheneAssetHealthFreshnessMeta)

class Meta:
name = "AssetHealth"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections.abc import Iterator, Mapping, Sequence
from contextlib import contextmanager
from copy import deepcopy
from datetime import timedelta
from typing import Optional, TypeVar, Union

from dagster import (
Expand Down Expand Up @@ -96,6 +97,7 @@
from dagster._core.definitions.events import Failure
from dagster._core.definitions.executor_definition import in_process_executor
from dagster._core.definitions.external_asset import external_asset_from_spec
from dagster._core.definitions.freshness import InternalFreshnessPolicy
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.metadata import MetadataValue
Expand Down Expand Up @@ -1677,7 +1679,12 @@ def the_op():
the_op()


@asset(owners=["[email protected]", "team:team1"])
@asset(
owners=["[email protected]", "team:team1"],
internal_freshness_policy=InternalFreshnessPolicy.time_window(
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
),
)
def asset_1():
yield Output(3)

Expand Down
38 changes: 33 additions & 5 deletions python_modules/dagster/dagster/_core/definitions/freshness.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
from abc import ABC
from collections.abc import Mapping
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Optional

Expand Down Expand Up @@ -42,7 +42,7 @@ def from_asset_spec_metadata(

@staticmethod
def time_window(
fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
fail_window: timedelta, warn_window: Optional[timedelta] = None
) -> "TimeWindowFreshnessPolicy":
return TimeWindowFreshnessPolicy.from_timedeltas(fail_window, warn_window)

Expand All @@ -54,13 +54,41 @@ class TimeWindowFreshnessPolicy(InternalFreshnessPolicy, IHaveNew):
warn_window: Optional[SerializableTimeDelta] = None

@classmethod
def from_timedeltas(
cls, fail_window: datetime.timedelta, warn_window: Optional[datetime.timedelta] = None
):
def from_timedeltas(cls, fail_window: timedelta, warn_window: Optional[timedelta] = None):
if warn_window:
check.invariant(warn_window < fail_window, "warn_window must be less than fail_window")

return cls(
fail_window=SerializableTimeDelta.from_timedelta(fail_window),
warn_window=SerializableTimeDelta.from_timedelta(warn_window) if warn_window else None,
)


@whitelist_for_serdes
@record
class FreshnessStateRecordBody:
"""Store serialized metadata about the freshness state for an entity.

Left blank for now, a few examples of what we might want to store here:
- Source timestamp for external assets / freshness checks
- Snapshot of the freshness policy at the time of record creation
"""

metadata: Optional[dict[str, Any]]


@record
class FreshnessStateRecord:
entity_key: AssetKey
freshness_state: FreshnessState
updated_at: datetime
record_body: FreshnessStateRecordBody

@staticmethod
def from_db_row(db_row):
return FreshnessStateRecord(
entity_key=check.not_none(AssetKey.from_db_string(db_row[0])),
freshness_state=FreshnessState(db_row[3]),
record_body=deserialize_value(db_row[4], FreshnessStateRecordBody),
updated_at=db_row[5],
)
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
)
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey, AssetObservation
from dagster._core.definitions.freshness import FreshnessStateEvaluation
from dagster._core.definitions.freshness import FreshnessStateEvaluation, FreshnessStateRecord
from dagster._core.definitions.partition_key_range import PartitionKeyRange
from dagster._core.errors import (
DagsterHomeNotSetError,
Expand Down Expand Up @@ -3395,6 +3395,10 @@ def report_runless_asset_event(
),
)

def get_entity_freshness_state(self, entity_key: AssetKey) -> Optional[FreshnessStateRecord]:
warnings.warn("`get_entity_freshness_state` is not yet implemented for OSS.")
return None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need any warning or error message to signify that it's not implemented in OSS / is cloud only?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the method isn't marked public i think it's probably fine? in other similar cases we will usually return None, [], etc and not have a warning, but there's probably no harm in having a warning either


def get_asset_check_support(self) -> "AssetCheckInstanceSupport":
from dagster._core.storage.asset_check_execution_record import AssetCheckInstanceSupport

Expand Down