Skip to content

Commit 8972aac

Browse files
committed
poc asset ordering
1 parent 5973dc8 commit 8972aac

File tree

4 files changed

+177
-0
lines changed

4 files changed

+177
-0
lines changed

python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py

+9
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,12 @@ class Meta:
638638
description="Retrieve the executions for a given asset check.",
639639
)
640640

641+
orderAssetsByLastMaterializedTime = graphene.Field(
642+
non_null_list(GrapheneAssetKey),
643+
assetKeys=graphene.Argument(non_null_list(GrapheneAssetKeyInput)),
644+
description="Retrieve a list of asset keys sorted by their last materialized or observed time. Includes materialization attempts that failed to materialize the asset.",
645+
)
646+
641647
@capture_error
642648
def resolve_repositoriesOrError(
643649
self,
@@ -1357,3 +1363,6 @@ def resolve_assetCheckExecutions(
13571363
limit=limit,
13581364
cursor=cursor,
13591365
)
1366+
1367+
def resolve_orderAssetsByLastMaterializedTime(self, graphene_info, assetKeys: Sequence[GrapheneAssetKeyInput]) -> Sequence[GrapheneAssetKeyInput]:
1368+

python_modules/dagster/dagster/_core/storage/event_log/base.py

+5
Original file line numberDiff line numberDiff line change
@@ -733,3 +733,8 @@ def get_pool_config(self) -> PoolConfig:
733733
# Base implementation of fetching pool config. To be overridden for remote storage
734734
# implementations where the local instance might not match the remote instance.
735735
return self._instance.get_concurrency_config().pool_config
736+
737+
def order_assets_by_last_materialized_time(
    self,
    asset_keys: Sequence[AssetKey],
    descending: bool = False,
) -> Sequence[AssetKey]:
    """Return ``asset_keys`` ordered by when each asset was last materialized.

    The base storage class provides no implementation and always raises;
    storages that can answer this query override the method.

    Args:
        asset_keys: The asset keys to order.
        descending: Requested sort direction (defaults to ``False``).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError()

python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py

+48
Original file line numberDiff line numberDiff line change
@@ -3245,6 +3245,54 @@ def get_updated_data_version_partitions(
32453245

32463246
return updated_partitions
32473247

3248+
def order_assets_by_last_materialized_time(
    self, asset_keys: Sequence[AssetKey], descending: bool = False
) -> Sequence[AssetKey]:
    """Given a list of asset keys, returns the keys ordered by the time they most recently had a
    materialization-related or observation event (based on the last_materialization_timestamp
    which is updated on planned, materialization, and observation events). Assets that have never
    been materialized or have been wiped since the latest event are considered the oldest.

    descending=True - newest asset first, never materialized assets last
    descending=False (default) - never materialized assets first, then oldest asset

    Never materialized / wiped assets are returned in the order they appear in ``asset_keys``,
    regardless of ``descending``. Assets that share the same timestamp are ordered by their
    stored asset key string (in the same direction as the timestamp ordering) so that the
    result is deterministic across database backends.

    Raises:
        NotImplementedError: if the asset key index columns are not available on this storage.
    """
    if not self.has_asset_key_index_cols() or not self.has_secondary_index(
        ASSET_KEY_INDEX_COLS
    ):
        # punting on these cases for prototype
        raise NotImplementedError()

    if not asset_keys:
        # nothing to order; avoid issuing a query with an empty IN clause
        return []

    timestamp_col = AssetKeyTable.c.last_materialization_timestamp
    key_col = AssetKeyTable.c.asset_key
    if descending:
        ordering = (timestamp_col.desc(), key_col.desc())
    else:
        ordering = (timestamp_col.asc(), key_col.asc())

    query = (
        db_select([key_col])
        .where(key_col.in_([key.to_string() for key in asset_keys]))
        # A row can exist without a timestamp. NULL ordering differs between databases
        # (first on some, last on others), so exclude such rows here and let them fall into
        # the "never materialized" bucket below instead.
        .where(timestamp_col.isnot(None))
        .where(
            db.or_(
                AssetKeyTable.c.wipe_timestamp.is_(None),
                timestamp_col > AssetKeyTable.c.wipe_timestamp,
            )
        )
        .order_by(*ordering)
    )
    with self.index_connection() as conn:
        rows = db_fetch_mappings(conn, query)

    ordered_asset_keys = [AssetKey.from_db_string(cast(str, row["asset_key"])) for row in rows]

    # Set membership keeps this linear in the number of requested keys.
    materialized = set(ordered_asset_keys)
    never_materialized_assets = [
        asset_key for asset_key in asset_keys if asset_key not in materialized
    ]
    if descending:
        return [*ordered_asset_keys, *never_materialized_assets]
    return [*never_materialized_assets, *ordered_asset_keys]
3295+
32483296

32493297
def _get_from_row(row: SqlAlchemyRow, column: str) -> object:
32503298
"""Utility function for extracting a column from a sqlalchemy row proxy, since '_asdict' is not

python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py

+115
Original file line numberDiff line numberDiff line change
@@ -6714,3 +6714,118 @@ def test_fetch_failed_materializations(self, test_run_id, storage: EventLogStora
67146714
failed_record.asset_key == asset_key_1
67156715
for failed_record in failed_records_for_partitions.records
67166716
)
6717+
6718+
def test_asset_ordering(self, test_run_id, storage: EventLogStorage, instance):
    """Check that order_assets_by_last_materialized_time orders by latest event time.

    Covers materialization, planned-materialization and observation events, an asset with
    no events at all, and an asset that is wiped after being materialized.
    """

    def _store_asset_event(timestamp, event_type, event_specific_data):
        # Every event in this test differs only in timestamp, type and payload.
        storage.store_event(
            EventLogEntry(
                error_info=None,
                user_message="",
                level="debug",
                run_id=test_run_id,
                timestamp=timestamp,
                dagster_event=DagsterEvent(
                    event_type.value,
                    "nonce",
                    event_specific_data=event_specific_data,
                ),
            )
        )

    # log events for some assets, make sure the time order of the events doesn't match the
    # alphabetical order of the asset keys. Timestamps are unique so the expected order does
    # not depend on how the database breaks ties.
    _store_asset_event(
        1,
        DagsterEventType.ASSET_MATERIALIZATION,
        StepMaterializationData(
            materialization=AssetMaterialization(asset_key=AssetKey("asset_4")),
        ),
    )
    _store_asset_event(
        2,
        DagsterEventType.ASSET_MATERIALIZATION,
        StepMaterializationData(
            materialization=AssetMaterialization(asset_key=AssetKey("asset_1")),
        ),
    )
    _store_asset_event(
        3,
        DagsterEventType.ASSET_MATERIALIZATION_PLANNED,
        AssetMaterializationPlannedData(asset_key=AssetKey("asset_2")),
    )
    # The payload is an observation, so the event type must be ASSET_OBSERVATION.
    _store_asset_event(
        4,
        DagsterEventType.ASSET_OBSERVATION,
        AssetObservationData(AssetObservation(asset_key=AssetKey("asset_3"))),
    )

    query_keys = [
        AssetKey("asset_1"),
        AssetKey("asset_2"),
        AssetKey("asset_3"),
        AssetKey("asset_4"),
        AssetKey("asset_5"),  # not in DB, so never materialized
    ]

    expected_ascending = [
        AssetKey("asset_5"),
        AssetKey("asset_4"),
        AssetKey("asset_1"),
        AssetKey("asset_2"),
        AssetKey("asset_3"),
    ]
    # list.reverse() reverses in place and returns None, so the descending expectation is
    # written out explicitly rather than derived with .reverse().
    expected_descending = [
        AssetKey("asset_3"),
        AssetKey("asset_2"),
        AssetKey("asset_1"),
        AssetKey("asset_4"),
        AssetKey("asset_5"),
    ]

    assert (
        storage.order_assets_by_last_materialized_time(asset_keys=query_keys)
        == expected_ascending
    )
    assert (
        storage.order_assets_by_last_materialized_time(asset_keys=query_keys, descending=True)
        == expected_descending
    )

    storage.wipe_asset(AssetKey("asset_1"))

    # asset_1 now counts as never materialized. Never materialized assets keep the order in
    # which they were passed in (asset_1 before asset_5) for both sort directions, so the
    # descending result is not simply the ascending result reversed.
    expected_ascending = [
        AssetKey("asset_1"),
        AssetKey("asset_5"),
        AssetKey("asset_4"),
        AssetKey("asset_2"),
        AssetKey("asset_3"),
    ]
    expected_descending = [
        AssetKey("asset_3"),
        AssetKey("asset_2"),
        AssetKey("asset_4"),
        AssetKey("asset_1"),
        AssetKey("asset_5"),
    ]

    assert (
        storage.order_assets_by_last_materialized_time(asset_keys=query_keys)
        == expected_ascending
    )
    assert (
        storage.order_assets_by_last_materialized_time(asset_keys=query_keys, descending=True)
        == expected_descending
    )

0 commit comments

Comments
 (0)