refactor(ingestion): workunit processors #17852

Open
sgomezvillamor wants to merge 35 commits into master from claude/optimistic-albattani-k19c77
@sgomezvillamor sgomezvillamor commented Jun 10, 2026

Contributor

Summary

This PR standardizes workunit processing across all DataHub ingestion sources by introducing a common processor interface with automatic discovery and pluggable observability.

Main Contributions

1. Standardized Processor Interface

All processors now inherit from a common WorkunitProcessor base class with:

  • Consistent lifecycle (create(), should_enable(), process())
  • Dependency injection via WorkunitProcessorContext
  • Automatic report registration

2. Pluggable Sub-Reports for Observability

Each processor can define its own typed report class for metrics:

@dataclass
class ValidateInputFieldsProcessorReport(WorkunitProcessorReport):
    num_input_fields_filtered: int = 0
    num_workunits_with_invalid_fields: int = 0

class ValidateInputFieldsProcessor(WorkunitProcessor[ValidateInputFieldsProcessorReport]):
    def process(self, stream):
        self.report.num_input_fields_filtered += 1  # Properly typed!
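
To illustrate how a typed report can be derived from the generic parameter, here is a minimal, self-contained sketch. It does not use the PR's code: `Processor`, `ProcessorReport`, `FilterReport`, and `FilterEmptyProcessor` are hypothetical stand-ins, and the runtime lookup uses `typing.get_args` on `__orig_bases__` rather than the `get_class_from_annotation()` utility the PR relies on.

```python
from dataclasses import dataclass
from typing import Generic, Iterable, Iterator, Type, TypeVar, get_args


@dataclass
class ProcessorReport:
    """Hypothetical stand-in for WorkunitProcessorReport."""


ReportT = TypeVar("ReportT", bound=ProcessorReport)


class Processor(Generic[ReportT]):
    """Hypothetical stand-in for WorkunitProcessor."""

    report: ReportT

    @classmethod
    def report_class(cls) -> Type[ProcessorReport]:
        # Inspect the parameterized base, e.g. Processor[FilterReport].
        for base in getattr(cls, "__orig_bases__", ()):
            args = get_args(base)
            if args and isinstance(args[0], type) and issubclass(args[0], ProcessorReport):
                return args[0]
        # Fall back to the empty base report when no concrete type is declared.
        return ProcessorReport

    @classmethod
    def create(cls):
        instance = cls()
        instance.report = cls.report_class()()
        return instance

    def process(self, stream: Iterable[str]) -> Iterator[str]:
        yield from stream


@dataclass
class FilterReport(ProcessorReport):
    num_filtered: int = 0


class FilterEmptyProcessor(Processor[FilterReport]):
    def process(self, stream: Iterable[str]) -> Iterator[str]:
        for item in stream:
            if not item:
                # A type checker sees self.report as FilterReport here.
                self.report.num_filtered += 1
                continue
            yield item


processor = FilterEmptyProcessor.create()
kept = list(processor.process(["a", "", "b", ""]))
```

In this sketch the subclass declares its report type once, in `Processor[FilterReport]`, and `create()` instantiates it without a separate `report_class` attribute.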

3. Automatic Processor Discovery

Eliminated ~72 source overrides of get_workunit_processors(). Sources no longer need to manually instantiate processors - they're auto-discovered and enabled based on:

  • should_enable() conditions (e.g., config flags)
  • Explicit allow/exclude lists when needed

Before:

def get_workunit_processors(self):
    return [
        functools.partial(auto_status_aspect, self.ctx, self.report),
        functools.partial(auto_browse_path_v2, self.ctx, self.report),
        # ... manual wiring for each processor
    ]

After:

# Nothing needed - processors auto-discovered!
# Or opt-out when needed:
def get_excluded_workunit_processors(self):
    return [AutoIncrementalLineageProcessor]  # Type-safe

Observability

Startup logs show which processors are enabled:

[2026-06-11 17:42:50,191] INFO - Workunit processor 'AutoStatusAspectProcessor' enabled
[2026-06-11 17:42:50,191] INFO - Workunit processor 'ValidateInputFieldsProcessor' enabled
[2026-06-11 17:42:50,191] INFO - Workunit processor 'EnsureAspectSizeProcessor' enabled
[2026-06-11 17:42:50,191] INFO - Workunit processor 'AutoIncrementalLineageProcessor' disabled by should_enable()

Report structure with processor-specific metrics:

 'workunit_processor_reports': {'AutoStatusAspectProcessor': {'num_errors': 0, 'num_status_aspects_emitted': 48},
                                'AutoMaterializeReferencedTagsTermsProcessor': {'num_exceptions': 0},
                                'ValidateDuplicateSchemaFieldPathsProcessor': {'total_schema_aspects': 25,
                                                                               'schemas_with_duplicates': 0,
                                                                               'duplicated_field_paths': 0},
                                'ValidateEmptySchemaFieldPathsProcessor': {'total_schema_aspects': 25,
                                                                           'schemas_with_empty_fields': 0,
                                                                           'empty_field_paths': 0},
                                'AutoBrowsePathV2Processor': {'num_out_of_batch': 20,
                                                              'num_out_of_order': 6,
                                                              'num_browse_path_v2_emitted': 28,
                                                              'num_container_or_legacy_emitted': 0,
                                                              'num_fallback_emitted': 0},
                                'AutoWorkunitsReporterProcessor': {},
                                'AutoPatchLastModifiedProcessor': {'num_patches_emitted': 0},
                                'ValidateInputFieldsProcessor': {'num_input_fields_filtered': 0,
                                                                 'num_workunits_with_invalid_fields': 0,
                                                                 'num_workunits_skipped_entirely': 0},
                                'EnsureAspectSizeProcessor': {'num_truncations_by_aspect': {}},
                                'AutoStaleEntityRemovalProcessor': {}},

Architecture

New module structure:

  • datahub.ingestion.api.workunit_processor - Base class and context
  • datahub.ingestion.workunit_processors/ - All processor implementations:
    • auto_* - Enrichment processors (add metadata)
    • validate_* - Validation processors (remove invalid data)
    • ensure_* - Enforcement processors (enforce constraints)

Processor lifecycle:

  1. Auto-discover all processors from registry
  2. Filter by should_enable() conditions
  3. Apply source allow/exclude lists
  4. Instantiate via create() with dependency injection
  5. Register reports with source report
  6. Stream workunits through process() pipeline
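
The six lifecycle steps can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `Context`, `Processor`, `REGISTRY`, `register`, `build_processors`, and `run` are hypothetical names, workunits are plain strings, and reports are plain dicts.

```python
import logging
from typing import Dict, Iterable, Iterator, List, Type

logger = logging.getLogger("workunit_processors")


class Context:
    """Hypothetical stand-in for WorkunitProcessorContext."""

    def __init__(self, flags: Dict[str, bool]) -> None:
        self.flags = flags
        self.reports: Dict[str, Dict[str, int]] = {}


class Processor:
    """Hypothetical stand-in for the WorkunitProcessor base class."""

    def __init__(self, ctx: Context) -> None:
        self.ctx = ctx
        self.report: Dict[str, int] = {"num_seen": 0}

    @classmethod
    def should_enable(cls, ctx: Context) -> bool:
        return True

    @classmethod
    def create(cls, ctx: Context) -> "Processor":
        return cls(ctx)

    def process(self, stream: Iterable[str]) -> Iterator[str]:
        for workunit in stream:
            self.report["num_seen"] += 1
            yield workunit


REGISTRY: List[Type[Processor]] = []


def register(cls: Type[Processor]) -> Type[Processor]:
    REGISTRY.append(cls)
    return cls


@register
class AddStatus(Processor):
    def process(self, stream: Iterable[str]) -> Iterator[str]:
        for workunit in stream:
            self.report["num_seen"] += 1
            yield f"{workunit}:status"


@register
class IncrementalLineage(Processor):
    @classmethod
    def should_enable(cls, ctx: Context) -> bool:
        return ctx.flags.get("incremental_lineage", False)


def build_processors(ctx, excluded=frozenset(), allowed=None):
    processors = []
    for cls in REGISTRY:  # 1. discover from the registry
        name = cls.__name__
        if not cls.should_enable(ctx):  # 2. filter by should_enable()
            logger.info("Workunit processor '%s' disabled by should_enable()", name)
            continue
        if name in excluded or (allowed is not None and name not in allowed):
            continue  # 3. apply source allow/exclude lists
        processor = cls.create(ctx)  # 4. instantiate via create()
        ctx.reports[name] = processor.report  # 5. register the report
        logger.info("Workunit processor '%s' enabled", name)
        processors.append(processor)
    return processors


def run(stream: Iterable[str], processors: List[Processor]) -> Iterable[str]:
    for processor in processors:  # 6. chain the process() generators
        stream = processor.process(stream)
    return stream


ctx = Context(flags={"incremental_lineage": False})
processors = build_processors(ctx)
result = list(run(["a", "b"], processors))
```

Because each `process()` is a generator, the chain stays lazy: nothing is consumed until the final stream is iterated.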

Implementation Details

  • Generic typing for type-safe report access (WorkunitProcessor[ReportT])
  • Runtime type extraction via existing get_class_from_annotation() utility
  • Naming convention documented in base class docstring
  • Error handling: log + raise to halt ingestion on processor failures
  • Breaking changes documented in docs/how/updating-datahub.md
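
As an illustration of the "log + raise" policy, a wrapper along these lines would record which processor failed and then let the exception propagate so the run stops. The names `guarded` and `flaky_processor` are hypothetical and not taken from the PR.

```python
import logging
from typing import Iterable, Iterator

logger = logging.getLogger("workunit_processors")


def guarded(name: str, stream: Iterable[str]) -> Iterator[str]:
    """Log a processor failure with its name, then re-raise it."""
    try:
        yield from stream
    except Exception:
        logger.exception("Workunit processor '%s' failed", name)
        raise


def flaky_processor(stream: Iterable[str]) -> Iterator[str]:
    for workunit in stream:
        if workunit == "bad":
            raise ValueError(f"cannot process {workunit!r}")
        yield workunit


seen = []
error = None
try:
    for workunit in guarded("FlakyProcessor", flaky_processor(["ok", "bad", "never"])):
        seen.append(workunit)
except ValueError as exc:
    error = exc
```

Workunits emitted before the failure are still delivered downstream; everything after the failing workunit is never produced.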

Testing

  • All existing tests updated for new processor API
  • Fixed timezone-related test flakiness (8 test files)
  • All processors now have report classes ready for metrics

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy

claude added 4 commits June 10, 2026 13:44
…urce overrides

- Add WorkunitProcessor ABC with NAME constant, should_enable(), create(), process()
- Add WorkunitProcessorContext and WorkunitProcessorReport dataclasses
- Add WorkunitProcessorReport tracking in SourceReport.workunit_processor_reports
- Add get_excluded_workunit_processors() and get_allowed_workunit_processors() hooks on Source
- Add get_stale_entity_state_type() hook for custom checkpoint state types
- Create 14 processor classes in workunit_processors/ package replacing all old
  auto_work_units/ functions and source_helpers functions
- Refactor StaleEntityRemovalHandler constructor to take direct dependencies instead
  of a source reference; remove create() classmethod
- Remove ~62 get_workunit_processors() overrides from sources (base class handles them)
- Iceberg: use get_excluded_workunit_processors() for parallelism-incompatible processors
- PowerBI: minimal get_workunit_processors() override for modified_since mode
- Utility sources (datahub_apply, datahub_gc, dataprocess_cleanup, lineage, rdf,
  sql_queries, datahub_source, file): use get_allowed_workunit_processors()

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
- test_stateful_ingestion: remove DummySource.get_workunit_processors() override
  and StaleEntityRemovalHandler.create() call (base class handles it automatically)
- test_sql_queries: update processor assertions to use new NAME constants
  instead of partial function identity checks
- test_auto_validate_input_fields: update import to workunit_processors package
- test_ensure_aspect_size: update import to workunit_processors package

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
…essor API

Replace the manual get_workunit_processors() + StaleEntityRemovalHandler.create()
pattern with the new automatic wiring approach. Document the new hook methods:
- get_excluded_workunit_processors() for parallelism-incompatible processors
- get_allowed_workunit_processors() for utility/minimal sources
- get_stale_entity_state_type() for custom checkpoint state types

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
@github-actions github-actions Bot added the ingestion label (PR or Issue related to the ingestion of metadata) Jun 10, 2026
codecov Bot commented Jun 10, 2026

❌ 1 Tests Failed:

Tests completed   Failed   Passed   Skipped
14761             1        14760    118
View the top 2 failed test(s) by shortest run time
tests.integration.mode.test_mode_threading::test_threading_speedup
Stack Traces | 2.89s run time
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_threading_speedup0')

    def test_threading_speedup(tmp_path):
        """Verify that max_threads > 1 provides wall-clock speedup with simulated latency.
    
        Uses 10 reports with 2 queries each. Each HTTP call sleeps 50ms.
        With 4 threads, expect ~2-4x wall-clock speedup.
    
        Note: No @time_machine.travel here -- time_machine patches time.monotonic()
        which would make our wall-clock measurements return 0.
        """
        num_reports = 10
        num_queries_per_report = 2
        latency = 0.05  # 50ms per request
    
        responses = _build_perf_response_map(num_reports, num_queries_per_report)
    
        # Sequential run (max_threads=1)
        with patch(
            "datahub.ingestion.source.mode.requests.Session",
            side_effect=lambda *a, **kw: ThreadSafeMockSession(responses, latency=latency),
        ):
            pipeline_seq = Pipeline.create(
                {
                    "run_id": "mode-perf-sequential",
                    "source": {
                        "type": "mode",
                        "config": {
                            "token": "xxxx",
                            "password": "xxxx",
                            "connect_uri": "https://app.mode.com/",
                            "workspace": "acryl",
                            "max_threads": 1,
                        },
                    },
                    "sink": {
                        "type": "file",
                        "config": {
                            "filename": f"{tmp_path}/perf_seq.json",
                        },
                    },
                }
            )
            t0 = time.perf_counter()
            pipeline_seq.run()
            sequential_time = time.perf_counter() - t0
    
        # Threaded run (max_threads=4)
        with patch(
            "datahub.ingestion.source.mode.requests.Session",
            side_effect=lambda *a, **kw: ThreadSafeMockSession(responses, latency=latency),
        ):
            pipeline_par = Pipeline.create(
                {
                    "run_id": "mode-perf-parallel",
                    "source": {
                        "type": "mode",
                        "config": {
                            "token": "xxxx",
                            "password": "xxxx",
                            "connect_uri": "https://app.mode.com/",
                            "workspace": "acryl",
                            "max_threads": 4,
                        },
                    },
                    "sink": {
                        "type": "file",
                        "config": {
                            "filename": f"{tmp_path}/perf_par.json",
                        },
                    },
                }
            )
            t0 = time.perf_counter()
            pipeline_par.run()
            parallel_time = time.perf_counter() - t0
    
        speedup = sequential_time / parallel_time if parallel_time > 0 else float("inf")
    
        print(
            f"\nPerf results: sequential={sequential_time:.2f}s, "
            f"parallel={parallel_time:.2f}s, speedup={speedup:.1f}x"
        )
    
        # With 4 threads and 50ms latency, we should see at least 1.5x speedup.
        # Using a conservative threshold to avoid flaky CI.
>       assert speedup > 1.5, (
            f"Expected >1.5x speedup but got {speedup:.2f}x "
            f"(seq={sequential_time:.2f}s, par={parallel_time:.2f}s)"
        )
E       AssertionError: Expected >1.5x speedup but got 0.59x (seq=0.96s, par=1.62s)
E       assert 0.5936169040388863 > 1.5

.../integration/mode/test_mode_threading.py:511: AssertionError
tests.integration.snowplow.test_snowplow_performance::test_parallel_fetching_performance
Stack Traces | 3.61s run time
pytestconfig = <_pytest.config.Config object at 0x7ff8caffa410>
tmp_path = PosixPath('.../pytest-of-runner/pytest-0/test_parallel_fetching_perform0')

    @pytest.mark.integration
    def test_parallel_fetching_performance(pytestconfig, tmp_path):
        """
        Test that parallel deployment fetching is significantly faster than sequential.
    
        This test compares performance with parallel fetching enabled vs disabled.
        """
        # Generate dataset with 100 schemas (enough to see performance difference)
        mock_data_structures = generate_mock_data_structures(100)
    
        # Simulate API delay (10ms per call to make difference measurable)
        def mock_get_deployments(schema_hash: str) -> List[DataStructureDeployment]:
            """Simulate API delay."""
            time.sleep(0.01)  # 10ms delay
            return [
                DataStructureDeployment(
                    version="1-0-0",
                    ts="2024-01-01T00:00:00Z",
                    initiator="Test User",
                    initiator_id="user123",
                )
            ]
    
        # Test 1: Sequential fetching (parallel disabled)
        config_sequential = {
            "bdp_connection": {
                "organization_id": "test-org",
                "api_key_id": "test-key",
                "api_key": "test-secret",
            },
            "field_tagging": {"track_field_versions": True},
            "performance": {
                "enable_parallel_fetching": False,
                "max_concurrent_api_calls": 10,
            },
        }
    
        with patch(
            "datahub.ingestion.source.snowplow.snowplow.SnowplowBDPClient"
        ) as mock_client_class:
            mock_client = mock_client_class.return_value
            mock_client._authenticate = lambda: None
            mock_client._jwt_token = "mock_token"
    
            from datahub.ingestion.source.snowplow.models.snowplow_models import (
                DataStructure,
            )
    
            mock_client.get_data_structures.return_value = [
                DataStructure.model_validate(ds) for ds in mock_data_structures
            ]
            mock_client.get_data_structure_deployments.side_effect = mock_get_deployments
    
            config = SnowplowSourceConfig.model_validate(config_sequential)
            source = SnowplowSource(config, create_mock_context())
            source.bdp_client = mock_client
    
            start_time = time.time()
            list(source.schema_processor._get_data_structures_filtered())
            sequential_time = time.time() - start_time
    
        # Test 2: Parallel fetching (parallel enabled)
        config_parallel = {
            "bdp_connection": {
                "organization_id": "test-org",
                "api_key_id": "test-key",
                "api_key": "test-secret",
            },
            "field_tagging": {"track_field_versions": True},
            "performance": {
                "enable_parallel_fetching": True,
                "max_concurrent_api_calls": 10,
            },
        }
    
        with patch(
            "datahub.ingestion.source.snowplow.snowplow.SnowplowBDPClient"
        ) as mock_client_class:
            mock_client = mock_client_class.return_value
            mock_client._authenticate = lambda: None
            mock_client._jwt_token = "mock_token"
            mock_client.get_data_structures.return_value = [
                DataStructure.model_validate(ds) for ds in mock_data_structures
            ]
            mock_client.get_data_structure_deployments.side_effect = mock_get_deployments
    
            config = SnowplowSourceConfig.model_validate(config_parallel)
            source = SnowplowSource(config, create_mock_context())
            source.bdp_client = mock_client
    
            start_time = time.time()
            list(source.schema_processor._get_data_structures_filtered())
            parallel_time = time.time() - start_time
    
        # Performance assertions
        speedup = sequential_time / parallel_time
        print("\nPerformance Results:")
        print(f"  Sequential time: {sequential_time:.2f}s")
        print(f"  Parallel time: {parallel_time:.2f}s")
        print(f"  Speedup: {speedup:.2f}x")
    
        # Parallel should be at least 3x faster with 10 workers
        # (100 schemas / 10 workers = ~10 sequential batches vs 100 sequential calls)
>       assert parallel_time < sequential_time / 3, (
            f"Parallel fetching should be at least 3x faster (got {speedup:.2f}x)"
        )
E       AssertionError: Parallel fetching should be at least 3x faster (got 0.41x)
E       assert 2.542221784591675 < (1.045442819595337 / 3)

.../integration/snowplow/test_snowplow_performance.py:180: AssertionError

Update test files and the old compatibility shim to use the new
WorkunitProcessorContext-based API introduced in the processor refactoring:

- test_auto_validate_input_fields.py: use ValidateInputFieldsProcessor.create(ctx)
  and processor.process(stream); check processor.report.num_input_fields_filtered
- test_ensure_aspect_size.py: use EnsureAspectSizeProcessor.create(ctx) via
  WorkunitProcessorContext; set payload_constraint directly for the constraint test
- auto_validate_input_fields.py: remove reference to SourceReport.num_input_fields_filtered
  which was moved to ValidateInputFieldsReport

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
…ass type

Use a TypeVar bound on the classmethod so that mypy infers the concrete
subclass type when calling ProcessorSubclass.create(ctx), rather than the
base WorkunitProcessor. This fixes mypy attr-defined errors when tests access
subclass-specific attributes (payload_constraint, ensure_view_properties_size,
etc.) on the result of create().

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
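
The classmethod typing described in the commit above can be sketched as follows. `BaseProcessor` and `SizeProcessor` are hypothetical stand-ins, and the context is a plain dict for brevity.

```python
from typing import Type, TypeVar

# Bound TypeVar: create() returns the concrete subclass it was called on.
P = TypeVar("P", bound="BaseProcessor")


class BaseProcessor:
    def __init__(self, ctx: dict) -> None:
        self.ctx = ctx

    @classmethod
    def create(cls: Type[P], ctx: dict) -> P:
        return cls(ctx)


class SizeProcessor(BaseProcessor):
    payload_constraint: int = 1024


proc = SizeProcessor.create({})
# A type checker infers SizeProcessor here, so this attribute access is
# accepted instead of being flagged as attr-defined on BaseProcessor.
proc.payload_constraint = 10
```

Had `create()` been annotated as returning `BaseProcessor`, the last line would be rejected by the type checker even though it works at runtime.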
@github-actions github-actions Bot requested a deployment to datahub-wheels (Preview) June 10, 2026 15:12 Abandoned
@sgomezvillamor sgomezvillamor changed the title from "refactor: move workunit processors to dedicated module" to "refactor(ingestion): workunit processors" Jun 10, 2026
- test_ensure_aspect_size: fix patch paths (auto_work_units → workunit_processors),
  rename ensure_aspect_size() → process(), fix report.warnings → ctx.source_report.warnings
- test_thoughtspot_source: update _references_stale_handler to accept
  StaleEntityRemovalProcessor (new wrapper) alongside StaleEntityRemovalHandler
- test_notion_source: replace hasattr(source, stale_entity_removal_handler) check
  with a processor-chain check using stateful_ingestion config
- test_informatica: replace StaleEntityRemovalHandler.create() mock (removed) with
  a direct check for StaleEntityRemovalProcessor in the processor chain
- test_dataplex: replace deleted get_workunit_processors() override test with an
  assertion that the override no longer exists (base class handles it now)

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
- source.py: use getattr(self, "platform", None) instead of infer_platform()
  to preserve pre-refactor behavior of getattr(source, "platform", "default").
  Sources without a platform attribute now correctly fall through to "default"
  in StaleEntityRemovalHandler._init_job_id(), fixing the golden file failure
  for the file source (job ID was changing from default_stale_entity_removal
  to metadata-file_stale_entity_removal).

- test_thoughtspot_source.py: fix I001 ruff violation — move
  workunit_processors.stale_entity_removal import to after all source.*
  imports (alphabetically workunit_processors > source).

- test_dataplex_source.py: fix F401 ruff violation — remove unused
  StatefulIngestionSourceBase import that was added inside test function body.

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
…backward-compat stale removal job IDs

Add `source_platform` field to `WorkunitProcessorContext` to carry the
fully-inferred platform (including @platform_name decorator fallback).
`AutoBrowsePathV2Processor` uses this for `_prepend_platform_instance`,
fixing the Dremio platform-instance browse path regression.

The existing `platform` field (raw `self.platform` attribute) is kept
unchanged so `StaleEntityRemovalHandler._init_job_id()` still produces
`"default_stale_entity_removal"` for sources without an explicit
`platform` attribute (file, Dremio, etc.), matching pre-refactor behavior.

Also applies ruff format fix to test_thoughtspot_source.py (long lines).

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
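
A rough sketch of the two-field split described in the commits above, under stated assumptions: `ContextSketch`, `build_context`, `stale_removal_job_id`, and the `decorator_platform` attribute are hypothetical, and only the `<platform>_stale_entity_removal` job ID shape is taken from the commit messages.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ContextSketch:
    platform: Optional[str]  # raw source attribute, None when absent
    source_platform: str     # fully inferred platform, used for browse paths


class FileSourceStub:
    # Hypothetical: models a platform name supplied only by a decorator.
    decorator_platform = "metadata-file"


class SnowflakeStub:
    platform = "snowflake"
    decorator_platform = "snowflake"


def build_context(source: object) -> ContextSketch:
    raw = getattr(source, "platform", None)
    inferred = raw or getattr(source, "decorator_platform", "unknown")
    return ContextSketch(platform=raw, source_platform=inferred)


def stale_removal_job_id(ctx: ContextSketch) -> str:
    # Only the raw attribute feeds the job ID, so existing IDs stay stable.
    return f"{ctx.platform or 'default'}_stale_entity_removal"


file_ctx = build_context(FileSourceStub())
snowflake_ctx = build_context(SnowflakeStub())
```

Keeping the job ID tied to the raw attribute means a source without `platform` keeps its pre-refactor ID, while browse-path logic can still see the inferred name.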
…stion tests

StatefulIngestionSourceBase requires pipeline_name when stateful ingestion
is enabled. The new tests (test_stale_entity_removal_handler_registered and
test_stateful_ingestion_processor_wired_up) created PipelineContext without
pipeline_name, triggering ConfigurationError at source instantiation.

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
DatahubIngestionCheckpointingProvider.create() raises ValueError when
ctx.graph is None. Tests that construct sources with stateful_ingestion
enabled need both pipeline_name and a mock graph in PipelineContext,
matching the pattern used by other stateful source tests (e.g. ThoughtSpot).

Fixes:
  tests.unit.informatica.test_source.TestSourceLifecycle::test_stale_entity_removal_handler_registered
  tests.unit.ingestion.source.notion.test_notion_source::test_stateful_ingestion_processor_wired_up

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
Line 1485 in test_stale_entity_removal_handler_registered was 90 chars
(2 over the 88-char limit), causing ruff format --check to fail. Split
PipelineContext args across lines to stay within the limit.

https://claude.ai/code/session_0171jAojWBsvNMDz7havHgDy
Remove the NAME constant and get_name() method from WorkunitProcessor
and all processor implementations. Processor names are now derived
directly from class names (__name__).

Updated get_allowed_workunit_processors() and get_excluded_workunit_processors()
to accept Union[str, Type[WorkunitProcessor]], allowing sources to pass
processor classes directly for better type safety and IDE support.

Changes:
- Remove NAME constant from 14 processor classes
- Remove get_name() from WorkunitProcessor base class
- Update Source._get_source_workunit_processors() with _to_name() helper
- Update type signatures to Union[str, Type[WorkunitProcessor]]
- Add WorkunitProcessor to TYPE_CHECKING imports in source.py
- Update 12 source files to use processor classes instead of .NAME
- Update 2 test files to use .__name__ instead of .NAME
- Add explicit List[Type[WorkunitProcessor]] type annotations where needed

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
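
A name-normalizing helper of the kind mentioned in the commit above might look like this. It is a sketch only: `to_name`, `to_names`, and the stub classes are hypothetical, and the actual `_to_name()` helper in `source.py` may differ.

```python
from typing import Iterable, Set, Type, Union


class WorkunitProcessorStub:
    """Hypothetical stand-in for WorkunitProcessor."""


class AutoStatusStub(WorkunitProcessorStub):
    pass


ProcessorRef = Union[str, Type[WorkunitProcessorStub]]


def to_name(ref: ProcessorRef) -> str:
    # Strings pass through; classes are identified by their __name__.
    return ref if isinstance(ref, str) else ref.__name__


def to_names(refs: Iterable[ProcessorRef]) -> Set[str]:
    return {to_name(ref) for ref in refs}


excluded = to_names([AutoStatusStub, "AutoStatusStub", "SomeOtherProcessor"])
```

Passing the class rather than a string lets a rename be caught by the type checker and IDE tooling instead of failing silently at runtime.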
Implement proper generic typing for WorkunitProcessor using the established
codebase pattern (same as Sink class). This provides type-safe report access
without boilerplate while extracting report types at runtime.

Benefits:
- self.report is properly typed throughout processor methods
- No need for _report property casting
- No redundant report_class attribute
- Single source of truth: WorkunitProcessor[MyReport]
- Uses existing get_class_from_annotation() utility (robust, tested)

Changes:
- Made WorkunitProcessor generic over _ReportT
- Created empty report classes for all 14 processors
- Updated create() to extract report class via get_class_from_annotation()
- Removed _report properties from ValidateInputFieldsProcessor and EnsureAspectSizeProcessor
- Replaced all self._report with self.report (now properly typed)

All processors now follow consistent pattern:
    @dataclass
    class MyProcessorReport(WorkunitProcessorReport):
        num_processed: int = 0

    class MyProcessor(WorkunitProcessor[MyProcessorReport]):
        def process(self, stream):
            self.report.num_processed += 1  # ✓ Properly typed!

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
sgomezvillamor and others added 3 commits June 12, 2026 16:33
Add comprehensive metrics tracking to AutoBrowsePathV2ProcessorReport:

Invariant violations:
- num_out_of_batch: URN seen in multiple batches
- num_out_of_order: Child container processed before parent

Browse path emission by source:
- num_browse_path_v2_emitted: From source-generated BrowsePathsV2
- num_container_or_legacy_emitted: Derived from Container or legacy BrowsePaths
- num_fallback_emitted: Fallback for root containers/dataFlow/dataJob

Implementation uses local variables for per-invocation tracking (telemetry)
and accumulates totals in report for overall observability.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…terializeReferencedTagsTermsProcessor

- Track invalid URNs that couldn't be materialized
- Change invalid URN message from info to warning level

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@github-actions github-actions Bot requested a deployment to datahub-wheels (Preview) June 12, 2026 14:39 Abandoned
Track number of datasets patched with lastModified from operation timestamps

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Track unexpected metadata types and status aspects emitted

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
sgomezvillamor and others added 2 commits June 12, 2026 16:58
…sProcessorReport

Move total_schema_aspects, schemas_with_duplicates, and duplicated_field_paths
from instance variables to report class fields

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…cessorReport

Move total_schema_aspects, schemas_with_empty_fields, and empty_field_paths
from instance variables to report class fields

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…InputFieldsReport

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Remove unnecessary local variables and increment report fields directly
since telemetry is sent only once after processing the entire stream

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Update tests to use the create() classmethod instead of direct instantiation
to properly initialize the report attribute

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Labels

ingestion (PR or Issue related to the ingestion of metadata), pending-submitter-merge

4 participants