Skip to content

Commit c5a373e

Browse files
committed
Move pipes external metadata processing to its own layer.
1 parent 1c6bdac commit c5a373e

File tree

3 files changed

+155
-97
lines changed

3 files changed

+155
-97
lines changed

python_modules/dagster/dagster/_core/definitions/metadata/__init__.py

+7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
import dagster._check as check
1616
from dagster._annotations import PublicAttr, deprecated, deprecated_param
1717
from dagster._core.definitions.asset_key import AssetKey
18+
from dagster._core.definitions.metadata.external_metadata import (
19+
EXTERNAL_METADATA_TYPE_INFER as EXTERNAL_METADATA_TYPE_INFER,
20+
ExternalMetadataType as ExternalMetadataType,
21+
ExternalMetadataValue as ExternalMetadataValue,
22+
metadata_map_from_external as metadata_map_from_external,
23+
metadata_value_from_external as metadata_value_from_external,
24+
)
1825
from dagster._core.definitions.metadata.metadata_set import (
1926
NamespacedMetadataSet as NamespacedMetadataSet,
2027
TableMetadataSet as TableMetadataSet,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from collections.abc import Mapping, Sequence
2+
from typing import TYPE_CHECKING, Any, Literal, TypedDict, Union
3+
4+
import dagster._check as check
5+
from dagster._core.definitions.asset_key import AssetKey
6+
from dagster._core.definitions.metadata.table import (
7+
TableColumn as TableColumn,
8+
TableColumnConstraints as TableColumnConstraints,
9+
TableColumnDep as TableColumnDep,
10+
TableColumnLineage as TableColumnLineage,
11+
TableConstraints as TableConstraints,
12+
TableRecord as TableRecord,
13+
TableSchema as TableSchema,
14+
)
15+
16+
if TYPE_CHECKING:
17+
from dagster._core.definitions.metadata import MetadataValue
18+
19+
# Type of the ``raw_value`` field of ``ExternalMetadataValue``: the built-in
# value shapes an external metadata entry may carry before conversion.
InferrableExternalMetadataValue = Union[
    int, float, str, Mapping[str, Any], Sequence[Any], bool, None
]
22+
23+
24+
class ExternalMetadataValue(TypedDict):
    """A single metadata entry in its external (unconverted) representation.

    ``metadata_map_from_external`` reads both fields of each entry and passes
    them to ``metadata_value_from_external`` to build a ``MetadataValue``.
    """

    # Tag selecting how ``raw_value`` is converted; one of ``ExternalMetadataType``.
    type: "ExternalMetadataType"
    # The unconverted payload for this entry.
    raw_value: InferrableExternalMetadataValue
27+
28+
29+
# Infer the type from the raw value on the orchestration end.
# ``metadata_value_from_external`` handles this tag by passing the raw value to
# ``normalize_metadata_value`` instead of a specific ``MetadataValue`` constructor.
EXTERNAL_METADATA_TYPE_INFER = "__infer__"

# The closed set of type tags an external metadata entry may declare. Each tag
# has a matching branch in ``metadata_value_from_external``; any other tag makes
# that function fail via ``check.failed``.
ExternalMetadataType = Literal[
    "__infer__",
    "text",
    "url",
    "path",
    "notebook",
    "json",
    "md",
    "float",
    "int",
    "bool",
    "dagster_run",
    "asset",
    "null",
    "table",
    "table_schema",
    "table_column_lineage",
    "timestamp",
]
51+
52+
53+
def metadata_map_from_external(
    metadata: Mapping[str, ExternalMetadataValue],
) -> Mapping[str, "MetadataValue"]:
    """Convert a mapping of external metadata entries into ``MetadataValue`` objects.

    Args:
        metadata: Mapping from metadata key to an entry holding ``raw_value``
            and ``type`` fields.

    Returns:
        A new dict with the same keys, where each entry has been converted by
        ``metadata_value_from_external``.
    """
    converted = {}
    for key, entry in metadata.items():
        converted[key] = metadata_value_from_external(entry["raw_value"], entry["type"])
    return converted
57+
58+
59+
def _table_columns_from_external(raw_columns: Any) -> "list[TableColumn]":
    """Build ``TableColumn`` objects from externally supplied column mappings.

    Each entry must provide ``name`` and ``type``; ``description``, ``tags`` and
    ``constraints`` are optional. A missing or falsy ``constraints`` entry
    results in ``constraints=None``.
    """
    columns = []
    for spec in raw_columns:
        name = spec["name"]
        column_type = spec["type"]
        description = spec.get("description")
        tags = spec.get("tags")
        constraints = None
        if spec.get("constraints"):
            constraints = TableColumnConstraints(**spec["constraints"])
        columns.append(
            TableColumn(
                name=name,
                type=column_type,
                description=description,
                tags=tags,
                constraints=constraints,
            )
        )
    return columns


def metadata_value_from_external(
    value: Any, metadata_type: ExternalMetadataType
) -> "MetadataValue":
    """Convert one externally supplied raw value into a ``MetadataValue``.

    Args:
        value: The raw payload. Its required shape depends on ``metadata_type``:
            ``"table"`` expects a mapping with ``records`` and ``schema`` (a list
            of column mappings), ``"table_schema"`` a mapping with ``columns``,
            and ``"table_column_lineage"`` a mapping with ``deps_by_column``.
        metadata_type: The type tag selecting the conversion. The
            ``EXTERNAL_METADATA_TYPE_INFER`` tag defers to
            ``normalize_metadata_value``.

    Returns:
        The ``MetadataValue`` built for the given tag.

    Raises:
        Fails via ``check.failed`` when ``metadata_type`` is not a known tag.
    """
    # Imported here rather than at module level: the metadata package itself
    # imports this module, and ``MetadataValue`` is only imported for type
    # checking at the top of the file.
    from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value

    if metadata_type == EXTERNAL_METADATA_TYPE_INFER:
        return normalize_metadata_value(value)
    if metadata_type == "text":
        return MetadataValue.text(value)
    if metadata_type == "url":
        return MetadataValue.url(value)
    if metadata_type == "path":
        return MetadataValue.path(value)
    if metadata_type == "notebook":
        return MetadataValue.notebook(value)
    if metadata_type == "json":
        return MetadataValue.json(value)
    if metadata_type == "md":
        return MetadataValue.md(value)
    if metadata_type == "float":
        return MetadataValue.float(value)
    if metadata_type == "int":
        return MetadataValue.int(value)
    if metadata_type == "bool":
        return MetadataValue.bool(value)
    if metadata_type == "dagster_run":
        return MetadataValue.dagster_run(value)
    if metadata_type == "asset":
        return MetadataValue.asset(AssetKey.from_user_string(value))

    if metadata_type == "table":
        table = check.mapping_param(value, "table_value", key_type=str)
        # Records are built before the schema, matching the keyword order of the call.
        records = [TableRecord(row) for row in table["records"]]
        # For tables, the "schema" entry is the list of column mappings itself.
        schema = TableSchema(columns=_table_columns_from_external(table["schema"]))
        return MetadataValue.table(records=records, schema=schema)

    if metadata_type == "table_schema":
        schema_spec = check.mapping_param(value, "table_schema_value", key_type=str)
        columns = _table_columns_from_external(schema_spec["columns"])
        return MetadataValue.table_schema(schema=TableSchema(columns=columns))

    if metadata_type == "table_column_lineage":
        lineage_spec = check.mapping_param(value, "table_column_value", key_type=str)
        deps_by_column = {}
        for column_name, raw_deps in lineage_spec["deps_by_column"].items():
            deps_by_column[column_name] = [TableColumnDep(**raw_dep) for raw_dep in raw_deps]
        return MetadataValue.column_lineage(
            lineage=TableColumnLineage(deps_by_column=deps_by_column)
        )

    if metadata_type == "timestamp":
        return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp")))
    if metadata_type == "null":
        return MetadataValue.null()

    check.failed(f"Unexpected metadata type {metadata_type}")

python_modules/dagster/dagster/_core/pipes/context.py

+8-97
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,12 @@
1010
from dagster_pipes import (
1111
DAGSTER_PIPES_CONTEXT_ENV_VAR,
1212
DAGSTER_PIPES_MESSAGES_ENV_VAR,
13-
PIPES_METADATA_TYPE_INFER,
1413
Method,
1514
PipesContextData,
1615
PipesDataProvenance,
1716
PipesException,
1817
PipesExtras,
1918
PipesMessage,
20-
PipesMetadataType,
21-
PipesMetadataValue,
2219
PipesOpenedData,
2320
PipesParams,
2421
PipesTimeWindow,
@@ -34,14 +31,10 @@
3431
from dagster._core.definitions.asset_check_spec import AssetCheckSeverity
3532
from dagster._core.definitions.data_version import DataProvenance, DataVersion
3633
from dagster._core.definitions.events import AssetKey
37-
from dagster._core.definitions.metadata import MetadataValue, normalize_metadata_value
38-
from dagster._core.definitions.metadata.table import (
39-
TableColumn,
40-
TableColumnConstraints,
41-
TableColumnDep,
42-
TableColumnLineage,
43-
TableRecord,
44-
TableSchema,
34+
from dagster._core.definitions.metadata import (
35+
ExternalMetadataValue,
36+
MetadataValue,
37+
metadata_map_from_external,
4538
)
4639
from dagster._core.definitions.partition_key_range import PartitionKeyRange
4740
from dagster._core.definitions.result import MaterializeResult
@@ -123,91 +116,9 @@ def received_closed_message(self) -> bool:
123116
return self._received_closed_msg
124117

125118
def _resolve_metadata(
126-
self, metadata: Mapping[str, PipesMetadataValue]
119+
self, metadata: Mapping[str, ExternalMetadataValue]
127120
) -> Mapping[str, MetadataValue]:
128-
return {
129-
k: self._resolve_metadata_value(v["raw_value"], v["type"]) for k, v in metadata.items()
130-
}
131-
132-
@staticmethod
133-
def _resolve_metadata_value(value: Any, metadata_type: PipesMetadataType) -> MetadataValue:
134-
if metadata_type == PIPES_METADATA_TYPE_INFER:
135-
return normalize_metadata_value(value)
136-
elif metadata_type == "text":
137-
return MetadataValue.text(value)
138-
elif metadata_type == "url":
139-
return MetadataValue.url(value)
140-
elif metadata_type == "path":
141-
return MetadataValue.path(value)
142-
elif metadata_type == "notebook":
143-
return MetadataValue.notebook(value)
144-
elif metadata_type == "json":
145-
return MetadataValue.json(value)
146-
elif metadata_type == "md":
147-
return MetadataValue.md(value)
148-
elif metadata_type == "float":
149-
return MetadataValue.float(value)
150-
elif metadata_type == "int":
151-
return MetadataValue.int(value)
152-
elif metadata_type == "bool":
153-
return MetadataValue.bool(value)
154-
elif metadata_type == "dagster_run":
155-
return MetadataValue.dagster_run(value)
156-
elif metadata_type == "asset":
157-
return MetadataValue.asset(AssetKey.from_user_string(value))
158-
elif metadata_type == "table":
159-
value = check.mapping_param(value, "table_value", key_type=str)
160-
return MetadataValue.table(
161-
records=[TableRecord(record) for record in value["records"]],
162-
schema=TableSchema(
163-
columns=[
164-
TableColumn(
165-
name=column["name"],
166-
type=column["type"],
167-
description=column.get("description"),
168-
tags=column.get("tags"),
169-
constraints=TableColumnConstraints(**column["constraints"])
170-
if column.get("constraints")
171-
else None,
172-
)
173-
for column in value["schema"]
174-
]
175-
),
176-
)
177-
elif metadata_type == "table_schema":
178-
value = check.mapping_param(value, "table_schema_value", key_type=str)
179-
return MetadataValue.table_schema(
180-
schema=TableSchema(
181-
columns=[
182-
TableColumn(
183-
name=column["name"],
184-
type=column["type"],
185-
description=column.get("description"),
186-
tags=column.get("tags"),
187-
constraints=TableColumnConstraints(**column["constraints"])
188-
if column.get("constraints")
189-
else None,
190-
)
191-
for column in value["columns"]
192-
]
193-
)
194-
)
195-
elif metadata_type == "table_column_lineage":
196-
value = check.mapping_param(value, "table_column_value", key_type=str)
197-
return MetadataValue.column_lineage(
198-
lineage=TableColumnLineage(
199-
deps_by_column={
200-
column: [TableColumnDep(**dep) for dep in deps]
201-
for column, deps in value["deps_by_column"].items()
202-
}
203-
)
204-
)
205-
elif metadata_type == "timestamp":
206-
return MetadataValue.timestamp(float(check.numeric_param(value, "timestamp")))
207-
elif metadata_type == "null":
208-
return MetadataValue.null()
209-
else:
210-
check.failed(f"Unexpected metadata type {metadata_type}")
121+
return metadata_map_from_external(metadata)
211122

212123
# Type ignores because we currently validate in individual handlers
213124
def handle_message(self, message: PipesMessage) -> None:
@@ -253,7 +164,7 @@ def _handle_closed(self, params: Optional[Mapping[str, Any]]) -> None:
253164
def _handle_report_asset_materialization(
254165
self,
255166
asset_key: str,
256-
metadata: Optional[Mapping[str, PipesMetadataValue]],
167+
metadata: Optional[Mapping[str, ExternalMetadataValue]],
257168
data_version: Optional[str],
258169
) -> None:
259170
check.str_param(asset_key, "asset_key")
@@ -275,7 +186,7 @@ def _handle_report_asset_check(
275186
check_name: str,
276187
passed: bool,
277188
severity: str,
278-
metadata: Mapping[str, PipesMetadataValue],
189+
metadata: Mapping[str, ExternalMetadataValue],
279190
) -> None:
280191
check.str_param(asset_key, "asset_key")
281192
check.str_param(check_name, "check_name")

0 commit comments

Comments
 (0)