Skip to content

storage endpoint to order assets by materialization timestamp #29173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,8 @@ def get_pool_config(self) -> PoolConfig:
# Base implementation of fetching pool config. To be overriden for remote storage
# implementations where the local instance might not match the remote instance.
return self._instance.get_concurrency_config().pool_config

def order_assets_by_last_materialized_time(
self, asset_keys: Optional[Sequence[AssetKey]], descending: bool = False
) -> Sequence[AssetKey]:
raise NotImplementedError()
Original file line number Diff line number Diff line change
Expand Up @@ -3245,6 +3245,68 @@ def get_updated_data_version_partitions(

return updated_partitions

def order_assets_by_last_materialized_time(
self, asset_keys: Optional[Sequence[AssetKey]], descending: bool = False
) -> Sequence[AssetKey]:
"""Given a list of asset keys, returns the keys ordered by the time they most recently had a
materialization-related or observation event (based on the last_materialization_timestamp
which is updated on planned, materialization, and observation events). Assets that have never
been materialized or have been wiped since the latest event are considered the oldest.

If no list of asset keys is provided, all asset keys in the DB are returned in sorted order.
This means that if an asset is not in the DB (ie if it has never been materialized) it will not
be returned.

descending=True - newest asset first, never materialized assets last
descending=False (default) - never materialized assets first, then oldest asset
"""
if not self.has_asset_key_index_cols() or not self.has_secondary_index(
ASSET_KEY_INDEX_COLS
):
# punting on these cases for prototype
raise NotImplementedError()

if descending:
order_by = AssetKeyTable.c.last_materialization_timestamp.desc()
else:
order_by = AssetKeyTable.c.last_materialization_timestamp.asc()

query = (
db_select([AssetKeyTable.c.asset_key])
.order_by(order_by)
Copy link
Member

@gibsondan gibsondan Apr 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think the key thing is that doing this order by on the backend/in postgres only makes sense if postgres knows more about how to order it or paginate it efficiently than the frontend or caller does, which isn't the case here since we don't have an index

.where(
db.or_(
AssetKeyTable.c.wipe_timestamp.is_(None),
AssetKeyTable.c.last_materialization_timestamp > AssetKeyTable.c.wipe_timestamp,
)
)
)
if asset_keys is not None:
query = query.where(
AssetKeyTable.c.asset_key.in_([key.to_string() for key in asset_keys])
)

with self.index_connection() as conn:
rows = db_fetch_mappings(conn, query)

ordered_asset_keys = [AssetKey.from_db_string(cast(str, row["asset_key"])) for row in rows]
ordered_asset_keys = [
asset_key for asset_key in ordered_asset_keys if asset_key is not None
]

if asset_keys is None:
return ordered_asset_keys

never_materialized_assets = [
asset_key for asset_key in asset_keys if asset_key not in ordered_asset_keys
]
if descending:
ordered_asset_keys.extend(never_materialized_assets)
return ordered_asset_keys
else:
never_materialized_assets.extend(ordered_asset_keys)
return never_materialized_assets


def _get_from_row(row: SqlAlchemyRow, column: str) -> object:
"""Utility function for extracting a column from a sqlalchemy row proxy, since '_asdict' is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6714,3 +6714,134 @@ def test_fetch_failed_materializations(self, test_run_id, storage: EventLogStora
failed_record.asset_key == asset_key_1
for failed_record in failed_records_for_partitions.records
)

def test_asset_ordering(self, test_run_id, storage: EventLogStorage, instance):
# log events for some assets, make sure the time order of the events doesn't match the
# alpahbetical order of the asset keys
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id=test_run_id,
timestamp=1,
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION.value,
"nonce",
event_specific_data=StepMaterializationData(
materialization=AssetMaterialization(asset_key=AssetKey("asset_4")),
),
),
)
)
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id=test_run_id,
timestamp=2,
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION.value,
"nonce",
event_specific_data=StepMaterializationData(
materialization=AssetMaterialization(asset_key=AssetKey("asset_1")),
),
),
)
)
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id=test_run_id,
timestamp=2,
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetMaterializationPlannedData(
asset_key=AssetKey("asset_2")
),
),
)
)
storage.store_event(
EventLogEntry(
error_info=None,
user_message="",
level="debug",
run_id=test_run_id,
timestamp=3,
dagster_event=DagsterEvent(
DagsterEventType.ASSET_MATERIALIZATION_PLANNED.value,
"nonce",
event_specific_data=AssetObservationData(
AssetObservation(asset_key=AssetKey("asset_3"))
),
),
)
)

query_keys = [
AssetKey("asset_1"),
AssetKey("asset_2"),
AssetKey("asset_3"),
AssetKey("asset_4"),
AssetKey("asset_5"), # not in DB, so never materialized
]
expected_order = [
AssetKey("asset_5"),
AssetKey("asset_4"),
AssetKey("asset_1"),
AssetKey("asset_2"),
AssetKey("asset_3"),
]

assert (
storage.order_assets_by_last_materialized_time(asset_keys=query_keys)
== expected_order
)
assert (
storage.order_assets_by_last_materialized_time(
asset_keys=query_keys, descending=True
)
== expected_order.reverse()
)

storage.wipe_asset(AssetKey("asset_1"))

expected_order = [
AssetKey("asset_1"),
AssetKey("asset_5"),
AssetKey("asset_4"),
AssetKey("asset_2"),
AssetKey("asset_3"),
]

assert (
storage.order_assets_by_last_materialized_time(asset_keys=query_keys)
== expected_order
)
assert (
storage.order_assets_by_last_materialized_time(
asset_keys=query_keys, descending=True
)
== expected_order.reverse()
)

# query for all assets. Since asset_5 has never been materialized, and asset_1 has
# been wiped, they will not be returned

expected_order = [
AssetKey("asset_4"),
AssetKey("asset_2"),
AssetKey("asset_3"),
]

assert storage.order_assets_by_last_materialized_time() == expected_order

assert (
storage.order_assets_by_last_materialized_time(descending=True)
== expected_order.reverse()
)