From 1337adb7a19f2ce199ab5f485345eb59908bf214 Mon Sep 17 00:00:00 2001 From: treff7es Date: Tue, 9 Jun 2026 10:43:10 +0200 Subject: [PATCH] perf(ingest/gc): batch hard-delete soft-deleted entities The soft-deleted-entity cleanup in datahub-gc previously issued three GMS round-trips per entity (status fetch, hard delete, reference cleanup), making large GC runs slow and CPU-intensive. Buffer eligible URNs and remove them in bulk via a single DELETE /openapi/entities/v1/?urns=...&soft=false request per delete_batch_size (default 1000), exposed as a new DataHubGraph.hard_delete_entities() helper. The per-entity retention check is unchanged; the per-entity delete_references_to_urn call is dropped. The buffer is swapped out under a lock and flushed outside it so the bulk request never blocks worker threads. --- docs/how/updating-datahub.md | 2 + .../src/datahub/ingestion/graph/client.py | 18 +++++ .../source/gc/soft_deleted_entity_cleanup.py | 47 ++++++++++-- metadata-ingestion/tests/unit/test_gc.py | 71 +++++++++++++++---- 4 files changed, 117 insertions(+), 21 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 7fff547cd99f..c4a1273ccf2d 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -68,6 +68,8 @@ Requirements: ### Other Notable Changes +- **(Ingestion / datahub-gc)** The soft-deleted-entity cleanup now hard-deletes eligible entities in **bulk batches** instead of one request per entity. Eligible URNs are buffered and removed via a single `DELETE /openapi/entities/v1/?urns=...&soft=false` round-trip per `delete_batch_size` (default `1000`), which sharply reduces the request volume and CPU pressure of large GC runs. As part of this change, the per-entity `delete_references_to_urn` call is **no longer issued** during GC hard-deletion — dangling references from other entities to a deleted URN are not actively scrubbed by the GC job. The per-entity retention check (`status.removed` + age vs `retention_days`) is unchanged. **Action:** tune `delete_batch_size` if your GMS rejects very large URN lists; if you relied on GC to clean up inbound references, run a separate reference-cleanup pass. + - #17376: **(Ingestion / Hex)** Major in-place upgrade of the `hex` connector: upstream lineage (table-level and column-level), Project → Component links, run history (`lastRefreshed`), and optional AI context documents are now extracted directly from Hex REST APIs — no external CLI, warehouse-side ingestion dependency, or query-tag scraping required. See the [Hex connector docs](https://docs.datahub.com/docs/generated/ingestion/sources/hex) for the new `include_lineage`, `use_queried_tables_lineage`, `connection_platform_map`, and `include_context_documents` options. - **(Ingestion / dbt)** dbt test assertion entities now emit an `ownership` aspect when the dbt test node has explicit owner metadata (`meta.owner` / `config.meta.owner`). diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py index b4c47996d10f..688f603bb583 100644 --- a/metadata-ingestion/src/datahub/ingestion/graph/client.py +++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py @@ -1411,6 +1411,24 @@ def hard_delete_entity( timeseries_rows_affected: int = summary.get("timeseriesRows", 0) return rows_affected, timeseries_rows_affected + def hard_delete_entities(self, urns: Sequence[str]) -> None: + """Hard delete a batch of entities in a single request. + + Uses the OpenAPI v1 batch endpoint + (``DELETE /openapi/entities/v1/?urns=...&soft=false``) so a chunk of URNs + is removed in one round-trip instead of one request per entity. Intended + for bulk maintenance (e.g. GC of soft-deleted entities); chunk the URN + list on the caller side to keep each request bounded. + """ + if not urns: + return + params: List[Tuple[str, str]] = [("urns", urn) for urn in urns] + params.append(("soft", "false")) + response = self._session.delete( + f"{self._gms_server}/openapi/entities/v1/", params=params + ) + response.raise_for_status() + def delete_entity(self, urn: str, hard: bool = False) -> None: """Delete an entity by urn. diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py index 12f12d74adf8..39b3463f8ad4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py @@ -36,6 +36,15 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel): description="The number of entities to get in a batch from GraphQL", ) + delete_batch_size: int = Field( + 1000, + description=( + "The number of entities to hard delete in a single batch request. " + "Eligible entities are buffered and removed via one bulk delete " + "round-trip per batch instead of one request per entity." + ), + ) + delay: Optional[float] = Field( 0.25, description="Delay between each batch", @@ -150,6 +159,10 @@ def __init__( self.start_time = time.time() self._report_lock: Lock = Lock() self.last_print_time = 0.0 + # Eligible urns are buffered and removed in bulk batches. The buffer is + # mutated from worker threads, so all access is guarded by this lock. + self._delete_buffer: List[Urn] = [] + self._delete_buffer_lock: Lock = Lock() def _increment_retained_count(self) -> None: """Thread-safe method to update report fields""" @@ -193,12 +206,31 @@ def delete_entity(self, urn: Urn) -> None: if self._deletion_limit_reached() or self._times_up(): return self._increment_removal_started_count() - self.ctx.graph.delete_entity(urn=urn.urn(), hard=True) - self.ctx.graph.delete_references_to_urn( - urn=urn.urn(), - dry_run=False, - ) - self._update_report(urn.urn(), urn.entity_type) + # Buffer the urn and only issue a network call once a full batch has + # accumulated. Swap the buffer out under the lock and flush the local + # copy afterwards so the bulk delete round-trip never holds the lock. + batch_to_flush: Optional[List[Urn]] = None + with self._delete_buffer_lock: + self._delete_buffer.append(urn) + if len(self._delete_buffer) >= self.config.delete_batch_size: + batch_to_flush = self._delete_buffer + self._delete_buffer = [] + if batch_to_flush: + self._hard_delete_batch(batch_to_flush) + + def _hard_delete_batch(self, batch: List[Urn]) -> None: + assert self.ctx.graph + if not batch: + return + self.ctx.graph.hard_delete_entities([urn.urn() for urn in batch]) + for urn in batch: + self._update_report(urn.urn(), urn.entity_type) + + def _flush_pending_deletes(self) -> None: + with self._delete_buffer_lock: + batch_to_flush = self._delete_buffer + self._delete_buffer = [] + self._hard_delete_batch(batch_to_flush) def delete_soft_deleted_entity(self, urn: Urn) -> None: assert self.ctx.graph @@ -348,3 +380,6 @@ def cleanup_soft_deleted_entities(self) -> None: while len(futures) > 0: self._print_report() futures = self._process_futures(futures) + + # Flush any urns left in the buffer below the batch threshold. + self._flush_pending_deletes() diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py index 770fd8f422c4..504a163377c3 100644 --- a/metadata-ingestion/tests/unit/test_gc.py +++ b/metadata-ingestion/tests/unit/test_gc.py @@ -252,23 +252,61 @@ def test_delete_entity_dry_run(self): self.assertEqual(self.report.num_hard_deleted, 0) def test_delete_entity(self): - """Test that delete_entity properly deletes and updates reports.""" - # Call the method + """delete_entity buffers the urn; flushing hard-deletes it in a batch + without per-entity delete_entity / delete_references round-trips.""" + # Call the method: buffers, does not delete yet self.cleanup.delete_entity(self.sample_urn) + self.mock_graph.hard_delete_entities.assert_not_called() - # Verify deletion happened - self.mock_graph.delete_entity.assert_called_once_with( - urn=self.sample_urn.urn(), hard=True - ) - self.mock_graph.delete_references_to_urn.assert_called_once_with( - urn=self.sample_urn.urn(), - dry_run=False, + # Flushing the buffer issues the batch hard delete + self.cleanup._flush_pending_deletes() + self.mock_graph.hard_delete_entities.assert_called_once_with( + [self.sample_urn.urn()] ) - # Report update - self.assertEqual(self.report.num_hard_deleted, 1) - self.assertEqual(self.report.num_hard_deleted_by_type.get("dataset"), 1) - self.assertEqual(self.report.num_soft_deleted_entity_removal_started, 1) + # Legacy per-entity paths are not used + self.mock_graph.delete_entity.assert_not_called() + self.mock_graph.delete_references_to_urn.assert_not_called() + + def test_cleanup_batch_deletes_eligible_and_skips_references(self): + """Eligible (soft-deleted past retention) urns are hard-deleted in a batch + via hard_delete_entities; recent ones are retained; per-entity + delete_entity / delete_references are not used.""" + now_ms = int(datetime.now(timezone.utc).timestamp() * 1000) + old_ms = now_ms - 20 * 24 * 60 * 60 * 1000 # 20d > retention_days=10 + recent_ms = now_ms - 1 * 24 * 60 * 60 * 1000 # 1d < retention + + urns = [ + "urn:li:dataset:(urn:li:dataPlatform:example,old_one,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:example,old_two,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:example,recent_one,PROD)", + ] + self.mock_graph.get_urns_by_filter.return_value = urns + + def raw(entity_urn, aspects): + t = recent_ms if "recent" in entity_urn else old_ms + return { + "aspects": { + "status": {"value": {"removed": True}, "created": {"time": t}} + } + } + + self.mock_graph.get_entity_raw.side_effect = raw + + self.cleanup.cleanup_soft_deleted_entities() + + deleted: set = set() + for c in self.mock_graph.hard_delete_entities.call_args_list: + deleted.update(c.args[0]) + assert deleted == {urns[0], urns[1]} # recent_one retained + self.mock_graph.delete_references_to_urn.assert_not_called() + self.mock_graph.delete_entity.assert_not_called() + + # Report update: both old urns hard-deleted, recent one retained + self.assertEqual(self.report.num_hard_deleted, 2) + self.assertEqual(self.report.num_hard_deleted_by_type.get("dataset"), 2) + self.assertEqual(self.report.num_soft_deleted_entity_removal_started, 2) + self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 1) def test_delete_entity_respects_deletion_limit(self): """Test that delete_entity respects the deletion limit.""" @@ -321,8 +359,11 @@ def test_delete_soft_deleted_entity_old_enough(self): # Call the method self.cleanup.delete_soft_deleted_entity(self.sample_urn) - # Verify deletion was attempted - self.mock_graph.delete_entity.assert_called_once() + # Verify deletion was attempted: urn buffered for batch hard delete + self.cleanup._flush_pending_deletes() + self.mock_graph.hard_delete_entities.assert_called_once_with( + [self.sample_urn.urn()] + ) self.assertEqual(self.report.num_soft_deleted_retained_due_to_age, 0) self.assertIsNone( self.report.num_soft_deleted_retained_due_to_age_by_type.get("dataset")