Skip to content

Commit b5f4e13

Browse files
committed
add asset graph
1 parent 7b24cc9 commit b5f4e13

File tree

3 files changed

+133
-1
lines changed

3 files changed

+133
-1
lines changed

python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py

+47-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,12 @@
1717
AutomationConditionSnapshot,
1818
)
1919
from dagster._core.definitions.freshness import FreshnessState
20-
from dagster._core.definitions.partition import CachingDynamicPartitionsLoader, PartitionsDefinition
20+
from dagster._core.definitions.partition import (
21+
CachingDynamicPartitionsLoader,
22+
PartitionLoadingContext,
23+
PartitionsDefinition,
24+
TemporalContext,
25+
)
2126
from dagster._core.definitions.partition_mapping import PartitionMapping
2227
from dagster._core.definitions.remote_asset_graph import RemoteAssetNode, RemoteWorkspaceAssetNode
2328
from dagster._core.definitions.selector import JobSelector
@@ -44,6 +49,7 @@
4449
from dagster._core.storage.tags import KIND_PREFIX
4550
from dagster._core.utils import is_valid_email
4651
from dagster._core.workspace.permissions import Permissions
52+
from dagster._time import get_current_datetime
4753
from packaging import version
4854

4955
from dagster_graphql.implementation.events import iterate_metadata_entries
@@ -103,6 +109,7 @@
103109
GrapheneObservationEvent,
104110
)
105111
from dagster_graphql.schema.metadata import GrapheneMetadataEntry
112+
from dagster_graphql.schema.partition_keys import GraphenePartitionKeyConnection
106113
from dagster_graphql.schema.partition_mappings import GraphenePartitionMapping
107114
from dagster_graphql.schema.partition_sets import (
108115
GrapheneDimensionPartitionKeys,
@@ -300,6 +307,12 @@ class GrapheneAssetNode(graphene.ObjectType):
300307
opVersion = graphene.String()
301308
partitionDefinition = graphene.Field(GraphenePartitionDefinition)
302309
partitionKeys = non_null_list(graphene.String)
310+
partitionKeyConnection = graphene.Field(
311+
GraphenePartitionKeyConnection,
312+
limit=graphene.Argument(graphene.NonNull(graphene.Int)),
313+
ascending=graphene.Argument(graphene.NonNull(graphene.Boolean)),
314+
cursor=graphene.Argument(graphene.String),
315+
)
303316
partitionKeysByDimension = graphene.Field(
304317
non_null_list(GrapheneDimensionPartitionKeys),
305318
startIdx=graphene.Int(),
@@ -1542,6 +1555,39 @@ def resolve_partitionKeysByDimension(
15421555
def resolve_partitionKeys(self, _graphene_info: ResolveInfo) -> Sequence[str]:
15431556
return self.get_partition_keys()
15441557

1558+
def resolve_partitionKeyConnection(
1559+
self,
1560+
graphene_info: ResolveInfo,
1561+
limit: int,
1562+
ascending: bool,
1563+
cursor: Optional[str] = None,
1564+
) -> Optional[GraphenePartitionKeyConnection]:
1565+
if not self._dynamic_partitions_loader:
1566+
check.failed("dynamic_partitions_loader must be provided to get partition keys")
1567+
1568+
if not self._remote_node.is_partitioned:
1569+
return None
1570+
1571+
partitions_def = self._get_partitions_def()
1572+
context = PartitionLoadingContext(
1573+
TemporalContext(
1574+
effective_dt=get_current_datetime(),
1575+
last_event_id=graphene_info.context.instance.event_log_storage.get_maximum_record_id(),
1576+
),
1577+
dynamic_partitions_store=self._dynamic_partitions_loader,
1578+
)
1579+
conn = partitions_def.get_partition_key_connection(
1580+
context=context,
1581+
limit=limit,
1582+
ascending=ascending,
1583+
cursor=cursor,
1584+
)
1585+
return GraphenePartitionKeyConnection(
1586+
results=conn.results,
1587+
cursor=conn.cursor,
1588+
hasMore=conn.has_more,
1589+
)
1590+
15451591
def resolve_partitionDefinition(
15461592
self, _graphene_info: ResolveInfo
15471593
) -> Optional[GraphenePartitionDefinition]:

python_modules/dagster-graphql/dagster_graphql/schema/partition_keys.py

+9
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,15 @@
44
from dagster_graphql.schema.util import non_null_list
55

66

7+
class GraphenePartitionKeyConnection(graphene.ObjectType):
8+
results = non_null_list(graphene.String)
9+
cursor = graphene.NonNull(graphene.String)
10+
hasMore = graphene.NonNull(graphene.Boolean)
11+
12+
class Meta:
13+
name = "PartitionKeyConnection"
14+
15+
716
class GraphenePartitionKeys(graphene.ObjectType):
817
partitionKeys = non_null_list(graphene.String)
918

python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py

+77
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,19 @@
337337
}
338338
"""
339339

340+
GET_ASSET_PARTITIONS_CONNECTION = """
341+
query AssetNodeQuery($assetKeys: [AssetKeyInput!], $limit: Int!, $ascending: Boolean!, $cursor: String) {
342+
assetNodes(assetKeys: $assetKeys) {
343+
id
344+
partitionKeyConnection(limit: $limit, ascending: $ascending, cursor: $cursor) {
345+
results
346+
cursor
347+
hasMore
348+
}
349+
}
350+
}
351+
"""
352+
340353
GET_PARTITIONS_BY_DIMENSION = """
341354
query AssetNodeQuery($assetKeys: [AssetKeyInput!], $startIdx: Int, $endIdx: Int) {
342355
assetNodes(assetKeys: $assetKeys) {
@@ -1418,6 +1431,70 @@ def test_asset_partitions_in_pipeline(self, graphql_context: WorkspaceRequestCon
14181431
assert asset_node["partitionKeys"][0] == "2021-05-05-01:00"
14191432
assert asset_node["partitionKeys"][1] == "2021-05-05-02:00"
14201433

1434+
def test_asset_partition_key_connection(self, graphql_context: WorkspaceRequestContext):
1435+
result = execute_dagster_graphql(
1436+
graphql_context,
1437+
GET_ASSET_PARTITIONS_CONNECTION,
1438+
variables={
1439+
"assetKeys": [
1440+
{"path": ["not_included_asset"]},
1441+
{"path": ["upstream_static_partitioned_asset"]},
1442+
],
1443+
"limit": 2,
1444+
"ascending": True,
1445+
},
1446+
)
1447+
1448+
assert result.data and result.data["assetNodes"]
1449+
assert len(result.data["assetNodes"]) == 2
1450+
unpartitioned_asset_node = result.data["assetNodes"][0]
1451+
assert unpartitioned_asset_node["partitionKeyConnection"] is None
1452+
partitioned_asset_node = result.data["assetNodes"][1]
1453+
partition_key_conn = partitioned_asset_node["partitionKeyConnection"]
1454+
assert partition_key_conn["hasMore"]
1455+
assert len(partition_key_conn["results"]) == 2
1456+
assert partition_key_conn["results"] == ["a", "b"]
1457+
cursor = partition_key_conn["cursor"]
1458+
1459+
result = execute_dagster_graphql(
1460+
graphql_context,
1461+
GET_ASSET_PARTITIONS_CONNECTION,
1462+
variables={
1463+
"assetKeys": [
1464+
{"path": ["upstream_static_partitioned_asset"]},
1465+
],
1466+
"limit": 10,
1467+
"ascending": True,
1468+
"cursor": cursor,
1469+
},
1470+
)
1471+
assert result.data and result.data["assetNodes"]
1472+
assert len(result.data["assetNodes"]) == 1
1473+
partitioned_asset_node = result.data["assetNodes"][0]
1474+
partition_key_conn = partitioned_asset_node["partitionKeyConnection"]
1475+
assert not partition_key_conn["hasMore"]
1476+
assert len(partition_key_conn["results"]) == 4
1477+
assert partition_key_conn["results"] == ["c", "d", "e", "f"]
1478+
1479+
result = execute_dagster_graphql(
1480+
graphql_context,
1481+
GET_ASSET_PARTITIONS_CONNECTION,
1482+
variables={
1483+
"assetKeys": [
1484+
{"path": ["upstream_static_partitioned_asset"]},
1485+
],
1486+
"limit": 10,
1487+
"ascending": False,
1488+
},
1489+
)
1490+
assert result.data and result.data["assetNodes"]
1491+
assert len(result.data["assetNodes"]) == 1
1492+
partitioned_asset_node = result.data["assetNodes"][0]
1493+
partition_key_conn = partitioned_asset_node["partitionKeyConnection"]
1494+
assert not partition_key_conn["hasMore"]
1495+
assert len(partition_key_conn["results"]) == 6
1496+
assert partition_key_conn["results"] == ["f", "e", "d", "c", "b", "a"]
1497+
14211498
def test_latest_materialization_per_partition(self, graphql_context: WorkspaceRequestContext):
14221499
_create_partitioned_run(graphql_context, "partition_materialization_job", partition_key="c")
14231500
_create_partitioned_run(graphql_context, "partition_materialization_job", partition_key="d")

0 commit comments

Comments
 (0)