Skip to content

Commit 9409665

Browse files
authored
gql resolver for internal freshness policy (#29319)
## Summary & Motivation Adds a resolvers on `GrapheneAssetNode` for `internalFreshnessPolicy` ## How I Tested These Changes unit tests / bk. ## Changelog > Insert changelog entry or delete this section.
1 parent 721d8d2 commit 9409665

File tree

6 files changed

+159
-1
lines changed

6 files changed

+159
-1
lines changed

js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py

+11
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
)
9999
from dagster_graphql.schema.entity_key import GrapheneAssetKey
100100
from dagster_graphql.schema.errors import GrapheneAssetNotFoundError
101+
from dagster_graphql.schema.freshness import GrapheneInternalFreshnessPolicy
101102
from dagster_graphql.schema.freshness_policy import (
102103
GrapheneAssetFreshnessInfo,
103104
GrapheneFreshnessPolicy,
@@ -279,6 +280,7 @@ class GrapheneAssetNode(graphene.ObjectType):
279280
description = graphene.String()
280281
freshnessInfo = graphene.Field(GrapheneAssetFreshnessInfo)
281282
freshnessPolicy = graphene.Field(GrapheneFreshnessPolicy)
283+
internalFreshnessPolicy = graphene.Field(GrapheneInternalFreshnessPolicy)
282284
autoMaterializePolicy = graphene.Field(GrapheneAutoMaterializePolicy)
283285
automationCondition = graphene.Field(GrapheneAutomationCondition)
284286
graphName = graphene.String()
@@ -1179,6 +1181,15 @@ def resolve_freshnessPolicy(
11791181
return GrapheneFreshnessPolicy(self._asset_node_snap.freshness_policy)
11801182
return None
11811183

1184+
def resolve_internalFreshnessPolicy(
1185+
self, _graphene_info: ResolveInfo
1186+
) -> Optional[GrapheneInternalFreshnessPolicy]:
1187+
if self._asset_node_snap.internal_freshness_policy:
1188+
return GrapheneInternalFreshnessPolicy.from_policy(
1189+
self._asset_node_snap.internal_freshness_policy
1190+
)
1191+
return None
1192+
11821193
def resolve_autoMaterializePolicy(
11831194
self, _graphene_info: ResolveInfo
11841195
) -> Optional[GrapheneAutoMaterializePolicy]:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import graphene
2+
from dagster._core.definitions.freshness import InternalFreshnessPolicy, TimeWindowFreshnessPolicy
3+
4+
5+
class GrapheneTimeWindowFreshnessPolicy(graphene.ObjectType):
6+
class Meta:
7+
name = "TimeWindowFreshnessPolicy"
8+
9+
failWindowSeconds = graphene.NonNull(graphene.Int)
10+
warnWindowSeconds = graphene.Int()
11+
12+
13+
class GrapheneInternalFreshnessPolicy(graphene.Union):
14+
class Meta:
15+
name = "InternalFreshnessPolicy"
16+
types = (GrapheneTimeWindowFreshnessPolicy,)
17+
18+
@classmethod
19+
def from_policy(cls, policy: InternalFreshnessPolicy):
20+
if isinstance(policy, TimeWindowFreshnessPolicy):
21+
return GrapheneTimeWindowFreshnessPolicy(
22+
failWindowSeconds=policy.fail_window.to_timedelta().total_seconds(),
23+
warnWindowSeconds=policy.warn_window.to_timedelta().total_seconds()
24+
if policy.warn_window
25+
else None,
26+
)
27+
raise Exception("Unknown freshness policy type")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from datetime import timedelta
2+
3+
from dagster._core.definitions.decorators.asset_decorator import asset
4+
from dagster._core.definitions.definitions_class import Definitions
5+
from dagster._core.definitions.freshness import InternalFreshnessPolicy
6+
from dagster._core.definitions.repository_definition.repository_definition import (
7+
RepositoryDefinition,
8+
)
9+
from dagster._core.test_utils import instance_for_test
10+
from dagster_graphql.test.utils import define_out_of_process_context, execute_dagster_graphql
11+
12+
GET_INTERNAL_FRESHNESS_POLICY = """
13+
query GetInternalFreshnessPolicy($assetKey: AssetKeyInput!) {
14+
assetNodes(assetKeys: [$assetKey]) {
15+
internalFreshnessPolicy {
16+
... on TimeWindowFreshnessPolicy {
17+
failWindowSeconds
18+
warnWindowSeconds
19+
}
20+
}
21+
}
22+
}
23+
"""
24+
25+
26+
@asset(
27+
internal_freshness_policy=InternalFreshnessPolicy.time_window(
28+
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
29+
)
30+
)
31+
def asset_with_internal_freshness_policy_with_warn_window():
32+
pass
33+
34+
35+
@asset(
36+
internal_freshness_policy=InternalFreshnessPolicy.time_window(fail_window=timedelta(minutes=10))
37+
)
38+
def asset_with_internal_freshness_policy():
39+
pass
40+
41+
42+
def get_repo() -> RepositoryDefinition:
43+
return Definitions(
44+
assets=[
45+
asset_with_internal_freshness_policy,
46+
asset_with_internal_freshness_policy_with_warn_window,
47+
]
48+
).get_repository_def()
49+
50+
51+
def test_internal_freshness_policy_time_window():
52+
with instance_for_test() as instance:
53+
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
54+
result = execute_dagster_graphql(
55+
graphql_context,
56+
GET_INTERNAL_FRESHNESS_POLICY,
57+
variables={"assetKey": asset_with_internal_freshness_policy.key.to_graphql_input()},
58+
)
59+
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
60+
"failWindowSeconds": 600,
61+
"warnWindowSeconds": None,
62+
}
63+
64+
65+
def test_internal_freshness_policy_time_window_with_warn_window():
66+
with instance_for_test() as instance:
67+
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
68+
result = execute_dagster_graphql(
69+
graphql_context,
70+
GET_INTERNAL_FRESHNESS_POLICY,
71+
variables={
72+
"assetKey": asset_with_internal_freshness_policy_with_warn_window.key.to_graphql_input()
73+
},
74+
)
75+
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
76+
"failWindowSeconds": 600,
77+
"warnWindowSeconds": 300,
78+
}

0 commit comments

Comments
 (0)