1 change: 1 addition & 0 deletions docs-website/sidebars.js
@@ -908,6 +908,7 @@ module.exports = {
     "metadata-ingestion/docs/dev_guides/add_stateful_ingestion_to_source",
     "metadata-ingestion/docs/dev_guides/sql_profiles",
     "metadata-ingestion/docs/dev_guides/profiling_ingestions",
+    "metadata-ingestion/docs/dev_guides/lineage_urn_casing",
     "docs/iceberg-catalog",
   ],
 },
143 changes: 143 additions & 0 deletions metadata-ingestion/docs/dev_guides/lineage_urn_casing.md
@@ -0,0 +1,143 @@
# Lineage URN Casing Normalization

DataHub identifies every entity by its URN, and URNs are compared as **exact, case-sensitive strings**.
When two sources refer to the same physical table with different casing, DataHub treats them as two
different entities, so the lineage edge between them is never drawn.

A common example: a warehouse like Snowflake reports table names in uppercase (its convention), while a
BI tool like Looker or Tableau references the same table in lowercase. The result is two disconnected
nodes instead of one connected lineage edge:

```text
Warehouse (Snowflake) entity:
urn:li:dataset:(urn:li:dataPlatform:snowflake,DB.SCHEMA.TABLE,PROD)

BI tool reports lineage pointing at:
urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)

Result: ❌ no lineage edge — two orphaned nodes
```

This silently breaks multi-hop lineage too: a single broken edge in the middle of a chain hides the
entire downstream path.
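
The mismatch can be reproduced with plain string handling. The sketch below uses a hypothetical `dataset_urn` helper (a plain formatter written for this example, not a DataHub SDK call) to show that the two references are different strings even though they differ only by case:

```python
def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Hypothetical helper: format a dataset URN as a plain string."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


warehouse_urn = dataset_urn("snowflake", "DB.SCHEMA.TABLE")
bi_reference = dataset_urn("snowflake", "db.schema.table")

# URNs are compared as exact strings, so casing alone makes these distinct.
urns_match = warehouse_urn == bi_reference

# Ignoring case, they clearly refer to the same table.
same_ignoring_case = warehouse_urn.lower() == bi_reference.lower()

print(urns_match, same_ignoring_case)
```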

## The existing mitigation, and why it isn't enough

Today the only built-in mitigation is the per-source `convert_urns_to_lowercase` flag, which keeps
lineage connected by **blindly lowercasing every URN**. It works only when enabled consistently across
_all_ sources that reference the same entities (an N-source coordination problem), and it carries real
costs:

- It only ever forces names to lowercase, and it is not available on some BI connectors (e.g. Looker,
Tableau) at all. Because you cannot control the casing those connectors emit, the only way to reconcile
a warehouse↔BI mismatch is to lowercase the warehouse side too — there is no setting that connects them
while preserving the warehouse's original (upper- or mixed-case) casing.
- That path commits you to lowercasing every identity, which loses the warehouse's real display casing
and, on case-sensitive platforms, can merge two genuinely different tables (`MyTable` and `mytable`)
into a single entity.
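
The merge risk is easy to see in isolation. The following sketch is illustrative only (the table names and the `lowercase_all` function are made up for this example, and this is not how `convert_urns_to_lowercase` is implemented); it groups names by their lowercased form and reports which distinct tables would collapse into one identity:

```python
from collections import defaultdict
from typing import Dict, List

# Hypothetical table names on a case-sensitive platform.
existing_tables = [
    "analytics.public.MyTable",
    "analytics.public.mytable",
    "analytics.public.Orders",
]


def lowercase_all(names: List[str]) -> Dict[str, List[str]]:
    """Group original names by the identity they get after blind lowercasing."""
    merged: Dict[str, List[str]] = defaultdict(list)
    for name in names:
        merged[name.lower()].append(name)
    return dict(merged)


merged = lowercase_all(existing_tables)

# Any lowercased identity backed by more than one original name is a collision.
collisions = {key: names for key, names in merged.items() if len(names) > 1}
print(collisions)
```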

The **lineage URN casing normalization** feature takes a different approach. Instead of flattening all
identities, it resolves each upstream reference to the casing of the entity that **already exists** in
DataHub — per ingestion, with no global coordination, preserving the warehouse's original casing, and
only when the match is unambiguous. It is an explicit opt-in feature and is **not enabled by default**.

## How it works

When enabled, a work unit processor inspects each source's lineage aspects before they are sent to
DataHub and reconciles the casing of **upstream warehouse references** against the casing DataHub
already stores:

- For each configured upstream platform, it bulk-loads that platform's existing URNs (and schemas) from
  DataHub once per run, so resolution happens locally without a round trip per reference.
- For every upstream reference, it looks for the existing entity that matches **ignoring case**:
  - If an entity with the **exact** URN already exists, the reference is left unchanged (recorded as an
    exact match). This ensures genuinely distinct entities on case-sensitive platforms are never merged.
  - Otherwise, if exactly **one** existing entity matches case-insensitively, the reference is rewritten
    to that entity's URN (recorded as a normalized match). This heals the mismatch in **both**
    directions: uppercase-reported→lowercase-stored and lowercase-reported→uppercase-stored.
  - If **no** entity matches, or **more than one** matches (an ambiguous collision on a case-sensitive
    platform, e.g. both `orders` and `Orders` exist), the reference is left unchanged.
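
The resolution rules above can be sketched as a small lookup. This is a minimal illustration, not the processor's actual code: `build_index` and `resolve_reference` are hypothetical names, and comparing the whole URN string in lowercase is an assumption made to keep the example short.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple


def build_index(existing_urns: Iterable[str]) -> Dict[str, List[str]]:
    """Group existing URNs by lowercased form (done once, up front)."""
    index: Dict[str, List[str]] = defaultdict(list)
    for urn in existing_urns:
        index[urn.lower()].append(urn)
    return index


def resolve_reference(
    reference: str, index: Dict[str, List[str]]
) -> Tuple[str, Optional[str]]:
    """Return (urn_to_emit, match_label) for one upstream reference."""
    candidates = index.get(reference.lower(), [])
    if reference in candidates:
        # Exact URN exists: never rewrite, even if other casings also exist.
        return reference, "exact"
    if len(candidates) == 1:
        # Exactly one case-insensitive match: adopt the stored casing.
        return candidates[0], "normalized"
    # No match, or an ambiguous collision: leave the reference unchanged.
    return reference, None


existing = [
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,DB.SCHEMA.TABLE,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.orders,PROD)",
    "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.Orders,PROD)",
]
index = build_index(existing)

lower_ref = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
ambiguous_ref = "urn:li:dataset:(urn:li:dataPlatform:snowflake,DB.SCHEMA.ORDERS,PROD)"

print(resolve_reference(lower_ref, index))
print(resolve_reference(ambiguous_ref, index))
```

Checking for the exact URN before counting candidates is what keeps `orders` and `Orders` separate when both exist: a reference that already names one of them is never redirected to the other.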

Only references **to** warehouse assets are modified. The entity the aspect is attached to, and
downstream field references, are never touched — the feature respects the identity and casing the
warehouse itself reported.

### What gets fixed

| Reference | Fixed |
| --------------------------------------------------------------- | ----------------------------------------- |
| `upstreamLineage` upstream dataset URNs | ✅ table-level |
| `fineGrainedLineage` upstream field URNs | ✅ table-level **and** column-name casing |
| `dashboardInfo` dataset references (`datasets`, `datasetEdges`) | ✅ table-level |

Column-level casing is corrected using the schema DataHub stores for the resolved table, so a BI tool
that reports a column as `AMOUNT` is reconciled to the warehouse's actual `amount` (or vice versa).
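
A rough sketch of the column-level step, assuming it mirrors the table-level rule (exact match wins, a single case-insensitive match is adopted, anything else is left alone). The `resolve_column` function and the sample schema are hypothetical; how the real processor treats ambiguous column names is not stated here.

```python
from typing import List


def resolve_column(reported: str, schema_fields: List[str]) -> str:
    """Pick the stored casing for a reported column name, if unambiguous."""
    matches = [f for f in schema_fields if f.lower() == reported.lower()]
    if reported in matches:
        return reported  # already matches the stored schema exactly
    if len(matches) == 1:
        return matches[0]  # adopt the casing stored in the schema
    return reported  # unknown or ambiguous: leave as reported


# Hypothetical schema stored for the resolved warehouse table.
stored_schema = ["id", "amount", "created_at"]

resolved = resolve_column("AMOUNT", stored_schema)
print(resolved)
```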

## Enabling the feature

Add the `normalize_lineage_urn_casing` flag under the pipeline-level `flags` block, and list the
upstream warehouse platform(s) whose references should be reconciled:

```yaml
source:
  type: looker
  config:
    # ... your Looker config ...

flags:
  normalize_lineage_urn_casing:
    enabled: true
    upstream_platforms:
      - platform: snowflake
        platform_instance: my_instance # optional
        env: PROD # optional, defaults to PROD
      # add more entries for additional upstream platforms
      # - platform: redshift
      #   env: PROD

sink:
  # ... your sink config ...
```

### Configuration reference

| Field | Required | Default | Description |
| ---------------------------------------- | ------------------ | ------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `enabled`                                | no                 | `false` | Whether to reconcile upstream lineage URN casing. Set to `true` to turn the feature on.                                         |
| `upstream_platforms` | yes (when enabled) | `[]` | The upstream warehouse platform(s) to reconcile references against. References to platforms not listed here are left unchanged. |
| `upstream_platforms[].platform` | yes | — | The upstream data platform, e.g. `snowflake`. |
| `upstream_platforms[].platform_instance` | no | `null` | Platform instance of the upstream platform, if any. |
| `upstream_platforms[].env` | no | `PROD` | Environment (FabricType) of the upstream platform's assets. |
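
To make the scoping rule concrete, here is a small sketch of how a reference could be checked against the configured platforms. Everything in it is an assumption for illustration: `UpstreamPlatform` is a plain dataclass standing in for one `upstream_platforms` entry, `is_in_scope` is a hypothetical function, the URN is parsed with a simple regular expression, and `platform_instance` is ignored.

```python
import re
from dataclasses import dataclass
from typing import List, Optional


@dataclass(frozen=True)
class UpstreamPlatform:
    """Illustrative stand-in for one `upstream_platforms` entry."""

    platform: str
    platform_instance: Optional[str] = None  # not used in this sketch
    env: str = "PROD"


# Matches urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
_DATASET_URN = re.compile(
    r"^urn:li:dataset:\(urn:li:dataPlatform:([^,]+),(.+),([^,]+)\)$"
)


def is_in_scope(urn: str, configured: List[UpstreamPlatform]) -> bool:
    """True if the URN's platform and env match a configured entry."""
    match = _DATASET_URN.match(urn)
    if match is None:
        return False
    platform, _name, env = match.groups()
    return any(p.platform == platform and p.env == env for p in configured)


configured = [UpstreamPlatform(platform="snowflake")]

snowflake_ref = "urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table,PROD)"
bigquery_ref = "urn:li:dataset:(urn:li:dataPlatform:bigquery,proj.ds.table,PROD)"

print(is_in_scope(snowflake_ref, configured), is_in_scope(bigquery_ref, configured))
```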

### Where to enable it

Enable this feature on **BI-tool and other cross-platform ingestions** that reference warehouse assets
(Looker, Tableau, Sigma, Redash, Superset, Qlik, etc.) and point it at the upstream warehouse
platform(s).

Do **not** enable it on the warehouse ingestion itself (e.g. the Snowflake ingestion) — the warehouse is
the source of truth for its own casing and identity, which must be respected.

## Match type explainability

When the feature reconciles a reference, it records a `matchType` on the lineage aspect:

- `EXACT` — the reference already matched an existing entity, including casing.
- `NORMALIZED` — the reference was rewritten to heal a casing mismatch.

When no reconciliation was performed, `matchType` is left unset. This makes it possible to distinguish
exact lineage from casing-resolved lineage downstream.

## Requirements and limitations

- **Requires a DataHub backend connection.** Resolution queries DataHub for the existing entities, so the
feature is a no-op for offline / file-only ingestion.
- **Resolves only against entities that already exist at ingestion time.** This relies on the warehouse
being ingested before the BI tool that references it — the normal order for scheduled pipelines. If a
reference's target does not yet exist in DataHub, it is left unchanged and self-heals once the
warehouse is ingested and the BI source re-runs.
- **Does not retroactively heal existing broken edges** already stored in the graph. Re-ingest the
affected source after enabling the flag to fix them.
- **Collision-safe but conservative.** On case-sensitive platforms where two genuinely different tables
differ only by case, ambiguous references are left unchanged rather than risk merging distinct
entities.
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -602,6 +602,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
 AutoIncrementalPropertiesProcessor,
 AutoLowercaseUrnsProcessor,
 AutoMaterializeReferencedTagsTermsProcessor,
+AutoNormalizeLineageUrnsProcessor,
 AutoPatchLastModifiedProcessor,
 AutoStaleEntityRemovalProcessor,
 AutoStatusAspectProcessor,
@@ -648,6 +649,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
 ValidateDuplicateSchemaFieldPathsProcessor,
 ValidateEmptySchemaFieldPathsProcessor,
 AutoBrowsePathV2Processor,
+AutoNormalizeLineageUrnsProcessor,
 AutoIncrementalLineageProcessor,
 AutoIncrementalPropertiesProcessor,
 AutoIncrementalOwnershipProcessor,
56 changes: 56 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline_config.py
@@ -47,6 +47,46 @@ class FailureLoggingConfig(ConfigModel):
     log_config: Optional[FileSinkConfig] = None


+class UpstreamPlatformCasing(ConfigModel):
+    """An upstream warehouse platform whose asset casing lineage references should
+    be reconciled against."""
+
+    platform: str = Field(
+        description="Upstream data platform whose assets are referenced by this "
+        "source's lineage (e.g. `snowflake`). References to this platform's assets "
+        "are reconciled against the casing stored in DataHub.",
+    )
+    platform_instance: Optional[str] = Field(
+        default=None,
+        description="Platform instance of the upstream platform, if any.",
+    )
+    env: str = Field(
+        default="PROD",
+        description="Environment (FabricType) of the upstream platform's assets.",
+    )
+
+
+class NormalizeLineageUrnCasingConfig(ConfigModel):
+    """Configuration for the lineage URN casing normalization work unit processor.
+
+    Intended to be enabled on BI-tool / cross-platform ingestions that reference
+    warehouse assets — NOT on the warehouse ingestion itself, whose reported casing
+    and identity must be respected.
+    """
+
+    enabled: bool = Field(
+        default=False,
+        description="Whether to reconcile the casing of upstream warehouse URN "
+        "references in lineage against the casing stored in DataHub.",
+    )
+    upstream_platforms: List[UpstreamPlatformCasing] = Field(
+        default_factory=list,
+        description="The upstream warehouse platform(s) to bulk-load and reconcile "
+        "lineage references against. References to platforms not listed here are "
+        "left unchanged.",
+    )
+
+
 class FlagsConfig(ConfigModel):
     """Experimental flags for the ingestion pipeline.

@@ -74,6 +114,22 @@ class FlagsConfig(ConfigModel):
         ),
     )

+    normalize_lineage_urn_casing: NormalizeLineageUrnCasingConfig = Field(
+        default_factory=NormalizeLineageUrnCasingConfig,
+        description=(
+            "Experimental: before emitting lineage, reconcile the casing of upstream "
+            "warehouse URN references (table- and column-level) against the casing "
+            "stored in DataHub, so casing mismatches between sources (e.g. an uppercase "
+            "Snowflake table referenced as lowercase by a BI tool, or vice versa) don't "
+            "produce two disconnected lineage nodes. Unlike `convert_urns_to_lowercase`, "
+            "which lowercases every URN, this resolves references to the casing of the "
+            "entity that already exists, preserving the warehouse's original casing. "
+            "Requires a DataHub backend connection (no-op for offline/file-only "
+            "ingestion) and the upstream platform(s) to be configured. Enable on BI-tool "
+            "ingestions, not on the warehouse ingestion itself."
+        ),
+    )
+
     progress_report_max_failures: int = Field(
         ge=0,
         default_factory=get_progress_report_max_failures,
16 changes: 13 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -1472,12 +1472,22 @@ def get_input_fields(

         input_fields = []

+        # Chart formulas may reference columns in a different casing than the query's
+        # schema (which preserves the original field-path casing). Match
+        # case-insensitively and emit the schema's actual field path, so the input-field
+        # URN resolves to a real schema field instead of a lowercased ghost.
+        chart_fields_by_lower = {
+            field_path.lower(): field_path for field_path in chart_fields
+        }
         for field in fields:
-            if field.lower() not in chart_fields:
+            actual_field_path = chart_fields_by_lower.get(field.lower())
+            if actual_field_path is None:
                 continue
             input_field = InputFieldClass(
-                schemaFieldUrn=builder.make_schema_field_urn(query_urn, field.lower()),
-                schemaField=chart_fields[field.lower()],
+                schemaFieldUrn=builder.make_schema_field_urn(
+                    query_urn, actual_field_path
+                ),
+                schemaField=chart_fields[actual_field_path],
             )
             input_fields.append(input_field)

@@ -467,7 +467,13 @@ def create_table_column_lineage(self, urn: str) -> List[ColumnLineageInfo]:
         upstreams = [
             ColumnRef(
                 table=urn,
-                column=column.name.lower(),
+                # Preserve the original column casing. Whether the upstream
+                # column ref is lowercased is governed downstream by
+                # convert_lineage_urns_to_lowercase in powerbi.py. Pre-lowercasing
+                # here defeats that flag (both branches end up lowercase) and
+                # forces a mismatch against warehouses that store columns in their
+                # original (e.g. PascalCase) casing, dropping the column-level edge.
+                column=column.name,
             )
         ]

@@ -16,6 +16,9 @@
 from datahub.ingestion.workunit_processors.auto_materialize_referenced_tags_terms import (
     AutoMaterializeReferencedTagsTermsProcessor,
 )
+from datahub.ingestion.workunit_processors.auto_normalize_lineage_urns import (
+    AutoNormalizeLineageUrnsProcessor,
+)
 from datahub.ingestion.workunit_processors.auto_patch_last_modified import (
     AutoPatchLastModifiedProcessor,
 )
@@ -48,6 +51,7 @@
     "AutoIncrementalPropertiesProcessor",
     "AutoLowercaseUrnsProcessor",
     "AutoMaterializeReferencedTagsTermsProcessor",
+    "AutoNormalizeLineageUrnsProcessor",
     "AutoPatchLastModifiedProcessor",
     "AutoStaleEntityRemovalProcessor",
     "AutoStatusAspectProcessor",