Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

### Breaking Changes

- #16396: Oracle connector: When connecting via `service_name` to a multitenant Oracle database, the database name used in URNs will now reflect the Pluggable Database (PDB) name instead of the Container Database (CDB) name. In Oracle Multitenant architecture, a CDB is the top-level container (e.g. `cdb`) and a PDB is an individual tenant database within it (e.g. `mypdb`); `service_name` typically routes to the PDB, so the PDB name is the correct identifier for your datasets. This affects both dataset URNs (when `add_database_name_to_urn: true`) and database/schema container URNs (always, since containers always include the database name). If your existing metadata was ingested with the old CDB-based URNs, re-ingesting will create new entities under the corrected URNs. To preserve the old URN shape and avoid re-creating entities, set `urn_db_name` explicitly in your recipe to match your previous CDB name.
- **Retention service disabled: only current version retained.** When the retention service is not enabled (not configured or unavailable), the write path now retains only the current version (version 0) and does not create version-history rows. Previously, version history was still written when retention was disabled. **Impact:** Deployments that run without retention enabled will no longer accumulate aspect version history; only the latest aspect value is stored. **Migration:** Enable and configure the retention service (e.g. ingest retention policies from `boot/retention.yaml`) if you need version history for any entity/aspect.
- #16134: Java 17 Runtime Required - DataHub now compiles to Java 17 bytecode (previously Java 11). All Docker images already ship with Java 17 runtime. Self-hosted users must ensure their runtime environment uses Java 17+. The Maven artifact name remains `datahub-client-java8` for backward compatibility, but now requires Java 17+ at runtime. This change also includes:
- Spark integration upgraded from 3.0.3 to 3.3.4 (minimum version required for Java 17 support)
Expand Down
211 changes: 127 additions & 84 deletions metadata-ingestion/src/datahub/ingestion/source/sql/oracle.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
from datahub.ingestion.source.sql.sql_config import (
BasicSQLAlchemyConfig,
)
from datahub.ingestion.source.sql.sql_report import SQLSourceReport
from datahub.ingestion.source.sql.sql_utils import (
gen_database_key,
gen_schema_key,
Expand All @@ -73,8 +74,11 @@
get_procedure_flow_name,
)
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig

# Oracle uses SQL aggregator for usage and lineage like SQL Server
from datahub.ingestion.source_report.ingestion_stage import (
LINEAGE_EXTRACTION,
METADATA_EXTRACTION,
QUERIES_EXTRACTION,
)
from datahub.metadata.schema_classes import (
SubTypesClass,
ViewPropertiesClass,
Expand Down Expand Up @@ -307,8 +311,8 @@ class ProcedureDependencies(BaseModel):
COALESCE(t.NUM_ROWS * t.AVG_ROW_LEN, 0) / (1024 * 1024 * 1024) AS SIZE_GB
FROM {tables_table_name} t
WHERE t.OWNER = :owner
AND (t.NUM_ROWS < :table_row_limit OR t.NUM_ROWS IS NULL)
AND COALESCE(t.NUM_ROWS * t.AVG_ROW_LEN, 0) / (1024 * 1024 * 1024) < :table_size_limit
AND (:table_row_limit IS NULL OR t.NUM_ROWS IS NULL OR t.NUM_ROWS < :table_row_limit)
AND (:table_size_limit IS NULL OR COALESCE(t.NUM_ROWS * t.AVG_ROW_LEN, 0) / (1024 * 1024 * 1024) < :table_size_limit)
"""

VSQL_PREREQUISITES_QUERY = "SELECT COUNT(*) FROM V$SQL WHERE ROWNUM = 1"
Expand Down Expand Up @@ -349,8 +353,28 @@ class ProcedureDependencies(BaseModel):
"""
)


def normalize_db_name(name: str) -> str:
"""Replicate Oracle's normalize_name: ALL_UPPERCASE identifiers are lowercased.

Oracle stores unquoted identifiers in uppercase; SQLAlchemy's normalize_name
converts them to lowercase for consistency. We apply the same rule wherever
we use urn_db_name without access to the dialect (e.g. in OracleConfig methods).
"""
return name.lower() if name.isupper() else name


DB_NAME_QUERY = """

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Multitenant Oracle users with service_name will get new URNs (PDB name instead of CDB). Add entry to docs/how/updating-datahub.md with migration path and mention the urn_db_name workaround.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added an entry to docs/how/updating-datahub.md explaining the CDB/PDB distinction, why service_name connections now correctly use the PDB name, and how to use urn_db_name to pin the old value if needed to avoid re-creating existing entities.

SELECT sys_context('USERENV','DB_NAME') FROM dual
SELECT
CASE
WHEN sys_context('USERENV', 'CON_NAME') NOT IN (
'CDB$ROOT',
sys_context('USERENV', 'DB_NAME')
)
THEN sys_context('USERENV', 'CON_NAME')
ELSE sys_context('USERENV', 'DB_NAME')
END
FROM dual
"""


Expand Down Expand Up @@ -409,6 +433,19 @@ class OracleConfig(BasicSQLAlchemyConfig, BaseUsageConfig):
default=None,
description="If using, omit `service_name`.",
)
urn_db_name: Optional[str] = Field(

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clarify when urn_db_name should be used. Currently if both database and urn_db_name are set, entity and lineage URNs diverge.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the field description to make clear it only applies when service_name is used (i.e. database is not set), and added an explicit warning: "Do not set this alongside database; only one should be used."

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you use model validator?
If both database and urn_db_name are set, this PR takes urn_db_name and database will be silently ignored. Instead, it's good to raise error so that a customer recognizes which value would be used when both values are set.

Example:

@model_validator(mode="after")
def validate_connections(self) -> "SnowplowSourceConfig":
"""Validate that at least one connection type is configured."""
if self.bdp_connection is None and self.iglu_connection is None:
raise ValueError(
"Either bdp_connection or iglu_connection must be configured. "
"BDP connection is required for managed Snowplow deployments. "
"Iglu connection is required for open-source deployments."
)
# Iglu-only mode: automatic discovery via /api/schemas endpoint
if self.bdp_connection is None and self.iglu_connection is not None:
logging.getLogger(__name__).info(
"Iglu-only mode: will use automatic schema discovery via /api/schemas endpoint. "
"Requires Iglu Server 0.6+ with list schemas support."
)
return self

default=None,
description=(
"Override the database name used in URN construction. "
"Only relevant when add_database_name_to_urn is true and service_name is used "
"(i.e. database is not set). "
"The connector auto-detects the name by querying sys_context('USERENV','CON_NAME') "
"(the PDB name in multitenant setups) with a fallback to DB_NAME. "
"Set this explicitly if auto-detection returns the wrong value — for example "
"if your service_name does not route directly to the target PDB. "
"Do not set this alongside database; only one should be used."
),
)
add_database_name_to_urn: Optional[bool] = Field(
default=False,
description=(
Expand Down Expand Up @@ -464,6 +501,13 @@ class OracleConfig(BasicSQLAlchemyConfig, BaseUsageConfig):
description="Generate operation statistics from audit trail data (CREATE, INSERT, UPDATE, DELETE operations).",
)

lazy_schema_resolver: bool = Field(
default=False,
description="If enabled, skips the upfront bulk fetch of all known schemas from DataHub "
"when resolving lineage. Useful on large DataHub instances where the bulk fetch "
"causes memory or performance issues.",
)

# Query extraction configuration for usage statistics
include_query_usage: bool = Field(
default=False,
Expand Down Expand Up @@ -534,6 +578,15 @@ def check_data_dictionary_mode(cls, value):
raise ValueError("Specify one of data dictionary views mode: 'ALL', 'DBA'.")
return value

@model_validator(mode="after")
def check_database_and_urn_db_name_mutually_exclusive(self):
if self.database and self.urn_db_name:
raise ValueError(
"Only one of 'database' or 'urn_db_name' may be set. "
"'urn_db_name' is only for service_name connections where 'database' is not set."
)
return self

@model_validator(mode="after")
def check_thick_mode_lib_dir(self):
if (
Expand All @@ -560,20 +613,24 @@ def get_sql_alchemy_url(
def get_identifier(self, schema: str, table: str) -> str:
regular = f"{schema}.{table}"
if self.add_database_name_to_urn:
if self.database:
return f"{self.database}.{regular}"
return regular
else:
return regular
db = self.database
if not db and self.urn_db_name:
# get_db_name normalises via the dialect; replicate that here so
# entity URNs and lineage URNs share the same db casing.
db = normalize_db_name(self.urn_db_name)
if db:
return f"{db}.{regular}"
return regular


class OracleInspectorObjectWrapper:
"""
Inspector class wrapper, which queries DBA_TABLES instead of ALL_TABLES
"""

def __init__(self, inspector_instance: Inspector):
def __init__(self, inspector_instance: Inspector, report: SQLSourceReport):
self._inspector_instance = inspector_instance
self.report = report
self.log = logging.getLogger(__name__)
# tables that we don't want to ingest into the DataHub
self.exclude_tablespaces: Tuple[str, str] = ("SYSTEM", "SYSAUX")
Expand Down Expand Up @@ -973,7 +1030,7 @@ def get_pk_constraint(
title="Failed to Process Primary Keys",
message=(
f"Unable to process primary key constraints for {schema}.{table_name}. "
"Ensure SELECT access on DBA_CONSTRAINTS and DBA_CONS_COLUMNS.",
"Ensure SELECT access on DBA_CONSTRAINTS and DBA_CONS_COLUMNS."
),
context=f"{schema}.{table_name}",
exc=e,
Expand Down Expand Up @@ -1247,23 +1304,24 @@ def __init__(self, config, ctx):
# linux requires configurating the library path with ldconfig or LD_LIBRARY_PATH
oracledb.init_oracle_client()

# Override SQL aggregator to enable usage and operations like BigQuery/Snowflake/Teradata
if self.config.include_usage_stats or self.config.include_operational_stats:
self.aggregator = SqlParsingAggregator(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
graph=self.ctx.graph,
generate_lineage=self.include_lineage,
generate_usage_statistics=self.config.include_usage_stats,
generate_operations=self.config.include_operational_stats,
usage_config=self.config if self.config.include_usage_stats else None,
eager_graph_load=False,
)
self.report.sql_aggregator = self.aggregator.report

# Oracle inherits standard workunit generation from SQLAlchemySource
# Usage and lineage are handled automatically by the SQL aggregator
# Pre-fetch schemas from DataHub when not ingesting all tables/views so that
# V$SQL queries and view definitions can resolve lineage against tables outside
# the current run. lazy_schema_resolver lets large instances opt out.
self.aggregator = SqlParsingAggregator(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
graph=self.ctx.graph,
generate_lineage=self.include_lineage,
generate_usage_statistics=self.config.include_usage_stats,
generate_operations=self.config.include_operational_stats,
usage_config=self.config if self.config.include_usage_stats else None,
eager_graph_load=(
not (self.config.include_tables and self.config.include_views)
and not self.config.lazy_schema_resolver
),
)
self.report.sql_aggregator = self.aggregator.report

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -1285,16 +1343,15 @@ def test_connection(cls, config_dict: dict) -> TestConnectionReport:

def get_db_name(self, inspector: Inspector) -> str:
"""
This overwrites the default implementation, which only tries to read
database name from Connection URL, which does not work when using
service instead of database.
In that case, it tries to retrieve the database name by sending a query to the DB.

Note: This is used as a fallback if database is not specified in the config.
Returns a normalized (lowercased) database name for consistency with schema/table names.
Overrides the default implementation to support service_name connections,
where the database name is not in the connection URL and must be queried
from Oracle directly.
"""

# call default implementation first
if self.config.urn_db_name:
normalized = inspector.dialect.normalize_name(self.config.urn_db_name)
return normalized or self.config.urn_db_name

db_name = super().get_db_name(inspector)

if db_name == "":
Expand Down Expand Up @@ -1338,48 +1395,43 @@ def get_inspectors(self) -> Iterable[Inspector]:
# SQLAlchemy inspector uses ALL_* tables; OracleInspectorObjectWrapper uses DBA_* tables
if self.config.data_dictionary_mode != DataDictionaryMode.ALL:
# OracleInspectorObjectWrapper uses __getattr__ to proxy to Inspector
yield cast(Inspector, OracleInspectorObjectWrapper(inspector))
yield cast(
Inspector, OracleInspectorObjectWrapper(inspector, self.report)
)
else:
yield inspector

def get_db_schema(self, dataset_identifier: str) -> Tuple[Optional[str], str]:
"""
Override the get_db_schema method to ensure proper schema name extraction.
This method is used during view lineage extraction to determine the default schema
for unqualified table names in view definitions.
"""
try:
# Try to get the schema from the dataset identifier
# dataset_identifier is either "db.schema.table" or "schema.table"
# depending on add_database_name_to_urn. parts[-3], parts[-2], parts[-1]
# are db, schema, table respectively.
parts = dataset_identifier.split(".")

# Handle the identifier format differently based on add_database_name_to_urn flag
if self.config.add_database_name_to_urn:
if len(parts) >= 3:
# Format is: database.schema.view when add_database_name_to_urn=True
db_name = parts[-3]
schema_name = parts[-2]
return db_name, schema_name
return parts[-3], parts[-2] # db, schema
elif len(parts) >= 2:
# Handle the case where database might be missing even with flag enabled
# If we have a database in the config, use that
db_name = str(self.config.database)
schema_name = parts[-2]
return db_name, schema_name
# Identifier is missing the db component — fall back to config.
# Using str(None) here would produce "None.schema.table" URNs.
# Normalise urn_db_name the same way as get_identifier so that
# view-lineage default_db matches entity URN db components.
urn_db = (
normalize_db_name(self.config.urn_db_name)
if self.config.urn_db_name
else None
)
db_name = self.config.database or urn_db or None
return db_name, parts[-2] # db (or None), schema
else:
# Format is: schema.view when add_database_name_to_urn=False
if len(parts) >= 2:
# When add_database_name_to_urn is False, don't include database in the result
db_name = None
schema_name = parts[-2]
return db_name, schema_name
return None, parts[-2] # schema only; no db in URNs
except Exception as e:
logger.warning(
f"Error extracting schema from identifier {dataset_identifier}: {e}"
)

# Fall back to parent implementation if our approach fails
db_name, schema_name = super().get_db_schema(dataset_identifier)
return db_name, schema_name
# Reached only on a malformed identifier (e.g. no dots) or an exception above.
return super().get_db_schema(dataset_identifier)

@property
def include_lineage(self) -> bool:
Expand Down Expand Up @@ -1513,9 +1565,6 @@ def _get_procedure_default_db(self) -> Optional[str]:
def get_procedures_for_schema(
self, inspector: Inspector, schema: str, db_name: str
) -> List[BaseProcedure]:
"""
Get stored procedures, functions, and packages for a specific schema.
"""
base_procedures = []
tables_prefix = self.config.data_dictionary_mode.value

Expand Down Expand Up @@ -2069,7 +2118,7 @@ def _populate_aggregator_from_queries(self) -> None:

logger.info(f"V$SQL prerequisites check: {check_result.message}")

with self.report.new_stage("Query usage extraction from V$SQL"):
with self.report.new_stage(QUERIES_EXTRACTION):
logger.info(
f"Starting query extraction from V$SQL (max_queries={self.config.max_queries_to_extract})"
)
Expand All @@ -2093,31 +2142,25 @@ def _populate_aggregator_from_queries(self) -> None:
engine.dispose()

def _generate_aggregator_workunits(self) -> Iterable[MetadataWorkUnit]:
"""Override to prevent parent class from generating aggregator work units during schema extraction.

We handle aggregator generation manually after populating it with V$SQL query data.
"""
# Do nothing - we'll call the parent implementation manually after populating the aggregator
# Deferred: called explicitly after V$SQL population in get_workunits_internal.
return iter([])

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
"""Override to add query extraction for usage statistics."""
logger.info("Starting Oracle metadata extraction")

# Step 1: Schema extraction first (parent class will skip aggregator generation due to our override)
with self.report.new_stage("Schema metadata extraction"):
# Schema first: registers table/view schemas into the resolver so that
# V$SQL queries and view definitions can resolve column-level lineage.
with self.report.new_stage(METADATA_EXTRACTION):
yield from super().get_workunits_internal()
logger.info("Completed schema metadata extraction")
logger.info("Schema metadata extraction complete")

# Step 2: Query extraction after schema extraction
# This allows lineage processing to have access to all discovered schema information
# V$SQL second: observed queries are added to the aggregator after the
# schema resolver is populated. _generate_aggregator_workunits is a no-op
# in the parent call above (overridden below) so lineage is not emitted yet.
# _populate_aggregator_from_queries opens its own QUERIES_EXTRACTION stage.
self._populate_aggregator_from_queries()

# Step 3: Generate aggregator workunits after populating with V$SQL queries
with self.report.new_stage("Lineage and usage processing"):
# Call parent implementation directly to generate aggregator work units
with self.report.new_stage(LINEAGE_EXTRACTION):
yield from super()._generate_aggregator_workunits()
logger.info("Completed lineage and usage processing")
logger.info("Lineage and usage processing complete")

def get_workunits(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_sample_data_for_table(
self.connection, compile_kwargs={"literal_binds": True}
)
)
query += "\nAND ROWNUM <= %d" % sample_size
query += "\nWHERE ROWNUM <= %d" % sample_size
else:
query = sa.select([sa.text("*")]).select_from(table).limit(sample_size)
query_results = self.connection.execute(query)
Expand Down
Loading
Loading