Skip to content

Commit eafdf14

Browse files
feat(ingest/dbt): add add_structured_property operation to meta_mapping (#17503)
1 parent f5f18a1 commit eafdf14

7 files changed

Lines changed: 1035 additions & 26 deletions

File tree

metadata-ingestion/docs/sources/dbt/dbt_post.md

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ meta_mapping:
4848
config:
4949
link: {{ $match }}
5050
description: "Documentation Link"
51+
business_domain:
52+
match: ".*"
53+
operation: "add_domain"
54+
config:
55+
domain: "{{ $match }}"
56+
data_load_frequency:
57+
match: ".*"
58+
operation: "add_structured_property"
59+
config:
60+
structured_property_urn: "urn:li:structuredProperty:io.acme.data_load_frequency"
5161
column_meta_mapping:
5262
terms_list:
5363
match: ".*"
@@ -64,6 +74,11 @@ column_meta_mapping:
6474
operation: "add_tag"
6575
config:
6676
tag: "pii"
77+
classification:
78+
match: ".*"
79+
operation: "add_structured_property"
80+
config:
81+
structured_property_urn: "urn:li:structuredProperty:io.acme.classification"
6782
```
6883
6984
We support the following operations:
@@ -77,10 +92,12 @@ We support the following operations:
7792
- You can use commas to specify multiple owners - e.g. `business_owner: "jane,john,urn:li:corpGroup:data-team"`.
7893
7994
5. add_doc_link - Requires `link` and `description` properties in config. On each ingestion run, this overwrites the current links in the institutional knowledge section with this new link. The link's anchor text is taken from the `description` value defined in the meta_mapping.
95+
6. add_domain - Adds the dataset to a DataHub Domain. The `domain` config value can be a short ID (e.g. `Marketing`) or a fully-qualified URN (`urn:li:domain:Marketing`). Supports `{{ $match }}` substitution.
96+
7. add_structured_property - Assigns a value to a [DataHub Structured Property](../../../../docs/features/feature-guides/properties/overview.md). Required config: `structured_property_urn` (full URN or qualified name). Optional config: `value` (literal or `{{ $match }}` template; defaults to the raw meta value, preserving numeric types) and `value_type` (`string` or `number`). Multiple rules targeting the same property URN have their values aggregated into a single assignment. The structured property itself must be defined in DataHub before ingestion runs — this operation only assigns values, it does not create the property definition.
8097

8198
Note:
8299

83-
1. The dbt `meta_mapping` config works at the model level, while the `column_meta_mapping` config works at the column level. The `add_owner` operation is not supported at the column level.
100+
1. The dbt `meta_mapping` config works at the model level, while the `column_meta_mapping` config works at the column level. The `add_owner` operation is not supported at the column level. The `add_structured_property` operation is supported at both levels — at the column level it produces a `structuredProperties` aspect attached to each matching `schemaField` URN.
84101
2. For string meta properties we support regex matching.
85102
3. **List support**: YAML lists are now supported in meta properties. Each item in the list that matches the regex pattern will be processed.
86103

metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py

Lines changed: 116 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from dataclasses import dataclass, field
77
from datetime import datetime, timezone
88
from enum import auto
9+
from functools import cached_property
910
from typing import (
1011
Any,
1112
Dict,
@@ -1505,6 +1506,44 @@ def _upstream_exists_in_datahub(self, urn: str) -> bool:
15051506
self.report.lineage_upstreams_skipped_missing += 1
15061507
return self._upstream_exists_cache[urn]
15071508

1509+
@cached_property
def _column_meta_action_processor(self) -> OperationProcessor:
    """Return the OperationProcessor used for ``column_meta_mapping`` rules.

    Because this is a ``cached_property``, the processor is built on first
    access and the same instance is handed back on every later access of
    this object. All constructor inputs come from ``self.config``, so one
    instance can serve both the schema-metadata path and the column
    structured-property path.
    """
    cfg = self.config
    processor = OperationProcessor(
        cfg.column_meta_mapping,
        cfg.tag_prefix,
        "SOURCE_CONTROL",
        cfg.strip_user_ids_from_email,
        match_nested_props=True,
    )
    return processor
1520+
1521+
def _extract_column_meta_aspects(self, node: DBTNode) -> Dict[str, Dict[str, Any]]:
    """Run the column meta-mapping processor once per column of ``node``.

    Returns a dict keyed by field name, where the name is lowercased when
    ``convert_column_urns_to_lowercase`` is set (the same transformation
    ``get_schema_metadata`` applies before building ``fieldPath``). Each
    value is whatever the processor returned for that column's ``meta``.

    An empty dict is returned when meta mapping is disabled or no
    ``column_meta_mapping`` is configured. Columns with empty ``meta`` are
    left out, as are columns whose processing raised; the latter are
    reported as a warning instead of aborting the node.
    """
    config = self.config
    if not config.enable_meta_mapping or not config.column_meta_mapping:
        return {}

    aspects_by_field: Dict[str, Dict[str, Any]] = {}
    for column in node.columns:
        if column.meta:
            key = column.name
            if config.convert_column_urns_to_lowercase:
                key = key.lower()
            try:
                # The processor is looked up inside the try on purpose: it is
                # created lazily, so a failure while building it is reported
                # through the same warning path as a processing failure.
                processor = self._column_meta_action_processor
                aspects_by_field[key] = processor.process(column.meta)
            except Exception as err:
                self.report.warning(
                    title="Failed to process column meta_mapping",
                    message="Column metadata derived from meta_mapping will be missing for this column.",
                    context=f"{node.dbt_name}.{column.name}",
                    exc=err,
                )
    return aspects_by_field
1546+
15081547
def create_test_entity_mcps(
15091548
self,
15101549
test_nodes: List[DBTNode],
@@ -2452,8 +2491,15 @@ def create_dbt_platform_mces(
24522491
action_processor_tag, meta_aspects, node
24532492
) # mutates meta_aspects
24542493

2494+
# Process column.meta once and reuse across schema + structured props.
2495+
column_meta_aspects = self._extract_column_meta_aspects(node)
2496+
24552497
aspects = self._generate_base_dbt_aspects(
2456-
node, additional_custom_props_filtered, DBT_PLATFORM, meta_aspects
2498+
node,
2499+
additional_custom_props_filtered,
2500+
DBT_PLATFORM,
2501+
meta_aspects,
2502+
column_meta_aspects=column_meta_aspects,
24572503
)
24582504

24592505
# Upstream lineage.
@@ -2528,6 +2574,20 @@ def create_dbt_platform_mces(
25282574
f"Skipping emission of node {node_datahub_urn} because node_type {node.node_type} is disabled"
25292575
)
25302576

2577+
# Column structured properties must be emitted as standalone MCPs
2578+
# because they attach to schemaField URNs, not the dataset URN.
2579+
if (
2580+
self.config.enable_meta_mapping
2581+
and self.config.entities_enabled.can_emit_node_type(node.node_type)
2582+
):
2583+
yield from auto_workunit(
2584+
self._create_column_structured_property_mcps(
2585+
node,
2586+
node_datahub_urn,
2587+
column_meta_aspects=column_meta_aspects,
2588+
)
2589+
)
2590+
25312591
# Model performance.
25322592
if self.config.entities_enabled.can_emit_model_performance:
25332593
yield from auto_workunit(
@@ -2959,6 +3019,7 @@ def _generate_base_dbt_aspects(
29593019
additional_custom_props_filtered: Dict[str, str],
29603020
mce_platform: str,
29613021
meta_aspects: Dict[str, Any],
3022+
column_meta_aspects: Optional[Dict[str, Dict[str, Any]]] = None,
29623023
) -> List[Any]:
29633024
"""
29643025
Some common aspects that get generated for dbt nodes.
@@ -3009,22 +3070,34 @@ def _generate_base_dbt_aspects(
30093070
if meta_links_aspect and self.config.enable_meta_mapping:
30103071
aspects.append(meta_links_aspect)
30113072

3073+
# structuredProperties is not part of the DatasetSnapshot aspect union,
3074+
# but the create_*_platform_mces helpers automatically route any aspect
3075+
# not in the union into a standalone MCP.
3076+
meta_structured_properties_aspect = meta_aspects.get(
3077+
Constants.ADD_STRUCTURED_PROPERTY_OPERATION
3078+
)
3079+
if meta_structured_properties_aspect and self.config.enable_meta_mapping:
3080+
aspects.append(meta_structured_properties_aspect)
3081+
30123082
# add schema metadata aspect
3013-
schema_metadata = self.get_schema_metadata(self.report, node, mce_platform)
3083+
schema_metadata = self.get_schema_metadata(
3084+
self.report, node, mce_platform, column_meta_aspects=column_meta_aspects
3085+
)
30143086
aspects.append(schema_metadata)
30153087

30163088
return aspects
30173089

30183090
def get_schema_metadata(
3019-
self, report: DBTSourceReport, node: DBTNode, platform: str
3091+
self,
3092+
report: DBTSourceReport,
3093+
node: DBTNode,
3094+
platform: str,
3095+
column_meta_aspects: Optional[Dict[str, Dict[str, Any]]] = None,
30203096
) -> SchemaMetadata:
3021-
action_processor = OperationProcessor(
3022-
self.config.column_meta_mapping,
3023-
self.config.tag_prefix,
3024-
"SOURCE_CONTROL",
3025-
self.config.strip_user_ids_from_email,
3026-
match_nested_props=True,
3027-
)
3097+
# Fall back to computing inline for direct callers/tests that don't
3098+
# thread the shared dict through.
3099+
if column_meta_aspects is None:
3100+
column_meta_aspects = self._extract_column_meta_aspects(node)
30283101

30293102
canonical_schema: List[SchemaField] = []
30303103
for column in node.columns:
@@ -3041,9 +3114,11 @@ def get_schema_metadata(
30413114
elif column.description:
30423115
description = column.description
30433116

3044-
meta_aspects: Dict[str, Any] = {}
3045-
if self.config.enable_meta_mapping and column.meta:
3046-
meta_aspects = action_processor.process(column.meta)
3117+
field_name = column.name
3118+
if self.config.convert_column_urns_to_lowercase:
3119+
field_name = field_name.lower()
3120+
3121+
meta_aspects = column_meta_aspects.get(field_name, {})
30473122

30483123
if meta_aspects.get(Constants.ADD_OWNER_OPERATION):
30493124
logger.warning("The add_owner operation is not supported for columns.")
@@ -3066,10 +3141,6 @@ def get_schema_metadata(
30663141
if meta_aspects.get(Constants.ADD_TERM_OPERATION):
30673142
glossaryTerms = meta_aspects.get(Constants.ADD_TERM_OPERATION)
30683143

3069-
field_name = column.name
3070-
if self.config.convert_column_urns_to_lowercase:
3071-
field_name = field_name.lower()
3072-
30733144
field = SchemaField(
30743145
fieldPath=field_name,
30753146
nativeDataType=column.data_type,
@@ -3104,6 +3175,34 @@ def get_schema_metadata(
31043175
fields=canonical_schema,
31053176
)
31063177

3178+
def _create_column_structured_property_mcps(
    self,
    node: DBTNode,
    dataset_urn: str,
    column_meta_aspects: Optional[Dict[str, Dict[str, Any]]] = None,
) -> Iterable[MetadataChangeProposalWrapper]:
    """Yield one structured-properties MCP per column that has such an aspect.

    Each MCP targets the column's schemaField URN (built from
    ``dataset_urn`` and the field name) rather than the dataset URN.

    Args:
        node: dbt node whose columns are inspected; only consulted when
            ``column_meta_aspects`` is not supplied.
        dataset_urn: URN of the dataset that owns the columns.
        column_meta_aspects: Pre-computed per-column processor output keyed
            by field name. When ``None`` it is computed from ``node``.

    Nothing is yielded when no ``column_meta_mapping`` is configured. This
    method only assigns values and performs no check that the property
    definition exists; per the original author's note, that validation is
    expected to happen server-side at write time, which keeps graph-less
    ingestion working.
    """
    if self.config.column_meta_mapping:
        per_column = (
            self._extract_column_meta_aspects(node)
            if column_meta_aspects is None
            else column_meta_aspects
        )
        for field_name, aspects in per_column.items():
            structured_props = aspects.get(
                Constants.ADD_STRUCTURED_PROPERTY_OPERATION
            )
            if structured_props:
                field_urn = mce_builder.make_schema_field_urn(dataset_urn, field_name)
                yield MetadataChangeProposalWrapper(
                    entityUrn=field_urn,
                    aspect=structured_props,
                )
3205+
31073206
def _aggregate_owners(
31083207
self, node: DBTNode, meta_owner_aspects: Any
31093208
) -> List[OwnerClass]:

0 commit comments

Comments
 (0)