Move pipes external metadata processing to its own layer. #29311

Open · wants to merge 1 commit into base: airlift_jobs_ui
@@ -15,6 +15,13 @@
 import dagster._check as check
 from dagster._annotations import PublicAttr, deprecated, deprecated_param
 from dagster._core.definitions.asset_key import AssetKey
+from dagster._core.definitions.metadata.external_metadata import (
+    EXTERNAL_METADATA_TYPE_INFER as EXTERNAL_METADATA_TYPE_INFER,
+    ExternalMetadataType as ExternalMetadataType,
+    ExternalMetadataValue as ExternalMetadataValue,
+    metadata_map_from_external as metadata_map_from_external,
+    metadata_value_from_external as metadata_value_from_external,
+)
 from dagster._core.definitions.metadata.metadata_set import (
     NamespacedMetadataSet as NamespacedMetadataSet,
     TableMetadataSet as TableMetadataSet,
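With the re-export block above, the external-metadata helpers become importable directly from the `dagster._core.definitions.metadata` package rather than from the new submodule. A minimal sketch of the import path this enables (the `context.py` hunk further down relies on exactly these names; the example mapping is made up):

```python
from dagster._core.definitions.metadata import (
    ExternalMetadataValue,
    metadata_map_from_external,
)

# An externally reported metadata entry uses the {"type", "raw_value"} shape;
# the key and value here are illustrative only.
external: dict[str, ExternalMetadataValue] = {
    "row_count": {"type": "int", "raw_value": 42},
}
resolved = metadata_map_from_external(external)  # mapping of str -> MetadataValue
```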
@@ -0,0 +1,140 @@
+from collections.abc import Mapping, Sequence
+from typing import TYPE_CHECKING, Any, Literal, TypedDict, Union
+
+import dagster._check as check
+from dagster._core.definitions.asset_key import AssetKey
+from dagster._core.definitions.metadata.table import (
+    TableColumn as TableColumn,
+    TableColumnConstraints as TableColumnConstraints,
+    TableColumnDep as TableColumnDep,
+    TableColumnLineage as TableColumnLineage,
+    TableConstraints as TableConstraints,
+    TableRecord as TableRecord,
+    TableSchema as TableSchema,
+)
+
+if TYPE_CHECKING:
+    from dagster._core.definitions.metadata import MetadataValue
+
+InferrableExternalMetadataValue = Union[
+    int, float, str, Mapping[str, Any], Sequence[Any], bool, None
+]
+
+
+class ExternalMetadataValue(TypedDict):
+    type: "ExternalMetadataType"
+    raw_value: InferrableExternalMetadataValue
+
+
+# Infer the type from the raw value on the orchestration end
+EXTERNAL_METADATA_TYPE_INFER = "__infer__"
+
+ExternalMetadataType = Literal[
+    "__infer__",
+    "text",
+    "url",
+    "path",
+    "notebook",
+    "json",
+    "md",
+    "float",
+    "int",
+    "bool",
+    "dagster_run",
+    "asset",
+    "null",
+    "table",
+    "table_schema",
+    "table_column_lineage",
+    "timestamp",
+]
+
+
+def metadata_map_from_external(
+    metadata: Mapping[str, ExternalMetadataValue],
+) -> Mapping[str, "MetadataValue"]:
+    return {k: metadata_value_from_external(v["raw_value"], v["type"]) for k, v in metadata.items()}
+
+
+def metadata_value_from_external(
+    value: Any, metadata_type: ExternalMetadataType
+) -> "MetadataValue":
+    from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
+
+    if metadata_type == EXTERNAL_METADATA_TYPE_INFER:
+        return normalize_metadata_value(value)
+    elif metadata_type == "text":
+        return MetadataValue.text(value)
+    elif metadata_type == "url":
+        return MetadataValue.url(value)
+    elif metadata_type == "path":
+        return MetadataValue.path(value)
+    elif metadata_type == "notebook":
+        return MetadataValue.notebook(value)
+    elif metadata_type == "json":
+        return MetadataValue.json(value)
+    elif metadata_type == "md":
+        return MetadataValue.md(value)
+    elif metadata_type == "float":
+        return MetadataValue.float(value)
+    elif metadata_type == "int":
+        return MetadataValue.int(value)
+    elif metadata_type == "bool":
+        return MetadataValue.bool(value)
+    elif metadata_type == "dagster_run":
+        return MetadataValue.dagster_run(value)
+    elif metadata_type == "asset":
+        return MetadataValue.asset(AssetKey.from_user_string(value))
+    elif metadata_type == "table":
+        value = check.mapping_param(value, "table_value", key_type=str)
+        return MetadataValue.table(
+            records=[TableRecord(record) for record in value["records"]],
+            schema=TableSchema(
+                columns=[
+                    TableColumn(
+                        name=column["name"],
+                        type=column["type"],
+                        description=column.get("description"),
+                        tags=column.get("tags"),
+                        constraints=TableColumnConstraints(**column["constraints"])
+                        if column.get("constraints")
+                        else None,
+                    )
+                    for column in value["schema"]
+                ]
+            ),
+        )
+    elif metadata_type == "table_schema":
+        value = check.mapping_param(value, "table_schema_value", key_type=str)
+        return MetadataValue.table_schema(
+            schema=TableSchema(
+                columns=[
+                    TableColumn(
+                        name=column["name"],
+                        type=column["type"],
+                        description=column.get("description"),
+                        tags=column.get("tags"),
+                        constraints=TableColumnConstraints(**column["constraints"])
+                        if column.get("constraints")
+                        else None,
+                    )
+                    for column in value["columns"]
+                ]
+            )
+        )
+    elif metadata_type == "table_column_lineage":
+        value = check.mapping_param(value, "table_column_value", key_type=str)
+        return MetadataValue.column_lineage(
+            lineage=TableColumnLineage(
+                deps_by_column={
+                    column: [TableColumnDep(**dep) for dep in deps]
+                    for column, deps in value["deps_by_column"].items()
+                }
+            )
+        )
+    elif metadata_type == "timestamp":
+        return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp")))
+    elif metadata_type == "null":
+        return MetadataValue.null()
+    else:
+        check.failed(f"Unexpected metadata type {metadata_type}")
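A short usage sketch of the new module, assuming this branch of dagster is installed (the functions and the `__infer__` sentinel are the ones defined in the hunk above; the values are made up):

```python
from dagster._core.definitions.metadata.external_metadata import (
    EXTERNAL_METADATA_TYPE_INFER,
    metadata_value_from_external,
)

# An explicit type routes through the matching MetadataValue constructor.
url_value = metadata_value_from_external("https://example.com/report", "url")

# The "__infer__" sentinel defers typing to normalize_metadata_value on the
# orchestration side, so plain Python values are accepted as-is.
inferred = metadata_value_from_external({"rows": 10}, EXTERNAL_METADATA_TYPE_INFER)
```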
python_modules/dagster/dagster/_core/pipes/context.py (105 changes: 8 additions & 97 deletions)
@@ -10,15 +10,12 @@
 from dagster_pipes import (
     DAGSTER_PIPES_CONTEXT_ENV_VAR,
     DAGSTER_PIPES_MESSAGES_ENV_VAR,
-    PIPES_METADATA_TYPE_INFER,
     Method,
     PipesContextData,
     PipesDataProvenance,
     PipesException,
     PipesExtras,
     PipesMessage,
-    PipesMetadataType,
-    PipesMetadataValue,
     PipesOpenedData,
     PipesParams,
     PipesTimeWindow,
@@ -34,14 +31,10 @@
 from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
 from dagster._core.definitions.data_version import DataProvenance, DataVersion
 from dagster._core.definitions.events import AssetKey
-from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
-from dagster._core.definitions.metadata.table import (
-    TableColumn,
-    TableColumnConstraints,
-    TableColumnDep,
-    TableColumnLineage,
-    TableRecord,
-    TableSchema,
+from dagster._core.definitions.metadata import (
+    ExternalMetadataValue,
+    MetadataValue,
+    metadata_map_from_external,
 )
 from dagster._core.definitions.partition_key_range import PartitionKeyRange
 from dagster._core.definitions.result import MaterializeResult
@@ -123,91 +116,9 @@ def received_closed_message(self) -> bool:
         return self._received_closed_msg
 
     def _resolve_metadata(
-        self, metadata: Mapping[str, PipesMetadataValue]
+        self, metadata: Mapping[str, ExternalMetadataValue]
     ) -> Mapping[str, MetadataValue]:
-        return {
-            k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items()
-        }
-
-    @staticmethod
-    def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue:
-        if metadata_type == PIPES_METADATA_TYPE_INFER:
-            return normalize_metadata_value(value)
-        elif metadata_type == "text":
-            return MetadataValue.text(value)
-        elif metadata_type == "url":
-            return MetadataValue.url(value)
-        elif metadata_type == "path":
-            return MetadataValue.path(value)
-        elif metadata_type == "notebook":
-            return MetadataValue.notebook(value)
-        elif metadata_type == "json":
-            return MetadataValue.json(value)
-        elif metadata_type == "md":
-            return MetadataValue.md(value)
-        elif metadata_type == "float":
-            return MetadataValue.float(value)
-        elif metadata_type == "int":
-            return MetadataValue.int(value)
-        elif metadata_type == "bool":
-            return MetadataValue.bool(value)
-        elif metadata_type == "dagster_run":
-            return MetadataValue.dagster_run(value)
-        elif metadata_type == "asset":
-            return MetadataValue.asset(AssetKey.from_user_string(value))
-        elif metadata_type == "table":
-            value = check.mapping_param(value, "table_value", key_type=str)
-            return MetadataValue.table(
-                records=[TableRecord(record) for record in value["records"]],
-                schema=TableSchema(
-                    columns=[
-                        TableColumn(
-                            name=column["name"],
-                            type=column["type"],
-                            description=column.get("description"),
-                            tags=column.get("tags"),
-                            constraints=TableColumnConstraints(**column["constraints"])
-                            if column.get("constraints")
-                            else None,
-                        )
-                        for column in value["schema"]
-                    ]
-                ),
-            )
-        elif metadata_type == "table_schema":
-            value = check.mapping_param(value, "table_schema_value", key_type=str)
-            return MetadataValue.table_schema(
-                schema=TableSchema(
-                    columns=[
-                        TableColumn(
-                            name=column["name"],
-                            type=column["type"],
-                            description=column.get("description"),
-                            tags=column.get("tags"),
-                            constraints=TableColumnConstraints(**column["constraints"])
-                            if column.get("constraints")
-                            else None,
-                        )
-                        for column in value["columns"]
-                    ]
-                )
-            )
-        elif metadata_type == "table_column_lineage":
-            value = check.mapping_param(value, "table_column_value", key_type=str)
-            return MetadataValue.column_lineage(
-                lineage=TableColumnLineage(
-                    deps_by_column={
-                        column: [TableColumnDep(**dep) for dep in deps]
-                        for column, deps in value["deps_by_column"].items()
-                    }
-                )
-            )
-        elif metadata_type == "timestamp":
-            return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp")))
-        elif metadata_type == "null":
-            return MetadataValue.null()
-        else:
-            check.failed(f"Unexpected metadata type {metadata_type}")
+        return metadata_map_from_external(metadata)
 
     # Type ignores because we currently validate in individual handlers
     def handle_message(self, message: PipesMessage) -> None:
@@ -253,7 +164,7 @@ def _handle_closed(self, params: Optional[Mapping[str, Any]]) -> None:
     def _handle_report_asset_materialization(
         self,
         asset_key: str,
-        metadata: Optional[Mapping[str, PipesMetadataValue]],
+        metadata: Optional[Mapping[str, ExternalMetadataValue]],
         data_version: Optional[str],
     ) -> None:
         check.str_param(asset_key, "asset_key")
@@ -275,7 +186,7 @@ def _handle_report_asset_check(
         check_name: str,
         passed: bool,
         severity: str,
-        metadata: Mapping[str, PipesMetadataValue],
+        metadata: Mapping[str, ExternalMetadataValue],
     ) -> None:
         check.str_param(asset_key, "asset_key")
         check.str_param(check_name, "check_name")
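Since `_handle_report_asset_materialization` and `_handle_report_asset_check` now accept `ExternalMetadataValue` mappings, metadata reported from the external process resolves through the shared layer instead of the removed static method. A sketch with the structured `table_schema` type, whose payload shape is inferred from the corresponding branch in the new module (the column values are illustrative):

```python
from dagster._core.definitions.metadata.external_metadata import metadata_value_from_external

# Resolves to MetadataValue.table_schema(...) via the "table_schema" branch above.
schema_value = metadata_value_from_external(
    {
        "columns": [
            {"name": "id", "type": "int", "description": "primary key"},
            {"name": "amount", "type": "float", "constraints": {"nullable": False}},
        ]
    },
    "table_schema",
)
```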