Skip to content

Commit e79234a

Browse files
committed
tests
1 parent a3aff0c commit e79234a

File tree

2 files changed

+223
-10
lines changed

2 files changed

+223
-10
lines changed

Diff for: python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py

+30
Original file line numberDiff line numberDiff line change
@@ -1752,6 +1752,31 @@ def ungrouped_asset_5():
17521752
return 1
17531753

17541754

1755+
@asset(key_prefix="grouping_prefix")
1756+
def asset_with_prefix_1():
1757+
return 1
1758+
1759+
1760+
@asset(key_prefix="grouping_prefix")
1761+
def asset_with_prefix_2():
1762+
return 1
1763+
1764+
1765+
@asset(key_prefix="grouping_prefix")
1766+
def asset_with_prefix_3():
1767+
return 1
1768+
1769+
1770+
@asset(key_prefix="grouping_prefix")
1771+
def asset_with_prefix_4():
1772+
return 1
1773+
1774+
1775+
@asset(key_prefix="grouping_prefix")
1776+
def asset_with_prefix_5():
1777+
return 1
1778+
1779+
17551780
@multi_asset(outs={"int_asset": AssetOut(), "str_asset": AssetOut()})
17561781
def typed_multi_asset() -> tuple[int, str]:
17571782
return (1, "yay")
@@ -2204,6 +2229,11 @@ def define_assets():
22042229
concurrency_asset,
22052230
concurrency_graph_asset,
22062231
concurrency_multi_asset,
2232+
asset_with_prefix_1,
2233+
asset_with_prefix_2,
2234+
asset_with_prefix_3,
2235+
asset_with_prefix_4,
2236+
asset_with_prefix_5,
22072237
]
22082238

22092239

Diff for: python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py

+193-10
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,14 @@
2424
AssetMaterializationFailure,
2525
AssetMaterializationFailureReason,
2626
AssetMaterializationFailureType,
27+
AssetObservation,
2728
)
2829
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionKey
29-
from dagster._core.events import StepMaterializationData
30+
from dagster._core.events import (
31+
AssetMaterializationPlannedData,
32+
AssetObservationData,
33+
StepMaterializationData,
34+
)
3035
from dagster._core.events.log import EventLogEntry
3136
from dagster._core.storage.dagster_run import DagsterRunStatus
3237
from dagster._core.test_utils import instance_for_test, poll_for_finished_run
@@ -896,6 +901,24 @@
896901
}
897902
"""
898903

904+
GET_ASSET_SORT_KEY = """
905+
query AssetSortKeyQuery($prefix: [String!]!) {
906+
assetsOrError(prefix: $prefix) {
907+
... on AssetConnection {
908+
nodes {
909+
key {
910+
path
911+
}
912+
latestEventSortKey
913+
}
914+
}
915+
... on PythonError {
916+
message
917+
}
918+
}
919+
}
920+
"""
921+
899922

900923
def _create_run(
901924
graphql_context: WorkspaceRequestContext,
@@ -1255,7 +1278,10 @@ def test_asset_asof_timestamp(self, graphql_context: WorkspaceRequestContext):
12551278
result = execute_dagster_graphql(
12561279
graphql_context,
12571280
GET_ASSET_MATERIALIZATION_AFTER_TIMESTAMP,
1258-
variables={"assetKey": {"path": ["a"]}, "afterTimestamp": str(after_timestamp)},
1281+
variables={
1282+
"assetKey": {"path": ["a"]},
1283+
"afterTimestamp": str(after_timestamp),
1284+
},
12591285
)
12601286
assert result.data
12611287
assert result.data["assetOrError"]
@@ -1268,7 +1294,10 @@ def test_asset_asof_timestamp(self, graphql_context: WorkspaceRequestContext):
12681294
result = execute_dagster_graphql(
12691295
graphql_context,
12701296
GET_ASSET_MATERIALIZATION_AFTER_TIMESTAMP,
1271-
variables={"assetKey": {"path": ["a"]}, "afterTimestamp": str(after_timestamp)},
1297+
variables={
1298+
"assetKey": {"path": ["a"]},
1299+
"afterTimestamp": str(after_timestamp),
1300+
},
12721301
)
12731302
assert result.data
12741303
assert result.data["assetOrError"]
@@ -2758,7 +2787,11 @@ def test_automation_condition(self, graphql_context: WorkspaceRequestContext):
27582787
assert len(custom_automation_condition_asset) == 1
27592788
condition = custom_automation_condition_asset[0]["automationCondition"]
27602789
assert condition["label"] is None
2761-
assert condition["expandedLabel"] == ["(some_custom_name)", "SINCE", "(handled)"]
2790+
assert condition["expandedLabel"] == [
2791+
"(some_custom_name)",
2792+
"SINCE",
2793+
"(handled)",
2794+
]
27622795

27632796
def test_tags(self, graphql_context: WorkspaceRequestContext):
27642797
result = execute_dagster_graphql(
@@ -2824,7 +2857,12 @@ def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext):
28242857
assert result.data["assetNodes"]
28252858

28262859
for a in result.data["assetNodes"]:
2827-
if a["assetKey"]["path"] in [["asset_1"], ["one"], ["check_in_op_asset"], ["asset_3"]]:
2860+
if a["assetKey"]["path"] in [
2861+
["asset_1"],
2862+
["one"],
2863+
["check_in_op_asset"],
2864+
["asset_3"],
2865+
]:
28282866
assert a["hasAssetChecks"] is True
28292867
else:
28302868
assert a["hasAssetChecks"] is False, f"Asset {a['assetKey']} has asset checks"
@@ -2930,6 +2968,135 @@ def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext):
29302968
== "Maps a downstream partition to any upstream partition with an overlapping time window."
29312969
)
29322970

2971+
def test_asset_event_sort_key(self, graphql_context: WorkspaceRequestContext):
2972+
"""Tests that the asset event sort key is correct, and based on the latest event for an asset
2973+
when querying for asset events.
2974+
"""
2975+
# TestAssetAwareEventLog::test_asset_event_sort_key[sqlite_with_default_run_launcher_managed_grpc_env]
2976+
# log events for some assets, make sure the time order of the events doesn't match the
2977+
# alpahbetical order of the asset keys so that the default alphabetical sorting doesnt
2978+
asset_prefix = "grouping_prefix"
2979+
query_keys = [
2980+
AssetKey([asset_prefix, "asset_with_prefix_1"]),
2981+
AssetKey([asset_prefix, "asset_with_prefix_2"]),
2982+
AssetKey([asset_prefix, "asset_with_prefix_3"]),
2983+
AssetKey([asset_prefix, "asset_with_prefix_4"]),
2984+
AssetKey([asset_prefix, "asset_with_prefix_5"]),
2985+
]
2986+
storage = graphql_context.instance.event_log_storage
2987+
test_run_id = make_new_run_id()
2988+
storage.store_event(
2989+
EventLogEntry(
2990+
error_info=None,
2991+
user_message="",
2992+
level="debug",
2993+
run_id=test_run_id,
2994+
timestamp=1.0,
2995+
dagster_event=DagsterEvent(
2996+
DagsterEventType.ASSET_MATERIALIZATION.value,
2997+
"nonce",
2998+
event_specific_data=StepMaterializationData(
2999+
materialization=AssetMaterialization(asset_key=query_keys[3]),
3000+
),
3001+
),
3002+
)
3003+
)
3004+
storage.store_event(
3005+
EventLogEntry(
3006+
error_info=None,
3007+
user_message="",
3008+
level="debug",
3009+
run_id=test_run_id,
3010+
timestamp=2.0,
3011+
dagster_event=DagsterEvent(
3012+
DagsterEventType.ASSET_MATERIALIZATION.value,
3013+
"nonce",
3014+
event_specific_data=StepMaterializationData(
3015+
materialization=AssetMaterialization(asset_key=query_keys[0]),
3016+
),
3017+
),
3018+
)
3019+
)
3020+
storage.store_event(
3021+
EventLogEntry(
3022+
error_info=None,
3023+
user_message="",
3024+
level="debug",
3025+
run_id=test_run_id,
3026+
timestamp=3.0,
3027+
dagster_event=DagsterEvent(
3028+
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
3029+
"nonce",
3030+
event_specific_data=AssetMaterializationPlannedData(asset_key=query_keys[1]),
3031+
),
3032+
)
3033+
)
3034+
storage.store_event(
3035+
EventLogEntry(
3036+
error_info=None,
3037+
user_message="",
3038+
level="debug",
3039+
run_id=test_run_id,
3040+
timestamp=4.0,
3041+
dagster_event=DagsterEvent(
3042+
DagsterEventType.ASSET_OBSERVATION.value,
3043+
"nonce",
3044+
event_specific_data=AssetObservationData(
3045+
AssetObservation(asset_key=query_keys[2])
3046+
),
3047+
),
3048+
)
3049+
)
3050+
storage.store_event(
3051+
EventLogEntry(
3052+
error_info=None,
3053+
user_message="",
3054+
level="debug",
3055+
run_id=test_run_id,
3056+
timestamp=5.0,
3057+
dagster_event=DagsterEvent.build_asset_failed_to_materialize_event(
3058+
DagsterEventType.ASSET_FAILED_TO_MATERIALIZE.value,
3059+
"nonce",
3060+
asset_materialization_failure=AssetMaterializationFailure(
3061+
asset_key=query_keys[4],
3062+
failure_type=AssetMaterializationFailureType.MATERIALIZATION,
3063+
reason=AssetMaterializationFailureReason.FAILED_TO_MATERIALIZE,
3064+
),
3065+
),
3066+
)
3067+
)
3068+
if storage.asset_records_have_last_planned_and_failed_materializations:
3069+
expected_order = {
3070+
query_keys[0]: 2,
3071+
query_keys[1]: 3,
3072+
query_keys[2]: 4,
3073+
query_keys[3]: 1,
3074+
query_keys[4]: 5,
3075+
}
3076+
else:
3077+
expected_order = {
3078+
query_keys[0]: 2,
3079+
query_keys[1]: None,
3080+
query_keys[2]: None,
3081+
query_keys[3]: 1,
3082+
query_keys[4]: None,
3083+
}
3084+
3085+
result = execute_dagster_graphql(
3086+
graphql_context,
3087+
GET_ASSET_SORT_KEY,
3088+
variables={
3089+
"prefix": [asset_prefix],
3090+
},
3091+
)
3092+
assert result
3093+
assert len(result.data["assetsOrError"]["nodes"]) == 4
3094+
3095+
for list_item in result.data["assetsOrError"]["nodes"]:
3096+
asset_key = AssetKey.from_graphql_input(list_item["key"])
3097+
assert asset_key in expected_order.keys()
3098+
assert list_item["latestEventSortKey"] == expected_order[asset_key]
3099+
29333100

29343101
# This is factored out of TestAssetAwareEventLog because there is a separate implementation for plus
29353102
# graphql tests.
@@ -3619,14 +3786,26 @@ def test_2d_subset_backcompat():
36193786

36203787
assert ranges[0]["primaryDimStartKey"] == "2022-03-03"
36213788
assert ranges[0]["primaryDimEndKey"] == "2022-03-04"
3622-
assert set(ranges[0]["secondaryDim"]["materializedPartitions"]) == {"a", "c"}
3623-
assert set(ranges[0]["secondaryDim"]["unmaterializedPartitions"]) == {"b", "d"}
3789+
assert set(ranges[0]["secondaryDim"]["materializedPartitions"]) == {
3790+
"a",
3791+
"c",
3792+
}
3793+
assert set(ranges[0]["secondaryDim"]["unmaterializedPartitions"]) == {
3794+
"b",
3795+
"d",
3796+
}
36243797

36253798
assert ranges[1]["primaryDimStartKey"] == "2022-03-06"
36263799
assert ranges[1]["primaryDimEndKey"] == "2022-03-06"
36273800
assert len(ranges[1]["secondaryDim"]["materializedPartitions"]) == 2
3628-
assert set(ranges[1]["secondaryDim"]["materializedPartitions"]) == {"a", "c"}
3629-
assert set(ranges[1]["secondaryDim"]["unmaterializedPartitions"]) == {"b", "d"}
3801+
assert set(ranges[1]["secondaryDim"]["materializedPartitions"]) == {
3802+
"a",
3803+
"c",
3804+
}
3805+
assert set(ranges[1]["secondaryDim"]["unmaterializedPartitions"]) == {
3806+
"b",
3807+
"d",
3808+
}
36303809

36313810

36323811
def test_concurrency_assets(graphql_context: WorkspaceRequestContext):
@@ -3712,7 +3891,11 @@ def test_asset_materialization_history(self, graphql_context: WorkspaceRequestCo
37123891
result = execute_dagster_graphql(
37133892
graphql_context,
37143893
GET_ASSET_MATERIALIZATION_HISTORY,
3715-
variables={"assetKey": {"path": ["asset_1"]}, "eventTypeSelector": "ALL", "limit": 2},
3894+
variables={
3895+
"assetKey": {"path": ["asset_1"]},
3896+
"eventTypeSelector": "ALL",
3897+
"limit": 2,
3898+
},
37163899
)
37173900

37183901
assert result.data

0 commit comments

Comments
 (0)