Skip to content

perf(ingestion/mode): Parallelize report processing across all spaces#16408

Merged
askumar27 merged 11 commits into
mode_perf_improvementsfrom
mode_cross_space_parallelization
Mar 12, 2026
Merged

perf(ingestion/mode): Parallelize report processing across all spaces#16408
askumar27 merged 11 commits into
mode_perf_improvementsfrom
mode_cross_space_parallelization

Conversation

@askumar27

@askumar27 askumar27 commented Mar 3, 2026

Copy link
Copy Markdown
Contributor

Summary

Major performance overhaul of the Mode ingestion connector: cross-space parallelized report processing, multi-process SQL parsing, rate limiting, and numerous bug fixes for definition expansion, lineage extraction, and memory usage.

Motivation

The Mode connector had several performance and correctness issues for large workspaces:

  1. Straggler problem: Spaces were processed sequentially — each space's reports were parallelized, but threads sat idle waiting for slow reports before moving to the next space. With 80 spaces and 2,756 reports, effective parallelism was ~3.5x out of 15 configured threads (~23% utilization).
  2. Unbounded memory: lru_cache(maxsize=None) on _get_request_json cached every API response forever, causing OOM on large workspaces.
  3. CPU-bound SQL parsing blocking I/O threads: sqlglot lineage computation held the GIL, preventing concurrent API calls.
  4. No rate limiting: Concurrent threads could trigger Mode's 429 rate limits with no backoff coordination.
  5. Broken cooperative timeout: The copy=False optimization (commit d5a1311a50) eliminated all deepcopy calls, but cooperate() — which enforces the 10-second SQL lineage timeout — only ran inside the __deepcopy__ wrapper. SQL lineage could run indefinitely.
  6. Definition expansion bugs: False circular detection between sibling definitions, broken no-alias and digit-named definitions, and trailing -- comments swallowing closing parens.

Changes Overview

Architecture: Cross-space parallelization

Three-phase processing model:

  1. Collection phase (main thread, fast): Iterate all spaces, emit space containers, collect all (space_token, report/dataset) tuples into flat lists.
  2. Report processing (global thread pool): All reports from all spaces processed in one ThreadedIteratorExecutor pool — no more per-space thread pools.
  3. Dataset processing (global thread pool): Same pattern for datasets.

This eliminates the straggler problem — threads are always busy processing the next available report regardless of which space it belongs to.

Multi-process SQL parsing

  • CPU-bound sqlglot lineage parsing offloaded to a ProcessPoolExecutor (spawn context) to bypass the GIL
  • Worker processes get their own SchemaResolver and DataHubGraph connections via _init_sql_parse_worker
  • Auto-sized to cpu_count - 1, capped at max_threads
  • Falls back to in-thread parsing if the process pool breaks or times out

Rate limiting

  • All Mode API calls go through a configurable RateLimiter (api_options.requests_per_minute, default 180)
  • Mode's limit is ~240 req/min; default of 180 leaves headroom
  • RateLimiter sleep moved outside the lock to reduce thread contention

SQL parsing performance (sqlglot_lineage.py)

  • Scope-based table resolution in _get_join_side_tables: Direct scope source walking instead of expensive to_node() column-lineage approach
  • Scope-aware join traversal: find_all_in_scope instead of find_all to avoid descending into subqueries
  • Eliminated redundant deepcopy: In-place select list swapping in _get_raw_col_upstreams_for_expression
  • copy=False in to_node(): Avoids full AST deep copy during column-level lineage
  • Restored cooperative timeout: Added explicit cooperate() calls at 4 hot-loop points (column loop, tree walk, to_node() call, scope traversal)

Bug fixes

  • Definition expansion: Fixed false circular detection (siblings no longer block each other), added support for nested/no-alias/digit-named definitions, added newline before closing paren to prevent trailing -- comments from swallowing it
  • Table-level lineage on CLL timeout: Now emits table-level lineage even when column-level lineage fails (previously returned nothing)
  • OOM from unbounded cache: Removed @lru_cache(maxsize=None) on _get_request_json; replaced with bounded caches and manual caching
  • Data source lookup: O(1) dict lookup by ID instead of O(n) scan, with improved error messages
  • Thread-safe report: Added threading.Lock to ModeSourceReport for counter increments and report_warning/report_failure
  • Safe HTTP error checks: _is_http_404 helper checks error.response for None before accessing .status_code
  • HTTPError subclass construction: HTTPError429/HTTPError504 now pass response= kwarg

Other improvements

  • Chart API call skipping: When explorations_count == 0, chart API calls are skipped entirely
  • exclude_personal_collections config (default True): Server-side filtering via Mode's ?filter=custom
  • items_per_page validation: Range expanded from 1-30 to 1-1000 (Mode API supports up to 1000)
  • Removed unused _get_last_query_run method
  • Unified processing: Merged emit_dashboard_mces/emit_chart_mces/emit_dataset_mces into _process_report and _process_dataset

ThreadedIteratorExecutor improvements

  • Proper exception propagation from worker threads (was silently swallowed before)
  • stop_event for cooperative shutdown
  • Drain thread to prevent deadlock when worker exceptions trigger cancellation

Dependencies

  • Added cachetools to Mode connector's dependency set

Impact Assessment

Metric Before After
Wall clock (projected) ~5h 46m ~1h 20m
Effective parallelism 3.5x ~14.5x
Thread utilization ~23% ~97%
SQL lineage timeout broken (dead code) restored
Memory unbounded (lru_cache on all API responses) bounded

Affected Components: Mode ingestion source, sqlglot_lineage.py, ThreadedIteratorExecutor, RateLimiter
Breaking Changes: None — max_threads=1 (default) preserves sequential behavior. exclude_personal_collections=True (new default) changes the API filter from ?filter=all to ?filter=custom; set to False to restore old behavior.
Risk Level: Medium — substantial refactoring of processing loop, but guarded by fallback paths (single-thread mode, in-process SQL parsing) and comprehensive test coverage.

Test plan

  • ./gradlew :metadata-ingestion:lintFix — ruff + mypy pass
  • SQL parsing tests: 334+ passed (tests/unit/sql_parsing/), including new CTE chain join tables test
  • Mode unit tests: 36+ passed (tests/unit/test_mode_source.py) — definition expansion, _is_http_404, thread-safe report, timestamp parsing
  • Mode threading tests: 880 lines (tests/integration/mode/test_mode_threading.py) — cross-space parallelization, single-threaded fallback, dataset error isolation, chart skip optimization
  • ThreadedIteratorExecutor tests: exception propagation and stop-event behavior
  • Updated golden file for new chart URN ordering and lineage changes
  • Manual validation with Mode ingestion on production-scale workspace

🤖 Generated with Claude Code

askumar27 and others added 4 commits March 2, 2026 17:18
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract inline dataset processing from _emit_workunits_for_space() into
its own method with the same error-isolation pattern as _process_report().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace sequential space processing with a global thread pool. Reports
from all spaces are now processed in a single ThreadedIteratorExecutor,
eliminating the straggler problem where threads sat idle waiting for
slow reports in one space before moving to the next.

Also removes _clear_sql_parsing_caches() since the underlying LRU
caches are already bounded at maxsize=1000 via functools.lru_cache.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…itecture

Update TestDatasetErrorIsolation to call _process_dataset() directly
instead of the removed _emit_workunits_for_space() method.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions github-actions Bot added the ingestion PR or Issue related to the ingestion of metadata label Mar 3, 2026
@github-actions

github-actions Bot commented Mar 3, 2026

Copy link
Copy Markdown
Contributor

Linear: ING-1803

@codecov

codecov Bot commented Mar 3, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 97.10145% with 2 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

Files with missing lines Patch % Lines
...ata-ingestion/src/datahub/ingestion/source/mode.py 97.22% 1 Missing ⚠️
...gestion/src/datahub/sql_parsing/sqlglot_lineage.py 96.96% 1 Missing ⚠️

📢 Thoughts on this report? Let us know!

@rajatoss

rajatoss commented Mar 3, 2026

Copy link
Copy Markdown
Member

Connector Tests Results

Connector tests failed for commit 1b32a7b

View full test logs →

Autogenerated by the connector-tests CI pipeline.

@maggiehays maggiehays added the pending-submitter-response Issue/request has been reviewed but requires a response from the submitter label Mar 3, 2026
…ecutor

Offload CPU-bound sqlglot parsing to a process pool while keeping I/O
(API calls) in threads. Each worker process creates its own DataHubGraph
connection and SchemaResolver cache. Gracefully degrades to in-thread
parsing if pool creation fails or a worker crashes (BrokenProcessPool).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@maggiehays maggiehays added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Mar 3, 2026
- Import BrokenProcessPool from concurrent.futures.process
- Declare _sql_parse_pool attribute type in __init__ to avoid no-redef

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…copy=False

The copy=False optimization (commit d5a1311) eliminated deepcopy calls,
but the only cooperate() call site was inside the __deepcopy__ wrapper.
This made the 10-second SQL lineage timeout completely dead.

Add explicit cooperate() calls at key iteration points in sqlglot_lineage.py
and add a timeout to future.result() in mode.py for the ProcessPoolExecutor
path where contextvars-based cooperative timeout doesn't cross process boundaries.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
_list_joins used Expression.find_all(Join) which descends into CTE
definitions, causing joins to be re-processed in the outer scope after
already being handled in their own CTE scopes. This triggered redundant
to_node() calls with expensive AST deep-copy/serialize/re-parse cycles.

Replace with find_all_in_scope() which stops at scope boundaries.
For a query with 4 CTEs and 11+ JOINs this eliminates 79% of to_node()
calls (7,206 -> 1,538) and gives a 5x speedup in _list_joins.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…_tables

Replace expensive to_node() calls in _get_join_side_tables with direct
scope.sources resolution. This avoids the costly deepcopy -> sql() ->
parse_one() cycle for each join side, while keeping the precise to_node()
approach for ON clause column resolution via intersection filtering.

Key changes:
- New _collect_tables_from_scope() walks scope.sources recursively through
  SUBQUERY/DERIVED_TABLE scopes to find physical Table nodes
- Smart CTE handling: only follows CTE sources actually referenced in
  FROM/JOIN clauses (avoids sibling CTE over-inclusion from sqlglot)
- Hoists FROM clause resolution outside the per-join loop
- Skips UDTF scopes (LATERAL/UNNEST) which include correlated references

Benchmark on production 27-min query: 44.4s -> 7.5s (5.9x speedup).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@github-actions

github-actions Bot commented Mar 5, 2026

Copy link
Copy Markdown
Contributor

Your PR has been assigned to tamas for review (ING-1803).

@maggiehays maggiehays added pending-submitter-merge and removed needs-review Label for PRs that need review from a maintainer. labels Mar 6, 2026
…oolExecutor

Each worker process re-initialized its own DataHubGraph connection and
SchemaResolver, wasting memory without proportional benefit. Revert to
inline SQL parsing using the main process's shared SchemaResolver, which
benefits from LRU cache hits across queries.

Thread-based cross-space parallelization (ThreadPoolExecutor for I/O) and
cooperative timeout fixes in sqlglot_lineage.py are preserved.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@askumar27 askumar27 merged this pull request into mode_perf_improvements Mar 12, 2026
39 of 43 checks passed
@askumar27 askumar27 deleted the mode_cross_space_parallelization branch March 12, 2026 01:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ingestion PR or Issue related to the ingestion of metadata pending-submitter-merge

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants