Skip to content

gql resolver for internal freshness policy #29319

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
)
from dagster_graphql.schema.entity_key import GrapheneAssetKey
from dagster_graphql.schema.errors import GrapheneAssetNotFoundError
from dagster_graphql.schema.freshness import GrapheneInternalFreshnessPolicy
from dagster_graphql.schema.freshness_policy import (
GrapheneAssetFreshnessInfo,
GrapheneFreshnessPolicy,
Expand Down Expand Up @@ -279,6 +280,7 @@ class GrapheneAssetNode(graphene.ObjectType):
description = graphene.String()
freshnessInfo = graphene.Field(GrapheneAssetFreshnessInfo)
freshnessPolicy = graphene.Field(GrapheneFreshnessPolicy)
internalFreshnessPolicy = graphene.Field(GrapheneInternalFreshnessPolicy)
autoMaterializePolicy = graphene.Field(GrapheneAutoMaterializePolicy)
automationCondition = graphene.Field(GrapheneAutomationCondition)
graphName = graphene.String()
Expand Down Expand Up @@ -1179,6 +1181,15 @@ def resolve_freshnessPolicy(
return GrapheneFreshnessPolicy(self._asset_node_snap.freshness_policy)
return None

def resolve_internalFreshnessPolicy(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneInternalFreshnessPolicy]:
if self._asset_node_snap.internal_freshness_policy:
return GrapheneInternalFreshnessPolicy.from_policy(
self._asset_node_snap.internal_freshness_policy
)
return None

def resolve_autoMaterializePolicy(
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneAutoMaterializePolicy]:
Expand Down
27 changes: 27 additions & 0 deletions python_modules/dagster-graphql/dagster_graphql/schema/freshness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import graphene
from dagster._core.definitions.freshness import InternalFreshnessPolicy, TimeWindowFreshnessPolicy


class GrapheneTimeWindowFreshnessPolicy(graphene.ObjectType):
class Meta:
name = "TimeWindowFreshnessPolicy"

failWindowSeconds = graphene.NonNull(graphene.Int)
warnWindowSeconds = graphene.Int()


class GrapheneInternalFreshnessPolicy(graphene.Union):
class Meta:
name = "InternalFreshnessPolicy"
types = (GrapheneTimeWindowFreshnessPolicy,)

@classmethod
def from_policy(cls, policy: InternalFreshnessPolicy):
if isinstance(policy, TimeWindowFreshnessPolicy):
return GrapheneTimeWindowFreshnessPolicy(
failWindowSeconds=policy.fail_window.to_timedelta().total_seconds(),
warnWindowSeconds=policy.warn_window.to_timedelta().total_seconds()
if policy.warn_window
else None,
)
raise Exception("Unknown freshness policy type")
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
from datetime import timedelta

from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.freshness import InternalFreshnessPolicy
from dagster._core.definitions.repository_definition.repository_definition import (
RepositoryDefinition,
)
from dagster._core.test_utils import instance_for_test
from dagster_graphql.test.utils import define_out_of_process_context, execute_dagster_graphql

GET_INTERNAL_FRESHNESS_POLICY = """
query GetInternalFreshnessPolicy($assetKey: AssetKeyInput!) {
assetNodes(assetKeys: [$assetKey]) {
internalFreshnessPolicy {
... on TimeWindowFreshnessPolicy {
failWindowSeconds
warnWindowSeconds
}
}
}
}
"""


@asset(
internal_freshness_policy=InternalFreshnessPolicy.time_window(
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
)
)
def asset_with_internal_freshness_policy_with_warn_window():
pass


@asset(
internal_freshness_policy=InternalFreshnessPolicy.time_window(fail_window=timedelta(minutes=10))
)
def asset_with_internal_freshness_policy():
pass


def get_repo() -> RepositoryDefinition:
return Definitions(
assets=[
asset_with_internal_freshness_policy,
asset_with_internal_freshness_policy_with_warn_window,
]
).get_repository_def()


def test_internal_freshness_policy_time_window():
with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
result = execute_dagster_graphql(
graphql_context,
GET_INTERNAL_FRESHNESS_POLICY,
variables={"assetKey": asset_with_internal_freshness_policy.key.to_graphql_input()},
)
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
"failWindowSeconds": 600,
"warnWindowSeconds": None,
}


def test_internal_freshness_policy_time_window_with_warn_window():
with instance_for_test() as instance:
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
result = execute_dagster_graphql(
graphql_context,
GET_INTERNAL_FRESHNESS_POLICY,
variables={
"assetKey": asset_with_internal_freshness_policy_with_warn_window.key.to_graphql_input()
},
)
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
"failWindowSeconds": 600,
"warnWindowSeconds": 300,
}