Skip to content

Commit 55123a9

Browse files
authored
Make AMP use most recent materialization OR observation for observable assets (#22350)
## Summary & Motivation

Fixes <https://linear.app/dagster-labs/issue/FOU-219/[schedulingcondition]-fix-handling-of-externalassets-with-the>.

For observable external assets, AMP is unresponsive to reported asset materializations because it only looks for the most recent asset observation. This PR changes `CachingInstanceQueryer` to fetch both the most recent materialization and the most recent observation for observable assets and use whichever is newer, which allows AMP to respond to reported materializations. Non-observable assets still use only the most recent materialization.

## How I Tested These Changes

Modified the unit test for the `NewlyUpdated` condition to run against both a regular asset and an observable asset, and to update the asset via reported materializations.
1 parent be20eb8 commit 55123a9

File tree

4 files changed

+50
-13
lines changed

4 files changed

+50
-13
lines changed

Diff for: python_modules/dagster/dagster/_utils/caching_instance_queryer.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -265,15 +265,25 @@ def _get_latest_materialization_or_observation_record(
265265
),
266266
before_storage_id=before_cursor,
267267
)
268+
269+
# For observable assets, we fetch the most recent observation and materialization and return
270+
# whichever is more recent. For non-observable assets, we just fetch the most recent
271+
# materialization.
272+
materialization_records = self.instance.fetch_materializations(
273+
records_filter, ascending=False, limit=1
274+
).records
268275
if self.asset_graph.get(asset_partition.asset_key).is_observable:
269-
records = self.instance.fetch_observations(
276+
observation_records = self.instance.fetch_observations(
270277
records_filter, ascending=False, limit=1
271278
).records
279+
all_records = sorted(
280+
[*materialization_records, *observation_records],
281+
key=lambda x: x.timestamp,
282+
reverse=True,
283+
)
272284
else:
273-
records = self.instance.fetch_materializations(
274-
records_filter, ascending=False, limit=1
275-
).records
276-
return next(iter(records), None)
285+
all_records = materialization_records
286+
return next(iter(all_records), None)
277287

278288
@cached_method
279289
def _get_latest_materialization_or_observation_storage_ids_by_asset_partition(

Diff for: python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/asset_condition_tests/test_newly_updated_condition.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
1+
import pytest
12
from dagster import AutomationCondition
23

3-
from ..base_scenario import run_request
4-
from ..scenario_specs import one_asset
4+
from ..scenario_specs import one_asset, one_observable_asset
55
from .asset_condition_scenario import AutomationConditionScenarioState
66

77

8-
def test_newly_updated_condition() -> None:
8+
@pytest.mark.parametrize("scenario", [one_asset, one_observable_asset])
9+
def test_newly_updated_condition(scenario) -> None:
910
state = AutomationConditionScenarioState(
10-
one_asset, automation_condition=AutomationCondition.newly_updated()
11+
scenario, automation_condition=AutomationCondition.newly_updated()
1112
)
1213

1314
# not updated
1415
state, result = state.evaluate("A")
1516
assert result.true_subset.size == 0
1617

1718
# newly updated
18-
state = state.with_runs(run_request("A"))
19+
state = state.with_reported_materialization("A")
1920
state, result = state.evaluate("A")
2021
assert result.true_subset.size == 1
2122

@@ -28,11 +29,11 @@ def test_newly_updated_condition() -> None:
2829
assert result.true_subset.size == 0
2930

3031
# newly updated twice in a row
31-
state = state.with_runs(run_request("A"))
32+
state = state.with_reported_materialization("A")
3233
state, result = state.evaluate("A")
3334
assert result.true_subset.size == 1
3435

35-
state = state.with_runs(run_request("A"))
36+
state = state.with_reported_materialization("A")
3637
state, result = state.evaluate("A")
3738
assert result.true_subset.size == 1
3839

Diff for: python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenario_specs.py

+15
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import pendulum
44
from dagster import AssetSpec, MultiPartitionKey, StaticPartitionsDefinition
55
from dagster._core.definitions.asset_dep import AssetDep
6+
from dagster._core.definitions.asset_spec import (
7+
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
8+
AssetExecutionType,
9+
)
610
from dagster._core.definitions.multi_dimensional_partitions import MultiPartitionsDefinition
711
from dagster._core.definitions.partition import DynamicPartitionsDefinition
812
from dagster._core.definitions.partition_mapping import StaticPartitionMapping
@@ -40,6 +44,17 @@
4044
##############
4145
one_asset = ScenarioSpec(asset_specs=[AssetSpec("A")])
4246

47+
one_observable_asset = ScenarioSpec(
48+
[
49+
AssetSpec(
50+
"A",
51+
metadata={
52+
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value
53+
},
54+
)
55+
]
56+
)
57+
4358
two_assets_in_sequence = ScenarioSpec(
4459
asset_specs=[AssetSpec("A"), AssetSpec("B", deps=["A"])],
4560
)

Diff for: python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/scenario_state.py

+12-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from dagster._core.definitions import materialize
3030
from dagster._core.definitions.asset_graph import AssetGraph
3131
from dagster._core.definitions.definitions_class import create_repository_using_definitions_args
32-
from dagster._core.definitions.events import CoercibleToAssetKey
32+
from dagster._core.definitions.events import AssetMaterialization, CoercibleToAssetKey
3333
from dagster._core.definitions.executor_definition import in_process_executor
3434
from dagster._core.definitions.repository_definition.repository_definition import (
3535
RepositoryDefinition,
@@ -152,6 +152,7 @@ def _multi_asset(context: AssetExecutionContext):
152152
"auto_materialize_policy",
153153
"freshness_policy",
154154
"partitions_def",
155+
"metadata",
155156
}
156157
assets.append(
157158
asset(
@@ -325,6 +326,16 @@ def test_time_fn() -> float:
325326
),
326327
)
327328

329+
def with_reported_materialization(
330+
self, asset_key: CoercibleToAssetKey, partition_key: Optional[str] = None
331+
) -> Self:
332+
mat = AssetMaterialization(
333+
asset_key=asset_key,
334+
partition=partition_key,
335+
)
336+
self.instance.report_runless_asset_event(mat)
337+
return self
338+
328339
def _with_runs_with_status(self, status: DagsterRunStatus) -> Self:
329340
"""Create new runs that will run to completion for all existing runs with the given status,
330341
and delete the runs with that status from the instance.

0 commit comments

Comments
 (0)