
Airflow plugin: translate_ol_to_datahub_urn mis-translates GCS OpenLineage datasets (drops bucket, wrong platform) #17862

Description

@bmaquet

Describe the bug

translate_ol_to_datahub_urn() in datahub_airflow_plugin/_datahub_ol_adapter.py doesn't correctly map OpenLineage GCS datasets to DataHub dataset URNs.

When operators like GCSToBigQueryOperator emit OpenLineage input datasets (per the OpenLineage naming spec):

Dataset(namespace="gs://my-bucket", name="my-key/2026/06/11")

the plugin currently produces:

urn:li:dataset:(urn:li:dataPlatform:gs,my-key/2026/06/11,DEV)

I've identified two problems with the current implementation:

  1. Bucket is dropped. The bucket lives in the OL namespace (gs://my-bucket), but translate_ol_to_datahub_urn only reads the URI scheme and ignores the authority (my-bucket).
  2. Wrong platform. gs is used as the DataHub platform instead of gcs. The Airflow integration docs map gs:// to gcs, and the GCS ingestion source uses platform gcs.

Same issue applies to other object-store namespaces (s3://{bucket}, etc.): bucket authority is dropped and the scheme is used as-is for the platform.
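The bucket is not missing from the OpenLineage payload. It is the URI authority of the namespace. Python's standard urllib.parse module shows the split on the example values. This only illustrates where the data lives and is not the adapter's code:

```python
from urllib.parse import urlsplit

# OpenLineage dataset as emitted by GCSToBigQueryOperator (example values from above)
namespace = "gs://my-bucket"
name = "my-key/2026/06/11"

parts = urlsplit(namespace)
print(parts.scheme)              # gs         -> currently used verbatim as the DataHub platform
print(parts.netloc)              # my-bucket  -> the bucket, currently discarded
print(f"{parts.netloc}/{name}")  # my-bucket/my-key/2026/06/11
```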

Environment

  • acryl-datahub-airflow-plugin[airflow2]==1.6.0
  • Apache Airflow 2.10.5
  • Plugin defaults: enable_extractors=true, disable_openlineage_plugin=true
  • Reproducible on master as of 2026-06-11

To Reproduce

  1. Install the DataHub Airflow plugin with OpenLineage support
  2. Run a DAG task using GCSToBigQueryOperator with date-partitioned source_objects:
GCSToBigQueryOperator(
    task_id="gcs_to_bq",
    bucket="my-bucket",
    source_objects=["my-key/{{ data_interval_end.format('YYYY/MM/DD') }}/*.json.gz"],
    destination_project_dataset_table="my-project.my_dataset.my_table",
    ...
)
  3. Check the input dataset URN emitted to DataHub for that task

Actual behavior

urn:li:dataset:(urn:li:dataPlatform:gs,my-key/2026/06/11,DEV)

A new dataset entity is created on every run when source_objects contains date partitions, because the partition date ends up in the URN name.

Expected behavior

There's no documented spec for how translate_ol_to_datahub_urn should map OpenLineage object-store datasets to DataHub URNs. The Airflow docs describe URI scheme to platform mappings for native Airflow Assets, and the manual Dataset(platform=..., name=...) examples show bucket/path naming, but neither documents the auto-extraction code path in _datahub_ol_adapter.py. So any "expected" output has to be inferred from adjacent conventions.

What looks clearly wrong (and contradicts those conventions):

  • Platform: the OL namespace scheme gs:// should map to DataHub platform gcs, not gs (per the docs' URI table; the GCS ingestion source also uses gcs).
  • Bucket: OL puts the bucket in the namespace (gs://{bucket}) and the object key in the name (per the OpenLineage naming spec). The translator shouldn't drop the bucket; DataHub's object-store URNs elsewhere use {bucket}/{path} in the name field.

Less clear (suggestion, not a hard requirement):

  • Partition handling: whether date segments in the object key should be stripped to produce a stable logical dataset, or passed through verbatim

My suggestion for a sensible default, given an OL dataset with namespace = gs://my-bucket and name = my-key/2026/06/11 (from a path like gs://my-bucket/my-key/yyyy/mm/dd):

urn:li:dataset:(urn:li:dataPlatform:gcs,my-bucket/my-key,DEV)

i.e. combine bucket + key, strip trailing date partitions. That would let automatic extractor lineage stitch to assets from GCS/BigQuery ingestion without creating a new dataset per run. Partition stripping is opinionated though: it might not be right for all pipelines, and probably needs to be configurable or scoped to known patterns (YYYY/MM/DD, hive-style year=/month=/day=, etc.) rather than assumed as universal behavior.

At minimum, fixing the platform mapping and bucket inclusion would already be a real improvement over the current output.
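The partition stripping suggested above could be scoped to known layouts with a couple of anchored regular expressions. A minimal sketch, assuming only the YYYY/MM/DD and hive-style year=/month=/day= layouts mentioned above are recognized; the helper name strip_trailing_date_partitions is hypothetical and does not exist in the plugin:

```python
import re

# Hypothetical helper: recognizes only the two trailing layouts named above.
_TRAILING_DATE_PATTERNS = (
    re.compile(r"/\d{4}/\d{2}/\d{2}/?$"),                 # .../2026/06/11
    re.compile(r"/year=\d{4}/month=\d{2}/day=\d{2}/?$"),  # .../year=2026/month=06/day=11
)


def strip_trailing_date_partitions(key: str) -> str:
    """Remove one trailing date partition from an object key, if present."""
    for pattern in _TRAILING_DATE_PATTERNS:
        stripped = pattern.sub("", key)
        if stripped != key:
            return stripped
    # Unknown layout: pass the key through verbatim.
    return key


# Two daily runs collapse to the same logical key.
print(strip_trailing_date_partitions("my-key/2026/06/11"))  # my-key
print(strip_trailing_date_partitions("my-key/2026/06/12"))  # my-key
```

Because both patterns require a preceding path segment, a key that is only a date (e.g. 2026/06/11) is left untouched instead of collapsing to an empty name.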

Root cause

In _datahub_ol_adapter.py:

def translate_ol_to_datahub_urn(ol_uri, env=...):
    namespace = ol_uri.namespace
    name = _sanitize_ol_dataset_name(ol_uri.name)

    scheme, *rest = namespace.split("://", maxsplit=1)
    platform = OL_SCHEME_TWEAKS.get(scheme, scheme)  # "gs", no gs→gcs mapping
    return builder.make_dataset_urn(platform=platform, name=name, env=env)

  • rest (which holds the bucket, here ["my-bucket"]) is never read
  • OL_SCHEME_TWEAKS maps sqlserver→mssql and awsathena→athena, but not gs→gcs
  • No partition normalization is applied to object-store paths
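To check the behavior without an Airflow or DataHub install, the logic above can be mirrored in a few lines. This is a simplified stand-in, not the plugin's code: OLDataset replaces the OpenLineage Dataset class, make_dataset_urn only reproduces DataHub's URN string format, and the name sanitizer is assumed to be a no-op for this name and is omitted:

```python
from typing import NamedTuple


class OLDataset(NamedTuple):
    """Stand-in for an OpenLineage dataset (namespace + name only)."""
    namespace: str
    name: str


# Tweaks as described above; note there is no "gs" entry.
OL_SCHEME_TWEAKS = {"sqlserver": "mssql", "awsathena": "athena"}


def make_dataset_urn(platform: str, name: str, env: str) -> str:
    # Simplified stand-in for DataHub's dataset URN format.
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def translate_current(ol: OLDataset, env: str = "DEV") -> str:
    scheme, *rest = ol.namespace.split("://", maxsplit=1)
    # `rest` (["my-bucket"]) is computed but never used, so the bucket is lost.
    platform = OL_SCHEME_TWEAKS.get(scheme, scheme)
    return make_dataset_urn(platform=platform, name=ol.name, env=env)


print(translate_current(OLDataset("gs://my-bucket", "my-key/2026/06/11")))
# urn:li:dataset:(urn:li:dataPlatform:gs,my-key/2026/06/11,DEV)
```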

GCSToBigQueryOperator.get_openlineage_facets_on_complete() is defined in Airflow (not DataHub) and follows the OpenLineage spec correctly. The bug is in DataHub's translation layer.

Impact

  • Orphan datasets on platform gs that don't match gcs ingestion
  • Lineage graph fragmentation: edges point at datasets that don't exist in the catalog under the expected URNs
  • One new dataset entity per partition date for daily pipelines
  • Same translation path affects any operator emitting namespace=f"gs://{bucket}" / namespace=f"s3://{bucket}" (e.g. GCSToGCSOperator, S3ToGCSOperator)

Related precedent: #10558 (wrong platform from OL_SCHEME_TWEAKS, trino→presto, fixed in #11925). The .None. name sanitizer in the same file (#17191) shows DataHub already treats OL→URN translation mismatches as adapter bugs.

Suggested fixes

Option A: Fix translate_ol_to_datahub_urn for object-store namespaces, gated behind opt-in config (recommended)

When namespace matches gs://{bucket} or s3://{bucket}:

  • platform = gcs / s3
  • name = {bucket}/{object_key} (combine namespace authority with OL name)

This is a breaking change for anyone already running with the current output, so it probably needs a config gate, e.g. [datahub] normalize_object_store_urns = false (default false), flipped to true in a future release with orphan cleanup guidance (similar to the .None. orphan docs).

Optionally (separate decision): strip date partition segments from {object_key} to avoid per-run dataset churn, either by default with known patterns or behind its own flag.
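A minimal sketch of the gated behavior in Option A, written as a standalone function rather than a patch to _datahub_ol_adapter.py. The normalize_object_store_urns flag is the hypothetical config option proposed above, passed here as a plain keyword argument; the OBJECT_STORE_PLATFORMS mapping and the function name are also assumptions. Partition stripping is deliberately left out, since it is a separate decision:

```python
# Existing tweaks as described in the root-cause section.
OL_SCHEME_TWEAKS = {"sqlserver": "mssql", "awsathena": "athena"}

# Hypothetical: object-store schemes whose namespace authority is a bucket.
OBJECT_STORE_PLATFORMS = {"gs": "gcs", "s3": "s3"}


def make_dataset_urn(platform: str, name: str, env: str) -> str:
    # Simplified stand-in for DataHub's dataset URN format.
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def translate_ol_to_urn(
    namespace: str,
    name: str,
    env: str = "DEV",
    *,
    normalize_object_store_urns: bool = False,  # hypothetical opt-in flag
) -> str:
    scheme, _, authority = namespace.partition("://")
    bucket = authority.strip("/")
    if normalize_object_store_urns and scheme in OBJECT_STORE_PLATFORMS and bucket:
        # Object store: map the scheme to the DataHub platform and keep the bucket.
        platform = OBJECT_STORE_PLATFORMS[scheme]
        name = f"{bucket}/{name.lstrip('/')}"
    else:
        # Unchanged behavior: scheme (with tweaks) as platform, OL name as-is.
        platform = OL_SCHEME_TWEAKS.get(scheme, scheme)
    return make_dataset_urn(platform, name, env)


print(translate_ol_to_urn("gs://my-bucket", "my-key/2026/06/11"))
# urn:li:dataset:(urn:li:dataPlatform:gs,my-key/2026/06/11,DEV)
print(translate_ol_to_urn("gs://my-bucket", "my-key/2026/06/11", normalize_object_store_urns=True))
# urn:li:dataset:(urn:li:dataPlatform:gcs,my-bucket/my-key/2026/06/11,DEV)
```

With the flag off, output is identical to today's, which is what makes the change safe to ship as opt-in.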

Option B: Extend OL_SCHEME_TWEAKS only

Add "gs": "gcs". Fixes the platform but not the missing bucket, so not enough on its own.
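A quick sketch of why Option B alone falls short, assuming the translation logic described in the root-cause section with only a "gs" entry added to the tweaks (the function name translate_option_b is illustrative, not from the plugin):

```python
# Option B: the existing tweaks plus a "gs" -> "gcs" entry.
OL_SCHEME_TWEAKS = {"sqlserver": "mssql", "awsathena": "athena", "gs": "gcs"}


def translate_option_b(namespace: str, name: str, env: str = "DEV") -> str:
    scheme, *_ = namespace.split("://", maxsplit=1)
    platform = OL_SCHEME_TWEAKS.get(scheme, scheme)
    # The namespace authority (the bucket) is still ignored.
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


print(translate_option_b("gs://my-bucket", "my-key/2026/06/11"))
# urn:li:dataset:(urn:li:dataPlatform:gcs,my-key/2026/06/11,DEV)
```

The platform is now correct, but the name still cannot match a gcs dataset ingested as my-bucket/my-key.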

Current workaround

Subclass the operator and return empty OpenLineage facets to suppress extractor output:

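from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

class GCSToBigQueryOperatorNoAutoLineage(GCSToBigQueryOperator):
    def get_openlineage_facets_on_complete(self, task_instance):
        # Return empty lineage so no (mis-translated) input/output URNs are emitted
        from airflow.providers.openlineage.extractors import OperatorLineage
        return OperatorLineage(inputs=[], outputs=[])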

This loses automatic lineage extraction for the operator entirely, so it is not a good long-term fix.

Additional context

  • No existing issue found for this in datahub-project/datahub
  • I'd be happy to create a PR when we align on a recommended approach

Labels: bug
