From 55a3c1668cfb1b520e14c7996f3f979168bfb9bc Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Tue, 15 Apr 2025 14:24:54 -0700 Subject: [PATCH] Move pipes external metadata processing to its own layer. --- .../_core/definitions/metadata/__init__.py | 7 + .../definitions/metadata/external_metadata.py | 140 ++++++++++++++++++ .../dagster/dagster/_core/pipes/context.py | 105 +------------ 3 files changed, 155 insertions(+), 97 deletions(-) create mode 100644 python_modules/dagster/dagster/_core/definitions/metadata/external_metadata.py diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py index d8c2c67fb757e..af2f1ba8959d8 100644 --- a/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py +++ b/python_modules/dagster/dagster/_core/definitions/metadata/__init__.py @@ -15,6 +15,13 @@ import dagster._check as check from dagster._annotations import PublicAttr, deprecated, deprecated_param from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.metadata.external_metadata import ( + EXTERNAL_METADATA_TYPE_INFER as EXTERNAL_METADATA_TYPE_INFER, + ExternalMetadataType as ExternalMetadataType, + ExternalMetadataValue as ExternalMetadataValue, + metadata_map_from_external as metadata_map_from_external, + metadata_value_from_external as metadata_value_from_external, +) from dagster._core.definitions.metadata.metadata_set import ( NamespacedMetadataSet as NamespacedMetadataSet, TableMetadataSet as TableMetadataSet, diff --git a/python_modules/dagster/dagster/_core/definitions/metadata/external_metadata.py b/python_modules/dagster/dagster/_core/definitions/metadata/external_metadata.py new file mode 100644 index 0000000000000..7398d80e1b66c --- /dev/null +++ b/python_modules/dagster/dagster/_core/definitions/metadata/external_metadata.py @@ -0,0 +1,140 @@ +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any, Literal, TypedDict, Union + +import dagster._check as check +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.metadata.table import ( + TableColumn as TableColumn, + TableColumnConstraints as TableColumnConstraints, + TableColumnDep as TableColumnDep, + TableColumnLineage as TableColumnLineage, + TableConstraints as TableConstraints, + TableRecord as TableRecord, + TableSchema as TableSchema, +) + +if TYPE_CHECKING: + from dagster._core.definitions.metadata import MetadataValue + +InferrableExternalMetadataValue = Union[ + int, float, str, Mapping[str, Any], Sequence[Any], bool, None +] + + +class ExternalMetadataValue(TypedDict): + type: "ExternalMetadataType" + raw_value: InferrableExternalMetadataValue + + +# Infer the type from the raw value on the orchestration end +EXTERNAL_METADATA_TYPE_INFER = "__infer__" + +ExternalMetadataType = Literal[ + "__infer__", + "text", + "url", + "path", + "notebook", + "json", + "md", + "float", + "int", + "bool", + "dagster_run", + "asset", + "null", + "table", + "table_schema", + "table_column_lineage", + "timestamp", +] + + +def metadata_map_from_external( + metadata: Mapping[str, ExternalMetadataValue], +) -> Mapping[str, "MetadataValue"]: + return {k: metadata_value_from_external(v["raw_value"], v["type"]) for k, v in metadata.items()} + + +def metadata_value_from_external( + value: Any, metadata_type: ExternalMetadataType +) -> "MetadataValue": + from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value + + if metadata_type == EXTERNAL_METADATA_TYPE_INFER: + return normalize_metadata_value(value) + elif metadata_type == "text": + return MetadataValue.text(value) + elif metadata_type == "url": + return MetadataValue.url(value) + elif metadata_type == "path": + return MetadataValue.path(value) + elif metadata_type == "notebook": + return MetadataValue.notebook(value) + elif metadata_type == "json": + return MetadataValue.json(value) + elif metadata_type == "md": + return MetadataValue.md(value) + elif metadata_type == "float": + return MetadataValue.float(value) + elif metadata_type == "int": + return MetadataValue.int(value) + elif metadata_type == "bool": + return MetadataValue.bool(value) + elif metadata_type == "dagster_run": + return MetadataValue.dagster_run(value) + elif metadata_type == "asset": + return MetadataValue.asset(AssetKey.from_user_string(value)) + elif metadata_type == "table": + value = check.mapping_param(value, "table_value", key_type=str) + return MetadataValue.table( + records=[TableRecord(record) for record in value["records"]], + schema=TableSchema( + columns=[ + TableColumn( + name=column["name"], + type=column["type"], + description=column.get("description"), + tags=column.get("tags"), + constraints=TableColumnConstraints(**column["constraints"]) + if column.get("constraints") + else None, + ) + for column in value["schema"] + ] + ), + ) + elif metadata_type == "table_schema": + value = check.mapping_param(value, "table_schema_value", key_type=str) + return MetadataValue.table_schema( + schema=TableSchema( + columns=[ + TableColumn( + name=column["name"], + type=column["type"], + description=column.get("description"), + tags=column.get("tags"), + constraints=TableColumnConstraints(**column["constraints"]) + if column.get("constraints") + else None, + ) + for column in value["columns"] + ] + ) + ) + elif metadata_type == "table_column_lineage": + value = check.mapping_param(value, "table_column_value", key_type=str) + return MetadataValue.column_lineage( + lineage=TableColumnLineage( + deps_by_column={ + column: [TableColumnDep(**dep) for dep in deps] + for column, deps in value["deps_by_column"].items() + } + ) + ) + elif metadata_type == "timestamp": + return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp"))) + elif metadata_type == "null": + return MetadataValue.null() + else: + check.failed(f"Unexpected metadata type {metadata_type}") diff --git a/python_modules/dagster/dagster/_core/pipes/context.py b/python_modules/dagster/dagster/_core/pipes/context.py index 4782b5b452503..56be91360e41a 100644 --- a/python_modules/dagster/dagster/_core/pipes/context.py +++ b/python_modules/dagster/dagster/_core/pipes/context.py @@ -10,15 +10,12 @@ from dagster_pipes import ( DAGSTER_PIPES_CONTEXT_ENV_VAR, DAGSTER_PIPES_MESSAGES_ENV_VAR, - PIPES_METADATA_TYPE_INFER, Method, PipesContextData, PipesDataProvenance, PipesException, PipesExtras, PipesMessage, - PipesMetadataType, - PipesMetadataValue, PipesOpenedData, PipesParams, PipesTimeWindow, @@ -34,14 +31,10 @@ from dagster._core.definitions.asset_check_spec import AssetCheckSeverity from dagster._core.definitions.data_version import DataProvenance, DataVersion from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value -from dagster._core.definitions.metadata.table import ( - TableColumn, - TableColumnConstraints, - TableColumnDep, - TableColumnLineage, - TableRecord, - TableSchema, +from dagster._core.definitions.metadata import ( + ExternalMetadataValue, + MetadataValue, + metadata_map_from_external, ) from dagster._core.definitions.partition_key_range import PartitionKeyRange from dagster._core.definitions.result import MaterializeResult @@ -123,91 +116,9 @@ def received_closed_message(self) -> bool: return self._received_closed_msg def _resolve_metadata( - self, metadata: Mapping[str, PipesMetadataValue] + self, metadata: Mapping[str, ExternalMetadataValue] ) -> Mapping[str, MetadataValue]: - return { - k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items() - } - - @staticmethod - def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue: - if metadata_type == PIPES_METADATA_TYPE_INFER: - return normalize_metadata_value(value) - elif metadata_type == "text": - return MetadataValue.text(value) - elif metadata_type == "url": - return MetadataValue.url(value) - elif metadata_type == "path": - return MetadataValue.path(value) - elif metadata_type == "notebook": - return MetadataValue.notebook(value) - elif metadata_type == "json": - return MetadataValue.json(value) - elif metadata_type == "md": - return MetadataValue.md(value) - elif metadata_type == "float": - return MetadataValue.float(value) - elif metadata_type == "int": - return MetadataValue.int(value) - elif metadata_type == "bool": - return MetadataValue.bool(value) - elif metadata_type == "dagster_run": - return MetadataValue.dagster_run(value) - elif metadata_type == "asset": - return MetadataValue.asset(AssetKey.from_user_string(value)) - elif metadata_type == "table": - value = check.mapping_param(value, "table_value", key_type=str) - return MetadataValue.table( - records=[TableRecord(record) for record in value["records"]], - schema=TableSchema( - columns=[ - TableColumn( - name=column["name"], - type=column["type"], - description=column.get("description"), - tags=column.get("tags"), - constraints=TableColumnConstraints(**column["constraints"]) - if column.get("constraints") - else None, - ) - for column in value["schema"] - ] - ), - ) - elif metadata_type == "table_schema": - value = check.mapping_param(value, "table_schema_value", key_type=str) - return MetadataValue.table_schema( - schema=TableSchema( - columns=[ - TableColumn( - name=column["name"], - type=column["type"], - description=column.get("description"), - tags=column.get("tags"), - constraints=TableColumnConstraints(**column["constraints"]) - if column.get("constraints") - else None, - ) - for column in value["columns"] - ] - ) - ) - elif metadata_type == "table_column_lineage": - value = check.mapping_param(value, "table_column_value", key_type=str) - return MetadataValue.column_lineage( - lineage=TableColumnLineage( - deps_by_column={ - column: [TableColumnDep(**dep) for dep in deps] - for column, deps in value["deps_by_column"].items() - } - ) - ) - elif metadata_type == "timestamp": - return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp"))) - elif metadata_type == "null": - return MetadataValue.null() - else: - check.failed(f"Unexpected metadata type {metadata_type}") + return metadata_map_from_external(metadata) # Type ignores because we currently validate in individual handlers def handle_message(self, message: PipesMessage) -> None: @@ -253,7 +164,7 @@ def _handle_closed(self, params: Optional[Mapping[str, Any]]) -> None: def _handle_report_asset_materialization( self, asset_key: str, - metadata: Optional[Mapping[str, PipesMetadataValue]], + metadata: Optional[Mapping[str, ExternalMetadataValue]], data_version: Optional[str], ) -> None: check.str_param(asset_key, "asset_key") @@ -275,7 +186,7 @@ def _handle_report_asset_check( check_name: str, passed: bool, severity: str, - metadata: Mapping[str, PipesMetadataValue], + metadata: Mapping[str, ExternalMetadataValue], ) -> None: check.str_param(asset_key, "asset_key") check.str_param(check_name, "check_name")