Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ Requirements:

### Other Notable Changes

- **(Ingestion / datahub-gc)** The soft-deleted-entity cleanup now hard-deletes eligible entities in **bulk batches** instead of one request per entity. Eligible URNs are buffered and removed via a single `DELETE /openapi/entities/v1/?urns=...&soft=false` round-trip per `delete_batch_size` (default `1000`), which sharply reduces the request volume and CPU pressure of large GC runs. As part of this change, the per-entity `delete_references_to_urn` call is **no longer issued** during GC hard-deletion — dangling references from other entities to a deleted URN are not actively scrubbed by the GC job. The per-entity retention check (`status.removed` + age vs `retention_days`) is unchanged. **Action:** tune `delete_batch_size` if your GMS rejects very large URN lists; if you relied on GC to clean up inbound references, run a separate reference-cleanup pass.

- #17376: **(Ingestion / Hex)** Major in-place upgrade of the `hex` connector: upstream lineage (table-level and column-level), Project → Component links, run history (`lastRefreshed`), and optional AI context documents are now extracted directly from Hex REST APIs — no external CLI, warehouse-side ingestion dependency, or query-tag scraping required. See the [Hex connector docs](https://docs.datahub.com/docs/generated/ingestion/sources/hex) for the new `include_lineage`, `use_queried_tables_lineage`, `connection_platform_map`, and `include_context_documents` options.

- **(Ingestion / dbt)** dbt test assertion entities now emit an `ownership` aspect when the dbt test node has explicit owner metadata (`meta.owner` / `config.meta.owner`).
Expand Down
18 changes: 18 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,24 @@ def hard_delete_entity(
timeseries_rows_affected: int = summary.get("timeseriesRows", 0)
return rows_affected, timeseries_rows_affected

def hard_delete_entities(self, urns: Sequence[str]) -> None:
"""Hard delete a batch of entities in a single request.

Uses the OpenAPI v1 batch endpoint
(``DELETE /openapi/entities/v1/?urns=...&soft=false``) so a chunk of URNs
is removed in one round-trip instead of one request per entity. Intended
for bulk maintenance (e.g. GC of soft-deleted entities); chunk the URN
list on the caller side to keep each request bounded.
"""
if not urns:
return
params: List[Tuple[str, str]] = [("urns", urn) for urn in urns]
params.append(("soft", "false"))
response = self._session.delete(
f"{self._gms_server}/openapi/entities/v1/", params=params
)
response.raise_for_status()

def delete_entity(self, urn: str, hard: bool = False) -> None:
"""Delete an entity by urn.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
description="The number of entities to get in a batch from GraphQL",
)

delete_batch_size: int = Field(
1000,
description=(
"The number of entities to hard delete in a single batch request. "
"Eligible entities are buffered and removed via one bulk delete "
"round-trip per batch instead of one request per entity."
),
)

delay: Optional[float] = Field(
0.25,
description="Delay between each batch",
Expand Down Expand Up @@ -150,6 +159,10 @@ def __init__(
self.start_time = time.time()
self._report_lock: Lock = Lock()
self.last_print_time = 0.0
# Eligible urns are buffered and removed in bulk batches. The buffer is
# mutated from worker threads, so all access is guarded by this lock.
self._delete_buffer: List[Urn] = []
self._delete_buffer_lock: Lock = Lock()

def _increment_retained_count(self) -> None:
"""Thread-safe method to update report fields"""
Expand Down Expand Up @@ -193,12 +206,31 @@ def delete_entity(self, urn: Urn) -> None:
if self._deletion_limit_reached() or self._times_up():
return
self._increment_removal_started_count()
self.ctx.graph.delete_entity(urn=urn.urn(), hard=True)
self.ctx.graph.delete_references_to_urn(
urn=urn.urn(),
dry_run=False,
)
self._update_report(urn.urn(), urn.entity_type)
# Buffer the urn and only issue a network call once a full batch has
# accumulated. Swap the buffer out under the lock and flush the local
# copy afterwards so the bulk delete round-trip never holds the lock.
batch_to_flush: Optional[List[Urn]] = None
with self._delete_buffer_lock:
self._delete_buffer.append(urn)
if len(self._delete_buffer) >= self.config.delete_batch_size:
batch_to_flush = self._delete_buffer
self._delete_buffer = []
if batch_to_flush:
self._hard_delete_batch(batch_to_flush)

def _hard_delete_batch(self, batch: List[Urn]) -> None:
assert self.ctx.graph
if not batch:
return
self.ctx.graph.hard_delete_entities([urn.urn() for urn in batch])
for urn in batch:
self._update_report(urn.urn(), urn.entity_type)

def _flush_pending_deletes(self) -> None:
with self._delete_buffer_lock:
batch_to_flush = self._delete_buffer
self._delete_buffer = []
self._hard_delete_batch(batch_to_flush)

def delete_soft_deleted_entity(self, urn: Urn) -> None:
assert self.ctx.graph
Expand Down Expand Up @@ -348,3 +380,6 @@ def cleanup_soft_deleted_entities(self) -> None:
while len(futures) > 0:
self._print_report()
futures = self._process_futures(futures)

# Flush any urns left in the buffer below the batch threshold.
self._flush_pending_deletes()
71 changes: 56 additions & 15 deletions metadata-ingestion/tests/unit/test_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,23 +252,61 @@ def test_delete_entity_dry_run(self):
self.assertEqual(self.report.num_hard_deleted, 0)

def test_delete_entity(self):
"""Test that delete_entity properly deletes and updates reports."""
# Call the method
"""delete_entity buffers the urn; flushing hard-deletes it in a batch
without per-entity delete_entity / delete_references round-trips."""
# Call the method: buffers, does not delete yet
self.cleanup.delete_entity(self.sample_urn)
self.mock_graph.hard_delete_entities.assert_not_called()

# Verify deletion happened
self.mock_graph.delete_entity.assert_called_once_with(
urn=self.sample_urn.urn(), hard=True
)
self.mock_graph.delete_references_to_urn.assert_called_once_with(
urn=self.sample_urn.urn(),
dry_run=False,
# Flushing the buffer issues the batch hard delete
self.cleanup._flush_pending_deletes()
self.mock_graph.hard_delete_entities.assert_called_once_with(
[self.sample_urn.urn()]
)

# Report update
self.assertEqual(self.report.num_hard_deleted, 1)
self.assertEqual(self.report.num_hard_deleted_by_type.get("dataset"), 1)
self.assertEqual(self.report.num_soft_deleted_entity_removal_started, 1)
# Legacy per-entity paths are not used
self.mock_graph.delete_entity.assert_not_called()
self.mock_graph.delete_references_to_urn.assert_not_called()

def test_cleanup_batch_deletes_eligible_and_skips_references(self):
"""Eligible (soft-deleted past retention) urns are hard-deleted in a batch
via hard_delete_entities; recent ones are retained; per-entity
delete_entity / delete_references are not used."""
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
old_ms = now_ms - 20 * 24 * 60 * 60 * 1000 # 20d > retention_days=10
recent_ms = now_ms - 1 * 24 * 60 * 60 * 1000 # 1d < retention

urns = [
"urn:li:dataset:(urn:li:dataPlatform:example,old_one,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:example,old_two,PROD)",
"urn:li:dataset:(urn:li:dataPlatform:example,recent_one,PROD)",
]
self.mock_graph.get_urns_by_filter.return_value = urns

def raw(entity_urn, aspects):
t = recent_ms if "recent" in entity_urn else old_ms
return {
"aspects": {
"status": {"value": {"removed": True}, "created": {"time": t}}
}
}

self.mock_graph.get_entity_raw.side_effect = raw

self.cleanup.cleanup_soft_deleted_entities()

deleted: set = set()
for c in self.mock_graph.hard_delete_entities.call_args_list:
deleted.update(c.args[0])
assert deleted == {urns[0], urns[1]} # recent_one retained
self.mock_graph.delete_references_to_urn.assert_not_called()
self.mock_graph.delete_entity.assert_not_called()

# Report update: both old urns hard-deleted, recent one retained
self.assertEqual(self.report.num_hard_deleted, 2)
self.assertEqual(self.report.num_hard_deleted_by_type.get("dataset"), 2)
self.assertEqual(self.report.num_soft_deleted_entity_removal_started, 2)
self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 1)

def test_delete_entity_respects_deletion_limit(self):
"""Test that delete_entity respects the deletion limit."""
Expand Down Expand Up @@ -321,8 +359,11 @@ def test_delete_soft_deleted_entity_old_enough(self):
# Call the method
self.cleanup.delete_soft_deleted_entity(self.sample_urn)

# Verify deletion was attempted
self.mock_graph.delete_entity.assert_called_once()
# Verify deletion was attempted: urn buffered for batch hard delete
self.cleanup._flush_pending_deletes()
self.mock_graph.hard_delete_entities.assert_called_once_with(
[self.sample_urn.urn()]
)
self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 0)
self.assertIsNone(
self.report.num_soft_deleted_retained_due_to_age_by_type.get("dataset")
Expand Down
Loading