Asset dependencies across code locations #20327
What is the current canonical way to do this?

Say I have a code location with an asset like this:

and another code location with an asset B that is dependent on asset A.

The docs used to recommend using SourceAssets for this (before that, I think it was asset sensors?), but references to SourceAssets seem to have been quietly removed from the docs. Is the new approach to use External Assets? Something else?

Any ideas are very welcome!
Replies: 2 comments 6 replies
I'm also interested in this! I have a small number of assets which require a different Python environment, so I would like to separate them by code location. When I try to specify A as the input to B I get an error:

For now I have found a workaround: I access the DagsterInstance and fetch the latest materialisation event for asset A inside the op of asset B. It's slightly complicated by the fact that my upstream asset is partitioned. Here's what it looks like:

import pandas as pd
from typing import Any

from dagster import (
    AssetExecutionContext,
    AssetKey,
    DagsterEventType,
    DagsterInstance,
    EventLogRecord,
    EventRecordsFilter,
    Output,
    SourceAsset,
    asset,
)


def get_metadata_per_partition(instance: DagsterInstance, asset_key: AssetKey) -> dict[str, Any]:
    """Return the metadata of the latest materialization for each partition of `asset_key`."""
    ef = EventRecordsFilter(asset_key=asset_key, event_type=DagsterEventType.ASSET_MATERIALIZATION)
    mat_events = instance.get_event_records(event_records_filter=ef)

    # Group the materialization events by partition key
    all_materializations: dict[str, list[EventLogRecord]] = {}
    for event in mat_events:
        mat = event.event_log_entry.dagster_event.event_specific_data.materialization
        partition = mat.partition
        if partition not in all_materializations:
            all_materializations[partition] = []
        all_materializations[partition].append(event)

    # Only keep the latest materialization for each partition
    latest_partitions: dict[str, dict] = {}
    for partition, materializations in all_materializations.items():
        latest: EventLogRecord = max(materializations, key=lambda x: x.timestamp)
        metadata = latest.event_log_entry.dagster_event.event_specific_data.materialization.metadata
        # Unwrap each metadata value into its plain Python value
        latest_partitions[partition] = {
            k: v.value for k, v in metadata.items()
        }
    return latest_partitions


upstream_asset_key = AssetKey(["pre", "fix", "A"])


@asset(
    code_version="1",
    deps=[SourceAsset(key=upstream_asset_key)],
    name='B',
    key_prefix=["pre", "fix"],
)
def node_embeddings(context: AssetExecutionContext) -> Output:
    # Note: Dependencies across code locations seem to be not well-supported - this is a workaround to get the
    # file paths of the upstream asset
    upstream_inputs = get_metadata_per_partition(
        context.instance, upstream_asset_key
    )
    # "node" is the partition key of the upstream asset; "file_path" is a metadata entry on its materialization
    nodes = pd.read_parquet(upstream_inputs["node"]["file_path"])
    # ... do something with the nodes

It's not exactly elegant, but it seems to work for me so far.
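
For anyone who wants to check the "keep only the latest materialization per partition" step without a running Dagster instance, here is a minimal stdlib-only sketch of the same grouping logic. `FakeRecord`, the "edge" partition, and the file paths are hypothetical stand-ins made up for illustration; they are not Dagster types or real data.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class FakeRecord:
    """Hypothetical stand-in for a materialization event record (not a Dagster class)."""
    partition: Optional[str]
    timestamp: float
    metadata: dict[str, Any]


def latest_metadata_per_partition(
    records: list[FakeRecord],
) -> dict[Optional[str], dict[str, Any]]:
    """Keep the metadata of the newest record for each partition key."""
    latest: dict[Optional[str], FakeRecord] = {}
    for record in records:
        current = latest.get(record.partition)
        # Replace the stored record only if this one is strictly newer
        if current is None or record.timestamp > current.timestamp:
            latest[record.partition] = record
    return {partition: dict(rec.metadata) for partition, rec in latest.items()}


# Made-up records: the "node" partition was materialized twice
records = [
    FakeRecord("node", 1.0, {"file_path": "old/node.parquet"}),
    FakeRecord("edge", 2.0, {"file_path": "edge.parquet"}),
    FakeRecord("node", 3.0, {"file_path": "new/node.parquet"}),
]
result = latest_metadata_per_partition(records)
```

This tracks the newest record in a single pass instead of building a list per partition and calling `max` on it, but the outcome should be the same: one metadata dict per partition key.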
What made you say this? This section describes this case: https://docs.dagster.io/concepts/assets/software-defined-assets#defining-external-asset-dependencies