24
24
AssetMaterializationFailure ,
25
25
AssetMaterializationFailureReason ,
26
26
AssetMaterializationFailureType ,
27
+ AssetObservation ,
27
28
)
28
29
from dagster ._core .definitions .multi_dimensional_partitions import MultiPartitionKey
29
- from dagster ._core .events import StepMaterializationData
30
+ from dagster ._core .events import (
31
+ AssetMaterializationPlannedData ,
32
+ AssetObservationData ,
33
+ StepMaterializationData ,
34
+ )
30
35
from dagster ._core .events .log import EventLogEntry
31
36
from dagster ._core .storage .dagster_run import DagsterRunStatus
32
37
from dagster ._core .test_utils import instance_for_test , poll_for_finished_run
896
901
}
897
902
"""
898
903
904
+ GET_ASSET_SORT_KEY = """
905
+ query AssetSortKeyQuery($prefix: [String!]!) {
906
+ assetsOrError(prefix: $prefix) {
907
+ ... on AssetConnection {
908
+ nodes {
909
+ key {
910
+ path
911
+ }
912
+ latestEventSortKey
913
+ }
914
+ }
915
+ ... on PythonError {
916
+ message
917
+ }
918
+ }
919
+ }
920
+ """
921
+
899
922
900
923
def _create_run (
901
924
graphql_context : WorkspaceRequestContext ,
@@ -1255,7 +1278,10 @@ def test_asset_asof_timestamp(self, graphql_context: WorkspaceRequestContext):
1255
1278
result = execute_dagster_graphql (
1256
1279
graphql_context ,
1257
1280
GET_ASSET_MATERIALIZATION_AFTER_TIMESTAMP ,
1258
- variables = {"assetKey" : {"path" : ["a" ]}, "afterTimestamp" : str (after_timestamp )},
1281
+ variables = {
1282
+ "assetKey" : {"path" : ["a" ]},
1283
+ "afterTimestamp" : str (after_timestamp ),
1284
+ },
1259
1285
)
1260
1286
assert result .data
1261
1287
assert result .data ["assetOrError" ]
@@ -1268,7 +1294,10 @@ def test_asset_asof_timestamp(self, graphql_context: WorkspaceRequestContext):
1268
1294
result = execute_dagster_graphql (
1269
1295
graphql_context ,
1270
1296
GET_ASSET_MATERIALIZATION_AFTER_TIMESTAMP ,
1271
- variables = {"assetKey" : {"path" : ["a" ]}, "afterTimestamp" : str (after_timestamp )},
1297
+ variables = {
1298
+ "assetKey" : {"path" : ["a" ]},
1299
+ "afterTimestamp" : str (after_timestamp ),
1300
+ },
1272
1301
)
1273
1302
assert result .data
1274
1303
assert result .data ["assetOrError" ]
@@ -2758,7 +2787,11 @@ def test_automation_condition(self, graphql_context: WorkspaceRequestContext):
2758
2787
assert len (custom_automation_condition_asset ) == 1
2759
2788
condition = custom_automation_condition_asset [0 ]["automationCondition" ]
2760
2789
assert condition ["label" ] is None
2761
- assert condition ["expandedLabel" ] == ["(some_custom_name)" , "SINCE" , "(handled)" ]
2790
+ assert condition ["expandedLabel" ] == [
2791
+ "(some_custom_name)" ,
2792
+ "SINCE" ,
2793
+ "(handled)" ,
2794
+ ]
2762
2795
2763
2796
def test_tags (self , graphql_context : WorkspaceRequestContext ):
2764
2797
result = execute_dagster_graphql (
@@ -2824,7 +2857,12 @@ def test_has_asset_checks(self, graphql_context: WorkspaceRequestContext):
2824
2857
assert result .data ["assetNodes" ]
2825
2858
2826
2859
for a in result .data ["assetNodes" ]:
2827
- if a ["assetKey" ]["path" ] in [["asset_1" ], ["one" ], ["check_in_op_asset" ], ["asset_3" ]]:
2860
+ if a ["assetKey" ]["path" ] in [
2861
+ ["asset_1" ],
2862
+ ["one" ],
2863
+ ["check_in_op_asset" ],
2864
+ ["asset_3" ],
2865
+ ]:
2828
2866
assert a ["hasAssetChecks" ] is True
2829
2867
else :
2830
2868
assert a ["hasAssetChecks" ] is False , f"Asset { a ['assetKey' ]} has asset checks"
@@ -2930,6 +2968,135 @@ def test_get_partition_mapping(self, graphql_context: WorkspaceRequestContext):
2930
2968
== "Maps a downstream partition to any upstream partition with an overlapping time window."
2931
2969
)
2932
2970
2971
+ def test_asset_event_sort_key (self , graphql_context : WorkspaceRequestContext ):
2972
+ """Tests that the asset event sort key is correct, and based on the latest event for an asset
2973
+ when querying for asset events.
2974
+ """
2975
+ # TestAssetAwareEventLog::test_asset_event_sort_key[sqlite_with_default_run_launcher_managed_grpc_env]
2976
+ # log events for some assets, make sure the time order of the events doesn't match the
2977
+ # alpahbetical order of the asset keys so that the default alphabetical sorting doesnt
2978
+ asset_prefix = "grouping_prefix"
2979
+ query_keys = [
2980
+ AssetKey ([asset_prefix , "asset_with_prefix_1" ]),
2981
+ AssetKey ([asset_prefix , "asset_with_prefix_2" ]),
2982
+ AssetKey ([asset_prefix , "asset_with_prefix_3" ]),
2983
+ AssetKey ([asset_prefix , "asset_with_prefix_4" ]),
2984
+ AssetKey ([asset_prefix , "asset_with_prefix_5" ]),
2985
+ ]
2986
+ storage = graphql_context .instance .event_log_storage
2987
+ test_run_id = make_new_run_id ()
2988
+ storage .store_event (
2989
+ EventLogEntry (
2990
+ error_info = None ,
2991
+ user_message = "" ,
2992
+ level = "debug" ,
2993
+ run_id = test_run_id ,
2994
+ timestamp = 1.0 ,
2995
+ dagster_event = DagsterEvent (
2996
+ DagsterEventType .ASSET_MATERIALIZATION .value ,
2997
+ "nonce" ,
2998
+ event_specific_data = StepMaterializationData (
2999
+ materialization = AssetMaterialization (asset_key = query_keys [3 ]),
3000
+ ),
3001
+ ),
3002
+ )
3003
+ )
3004
+ storage .store_event (
3005
+ EventLogEntry (
3006
+ error_info = None ,
3007
+ user_message = "" ,
3008
+ level = "debug" ,
3009
+ run_id = test_run_id ,
3010
+ timestamp = 2.0 ,
3011
+ dagster_event = DagsterEvent (
3012
+ DagsterEventType .ASSET_MATERIALIZATION .value ,
3013
+ "nonce" ,
3014
+ event_specific_data = StepMaterializationData (
3015
+ materialization = AssetMaterialization (asset_key = query_keys [0 ]),
3016
+ ),
3017
+ ),
3018
+ )
3019
+ )
3020
+ storage .store_event (
3021
+ EventLogEntry (
3022
+ error_info = None ,
3023
+ user_message = "" ,
3024
+ level = "debug" ,
3025
+ run_id = test_run_id ,
3026
+ timestamp = 3.0 ,
3027
+ dagster_event = DagsterEvent (
3028
+ DagsterEventType .ASSET_MATERIALIZATION_PLANNED .value ,
3029
+ "nonce" ,
3030
+ event_specific_data = AssetMaterializationPlannedData (asset_key = query_keys [1 ]),
3031
+ ),
3032
+ )
3033
+ )
3034
+ storage .store_event (
3035
+ EventLogEntry (
3036
+ error_info = None ,
3037
+ user_message = "" ,
3038
+ level = "debug" ,
3039
+ run_id = test_run_id ,
3040
+ timestamp = 4.0 ,
3041
+ dagster_event = DagsterEvent (
3042
+ DagsterEventType .ASSET_OBSERVATION .value ,
3043
+ "nonce" ,
3044
+ event_specific_data = AssetObservationData (
3045
+ AssetObservation (asset_key = query_keys [2 ])
3046
+ ),
3047
+ ),
3048
+ )
3049
+ )
3050
+ storage .store_event (
3051
+ EventLogEntry (
3052
+ error_info = None ,
3053
+ user_message = "" ,
3054
+ level = "debug" ,
3055
+ run_id = test_run_id ,
3056
+ timestamp = 5.0 ,
3057
+ dagster_event = DagsterEvent .build_asset_failed_to_materialize_event (
3058
+ DagsterEventType .ASSET_FAILED_TO_MATERIALIZE .value ,
3059
+ "nonce" ,
3060
+ asset_materialization_failure = AssetMaterializationFailure (
3061
+ asset_key = query_keys [4 ],
3062
+ failure_type = AssetMaterializationFailureType .MATERIALIZATION ,
3063
+ reason = AssetMaterializationFailureReason .FAILED_TO_MATERIALIZE ,
3064
+ ),
3065
+ ),
3066
+ )
3067
+ )
3068
+ if storage .asset_records_have_last_planned_and_failed_materializations :
3069
+ expected_order = {
3070
+ query_keys [0 ]: 2 ,
3071
+ query_keys [1 ]: 3 ,
3072
+ query_keys [2 ]: 4 ,
3073
+ query_keys [3 ]: 1 ,
3074
+ query_keys [4 ]: 5 ,
3075
+ }
3076
+ else :
3077
+ expected_order = {
3078
+ query_keys [0 ]: 2 ,
3079
+ query_keys [1 ]: None ,
3080
+ query_keys [2 ]: None ,
3081
+ query_keys [3 ]: 1 ,
3082
+ query_keys [4 ]: None ,
3083
+ }
3084
+
3085
+ result = execute_dagster_graphql (
3086
+ graphql_context ,
3087
+ GET_ASSET_SORT_KEY ,
3088
+ variables = {
3089
+ "prefix" : [asset_prefix ],
3090
+ },
3091
+ )
3092
+ assert result
3093
+ assert len (result .data ["assetsOrError" ]["nodes" ]) == 4
3094
+
3095
+ for list_item in result .data ["assetsOrError" ]["nodes" ]:
3096
+ asset_key = AssetKey .from_graphql_input (list_item ["key" ])
3097
+ assert asset_key in expected_order .keys ()
3098
+ assert list_item ["latestEventSortKey" ] == expected_order [asset_key ]
3099
+
2933
3100
2934
3101
# This is factored out of TestAssetAwareEventLog because there is a separate implementation for plus
2935
3102
# graphql tests.
@@ -3619,14 +3786,26 @@ def test_2d_subset_backcompat():
3619
3786
3620
3787
assert ranges [0 ]["primaryDimStartKey" ] == "2022-03-03"
3621
3788
assert ranges [0 ]["primaryDimEndKey" ] == "2022-03-04"
3622
- assert set (ranges [0 ]["secondaryDim" ]["materializedPartitions" ]) == {"a" , "c" }
3623
- assert set (ranges [0 ]["secondaryDim" ]["unmaterializedPartitions" ]) == {"b" , "d" }
3789
+ assert set (ranges [0 ]["secondaryDim" ]["materializedPartitions" ]) == {
3790
+ "a" ,
3791
+ "c" ,
3792
+ }
3793
+ assert set (ranges [0 ]["secondaryDim" ]["unmaterializedPartitions" ]) == {
3794
+ "b" ,
3795
+ "d" ,
3796
+ }
3624
3797
3625
3798
assert ranges [1 ]["primaryDimStartKey" ] == "2022-03-06"
3626
3799
assert ranges [1 ]["primaryDimEndKey" ] == "2022-03-06"
3627
3800
assert len (ranges [1 ]["secondaryDim" ]["materializedPartitions" ]) == 2
3628
- assert set (ranges [1 ]["secondaryDim" ]["materializedPartitions" ]) == {"a" , "c" }
3629
- assert set (ranges [1 ]["secondaryDim" ]["unmaterializedPartitions" ]) == {"b" , "d" }
3801
+ assert set (ranges [1 ]["secondaryDim" ]["materializedPartitions" ]) == {
3802
+ "a" ,
3803
+ "c" ,
3804
+ }
3805
+ assert set (ranges [1 ]["secondaryDim" ]["unmaterializedPartitions" ]) == {
3806
+ "b" ,
3807
+ "d" ,
3808
+ }
3630
3809
3631
3810
3632
3811
def test_concurrency_assets (graphql_context : WorkspaceRequestContext ):
@@ -3712,7 +3891,11 @@ def test_asset_materialization_history(self, graphql_context: WorkspaceRequestCo
3712
3891
result = execute_dagster_graphql (
3713
3892
graphql_context ,
3714
3893
GET_ASSET_MATERIALIZATION_HISTORY ,
3715
- variables = {"assetKey" : {"path" : ["asset_1" ]}, "eventTypeSelector" : "ALL" , "limit" : 2 },
3894
+ variables = {
3895
+ "assetKey" : {"path" : ["asset_1" ]},
3896
+ "eventTypeSelector" : "ALL" ,
3897
+ "limit" : 2 ,
3898
+ },
3716
3899
)
3717
3900
3718
3901
assert result .data
0 commit comments