Skip to content

Commit 1d61e71

Browse files
anuthebananujamiedemaria
authored andcommitted
gql resolver for internal freshness policy (#29319)
## Summary & Motivation Adds a resolvers on `GrapheneAssetNode` for `internalFreshnessPolicy` ## How I Tested These Changes unit tests / bk. ## Changelog > Insert changelog entry or delete this section.
1 parent e3fa5b7 commit 1d61e71

File tree

6 files changed

+159
-1
lines changed

6 files changed

+159
-1
lines changed

js_modules/dagster-ui/packages/ui-core/src/graphql/possibleTypes.generated.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql

+8
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

+34
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py

+11
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@
9292
)
9393
from dagster_graphql.schema.entity_key import GrapheneAssetKey
9494
from dagster_graphql.schema.errors import GrapheneAssetNotFoundError
95+
from dagster_graphql.schema.freshness import GrapheneInternalFreshnessPolicy
9596
from dagster_graphql.schema.freshness_policy import (
9697
GrapheneAssetFreshnessInfo,
9798
GrapheneFreshnessPolicy,
@@ -272,6 +273,7 @@ class GrapheneAssetNode(graphene.ObjectType):
272273
description = graphene.String()
273274
freshnessInfo = graphene.Field(GrapheneAssetFreshnessInfo)
274275
freshnessPolicy = graphene.Field(GrapheneFreshnessPolicy)
276+
internalFreshnessPolicy = graphene.Field(GrapheneInternalFreshnessPolicy)
275277
autoMaterializePolicy = graphene.Field(GrapheneAutoMaterializePolicy)
276278
automationCondition = graphene.Field(GrapheneAutomationCondition)
277279
graphName = graphene.String()
@@ -1173,6 +1175,15 @@ def resolve_freshnessPolicy(
11731175
return GrapheneFreshnessPolicy(self._asset_node_snap.freshness_policy)
11741176
return None
11751177

1178+
def resolve_internalFreshnessPolicy(
1179+
self, _graphene_info: ResolveInfo
1180+
) -> Optional[GrapheneInternalFreshnessPolicy]:
1181+
if self._asset_node_snap.internal_freshness_policy:
1182+
return GrapheneInternalFreshnessPolicy.from_policy(
1183+
self._asset_node_snap.internal_freshness_policy
1184+
)
1185+
return None
1186+
11761187
def resolve_autoMaterializePolicy(
11771188
self, _graphene_info: ResolveInfo
11781189
) -> Optional[GrapheneAutoMaterializePolicy]:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import graphene
2+
from dagster._core.definitions.freshness import InternalFreshnessPolicy, TimeWindowFreshnessPolicy
3+
4+
5+
class GrapheneTimeWindowFreshnessPolicy(graphene.ObjectType):
6+
class Meta:
7+
name = "TimeWindowFreshnessPolicy"
8+
9+
failWindowSeconds = graphene.NonNull(graphene.Int)
10+
warnWindowSeconds = graphene.Int()
11+
12+
13+
class GrapheneInternalFreshnessPolicy(graphene.Union):
14+
class Meta:
15+
name = "InternalFreshnessPolicy"
16+
types = (GrapheneTimeWindowFreshnessPolicy,)
17+
18+
@classmethod
19+
def from_policy(cls, policy: InternalFreshnessPolicy):
20+
if isinstance(policy, TimeWindowFreshnessPolicy):
21+
return GrapheneTimeWindowFreshnessPolicy(
22+
failWindowSeconds=policy.fail_window.to_timedelta().total_seconds(),
23+
warnWindowSeconds=policy.warn_window.to_timedelta().total_seconds()
24+
if policy.warn_window
25+
else None,
26+
)
27+
raise Exception("Unknown freshness policy type")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
from datetime import timedelta
2+
3+
from dagster._core.definitions.decorators.asset_decorator import asset
4+
from dagster._core.definitions.definitions_class import Definitions
5+
from dagster._core.definitions.freshness import InternalFreshnessPolicy
6+
from dagster._core.definitions.repository_definition.repository_definition import (
7+
RepositoryDefinition,
8+
)
9+
from dagster._core.test_utils import instance_for_test
10+
from dagster_graphql.test.utils import define_out_of_process_context, execute_dagster_graphql
11+
12+
GET_INTERNAL_FRESHNESS_POLICY = """
13+
query GetInternalFreshnessPolicy($assetKey: AssetKeyInput!) {
14+
assetNodes(assetKeys: [$assetKey]) {
15+
internalFreshnessPolicy {
16+
... on TimeWindowFreshnessPolicy {
17+
failWindowSeconds
18+
warnWindowSeconds
19+
}
20+
}
21+
}
22+
}
23+
"""
24+
25+
26+
@asset(
27+
internal_freshness_policy=InternalFreshnessPolicy.time_window(
28+
fail_window=timedelta(minutes=10), warn_window=timedelta(minutes=5)
29+
)
30+
)
31+
def asset_with_internal_freshness_policy_with_warn_window():
32+
pass
33+
34+
35+
@asset(
36+
internal_freshness_policy=InternalFreshnessPolicy.time_window(fail_window=timedelta(minutes=10))
37+
)
38+
def asset_with_internal_freshness_policy():
39+
pass
40+
41+
42+
def get_repo() -> RepositoryDefinition:
43+
return Definitions(
44+
assets=[
45+
asset_with_internal_freshness_policy,
46+
asset_with_internal_freshness_policy_with_warn_window,
47+
]
48+
).get_repository_def()
49+
50+
51+
def test_internal_freshness_policy_time_window():
52+
with instance_for_test() as instance:
53+
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
54+
result = execute_dagster_graphql(
55+
graphql_context,
56+
GET_INTERNAL_FRESHNESS_POLICY,
57+
variables={"assetKey": asset_with_internal_freshness_policy.key.to_graphql_input()},
58+
)
59+
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
60+
"failWindowSeconds": 600,
61+
"warnWindowSeconds": None,
62+
}
63+
64+
65+
def test_internal_freshness_policy_time_window_with_warn_window():
66+
with instance_for_test() as instance:
67+
with define_out_of_process_context(__file__, "get_repo", instance) as graphql_context:
68+
result = execute_dagster_graphql(
69+
graphql_context,
70+
GET_INTERNAL_FRESHNESS_POLICY,
71+
variables={
72+
"assetKey": asset_with_internal_freshness_policy_with_warn_window.key.to_graphql_input()
73+
},
74+
)
75+
assert result.data["assetNodes"][0]["internalFreshnessPolicy"] == {
76+
"failWindowSeconds": 600,
77+
"warnWindowSeconds": 300,
78+
}

0 commit comments

Comments
 (0)