Skip to content

feat(cdk): add CombinedExtractor #551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

lazebnyi
Copy link
Contributor

@lazebnyi lazebnyi commented May 17, 2025

What

Resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/13046

Introduce a CombinedExtractor that merges the output of multiple sub-extractors into a single record.

How

  • Accepts a list of RecordExtractor instances.

  • For each response:

    1. Invokes each extractor to yield an iterable of records.
    2. Uses zip() to pair records at the same index across all extractors.
    3. Merges each group of records into a single dictionary using dict.update().

⚠️ UNIT TESTS TO BE ADDED LATER

Example

If one extractor yields {"name": "Alice", "age": 30} and another yields {"country": "US"}, the combined result will be:

{"name": "Alice", "age": 30, "country": "US"}

Summary by CodeRabbit

Summary by CodeRabbit

  • New Features
    • Introduced a new extractor type, CombinedExtractor, allowing merging of multiple record extractors into unified records.
  • Enhancements
    • Expanded record selector options to support the CombinedExtractor for more flexible data extraction.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@github-actions github-actions bot added the enhancement New feature or request label May 17, 2025
Copy link
Contributor

coderabbitai bot commented May 17, 2025

📝 Walkthrough

Walkthrough

A new CombinedExtractor feature is introduced, enabling the merging of multiple record extractors' outputs into unified records. This involves schema updates, a new extractor class, expanded Pydantic models, and factory logic to support recursive extractor composition within the declarative source framework.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/declarative_component_schema.yaml Added CombinedExtractor definition; updated RecordSelector.extractor to accept CombinedExtractor; allowed recursive extractor composition.
airbyte_cdk/sources/declarative/extractors/combined_extractor.py Introduced CombinedExtractor class, merging records from multiple sub-extractors by zipping and combining their outputs.
airbyte_cdk/sources/declarative/extractors/__init__.py Imported and exported CombinedExtractor in the package's public API.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Added Pydantic model for CombinedExtractor; expanded extractor unions to include it; updated schema_loader description; enabled forward references for recursive extractor definitions.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Added support for constructing CombinedExtractor from models, including recursive creation of sub-extractors in the factory logic.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant DeclarativeSource
    participant ModelToComponentFactory
    participant CombinedExtractor
    participant SubExtractor1
    participant SubExtractor2

    User->>DeclarativeSource: Define stream with CombinedExtractor
    DeclarativeSource->>ModelToComponentFactory: Build extractor from model
    ModelToComponentFactory->>CombinedExtractor: Instantiate with sub-extractors
    CombinedExtractor->>SubExtractor1: extract_records(response)
    CombinedExtractor->>SubExtractor2: extract_records(response)
    CombinedExtractor->>CombinedExtractor: Zip and merge records
    CombinedExtractor-->>DeclarativeSource: Yield merged records
Loading

Would you like me to provide a more detailed sequence diagram focusing on the internal merging logic of CombinedExtractor, or is this overview sufficient for your review, wdyt?


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between f79bb74 and c452c67.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py
⏰ Context from checks skipped due to timeout of 90000ms (6)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (17)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (2)

1-3: Update copyright year.

The copyright notice shows 2025, but the current year is 2024.

-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

13-35: Well-documented extractor implementation with a clear docstring.

The class documentation clearly explains the purpose and behavior of the extractor. Could you consider adding information about edge cases to the docstring? Such as:

  1. What happens when extractors produce different numbers of records?
  2. How are key conflicts handled when merging records from different extractors?
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)

38-42: Edge-case: infinite loop when keys is empty.

If keys_extractor yields an empty list, len(keys) is 0; islice(values, 0) returns an empty iterator, leaving chunk empty and the loop spinning forever because the break is never reached.

Maybe bail out early?

     keys = list(self.keys_extractor.extract_records(response))
-    values = self.values_extractor.extract_records(response)
+    if not keys:
+        return  # nothing to map, silently exit
+    values = self.values_extractor.extract_records(response)

Would that make sense?

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (2)

124-125: Guard against product() on empty input?

If self.stream_configs is an empty list, all_indexed_streams becomes empty and product(*all_indexed_streams) raises TypeError.
Perhaps return an empty list (or [{}]) in that scenario?

all_indexed_streams = list(prepare_streams())
if not all_indexed_streams:
    return [{}]
return [merge_combination(c) for c in product(*all_indexed_streams)]

Would that make the resolver more robust?

🧰 Tools
🪛 GitHub Actions: Linters

[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]


[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]


154-159: create_or_update path handling silently ignores falsy values – intentional?

if parsed_value and not updated and resolved_component.create_or_update:

• A legitimate falsy value (e.g., 0, False, "") will block path creation.
• Should the check be updated is False instead of the truthiness of parsed_value?

Just checking intent, wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)

1744-1768: Polish KeyValueExtractor schema definition
The block currently uses placeholder text for description and an enum value with extra whitespace (enum: [ KeyValueExtractor ]). Could we clarify the description, remove the leading space in the enum, and include an examples section under properties to illustrate typical usage? wdyt?


1769-1791: Enhance CombinedExtractor schema clarity
Similar to KeyValueExtractor, the CombinedExtractor definition has a placeholder description and an enum with surrounding spaces (enum: [ CombinedExtractor ]). Should we refine the description to explain its purpose, normalize the enum formatting, and add example usages for the extractors array? wdyt?


2359-2364: Define schema_filter description and examples
The new schema_filter property uses a placeholder description. Could we provide a meaningful description (e.g. "Filter to apply to schema records before merging") and add examples showing a RecordFilter or CustomRecordFilter in action? wdyt?


4053-4055: Improve create_or_update property documentation
The create_or_update flag lacks a descriptive comment. Could we add a description to explain its behavior (e.g. "If true, existing fields will be updated; otherwise only new fields are created")? wdyt?


4122-4126: Add examples for multi-config stream_config
The stream_config property now accepts either a single StreamConfig or an array. Could we include examples in the schema comments to illustrate both cases and guide users? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2247-2259: Same default-argument pattern & nil-safety in create_combined_extractor

For the same reasons as above, could we:

  1. Change the decoder default to None and lazy-instanciate it?
  2. (Optional) Validate model.extractors is not empty to fail fast.
-        decoder: Optional[Decoder] = JsonDecoder(parameters={}),
+        decoder: Optional[Decoder] = None,
 ...
-        extractors = [
+        if decoder is None:
+            decoder = JsonDecoder(parameters={})
+
+        extractors = [
             self._create_component_from_model(model=extractor, decoder=decoder, config=config)
             for extractor in model.extractors
         ]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (6)

1481-1482: Add validation metadata for create_or_update?

The new flag looks great. Would you consider wrapping it in Field(..., description="...", title="Create Or Update") to keep the rich-schema experience consistent with the other attributes, wdyt?


1499-1500: Field name vs. type plurality

stream_config now accepts a list, yet the attribute remains singular.
To avoid confusion for users authoring manifests, would renaming it to stream_configs (and adding an alias for backwards compatibility) make sense, wdyt?


1847-1856: KeyValueExtractor – consider min-items & richer docs

  1. Should we enforce at least one key-value pair by adding min_items=1 on keys_extractor/values_extractor?
  2. The description="placeholder" texts will surface in the JSON-schema. Maybe flesh them out to guide connector authors, wdyt?

Both are tiny polish items but improve UX.


1858-1864: CombinedExtractor – guarantee non-empty extractors list

extractors is required but could still be empty. Adding min_items=1 in Field() would prevent a silent mis-configuration. Worth tightening?
Also, expanding the placeholder description will help future users.


1903-1903: Forward-ref update for dependant models

RecordSelector now references CombinedExtractor; you added CombinedExtractor.update_forward_refs() (👍).
For full safety, should we also call RecordSelector.update_forward_refs() so that external callers importing only that class don’t have to?


2436-2438: schema_filter placeholder description

Nice addition! Would you like to replace "placeholder" with something actionable (e.g. “Filter to drop/keep schema properties based on a record-level predicate”)?
This will read better in generated docs, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between fa8d54d and 4ee3fb5.

📒 Files selected for processing (10)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (6 hunks)
  • airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (7 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
  • airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2 hunks)
  • airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6 hunks)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (5)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (14-45)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
  • CombinedExtractor (1858-1863)
  • DpathExtractor (696-709)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/http_selector.py (1)
  • HttpSelector (13-37)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (3)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • extract_records (38-45)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (3)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • CombinedExtractor (1858-1863)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • extract_records (34-42)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • StreamConfig (1485-1494)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (14-45)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (20)
  • CombinedExtractor (1858-1863)
  • DpathExtractor (696-709)
  • KeyValueExtractor (1847-1855)
  • Config (134-135)
  • Config (148-149)
  • Config (162-163)
  • Config (176-177)
  • Config (194-195)
  • Config (208-209)
  • Config (222-223)
  • Config (236-237)
  • Config (250-251)
  • Config (264-265)
  • Config (278-279)
  • Config (292-293)
  • Config (308-309)
  • Config (322-323)
  • Config (336-337)
  • Config (370-371)
  • DynamicSchemaLoader (2429-2458)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (1)
  • DynamicSchemaLoader (120-297)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

[error] 182-182: Argument 1 to "filter_records" of "RecordFilter" has incompatible type "ItemsView[str, Any]"; expected "Iterable[Mapping[str, Any]]" [arg-type]


[error] 183-183: Invalid index type "int" for "Mapping[str, Any]"; expected type "str" [index]

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

[error] 97-97: Function is missing a return type annotation [no-untyped-def]


[error] 98-98: Function is missing a type annotation [no-untyped-def]


[error] 103-103: Function is missing a type annotation [no-untyped-def]


[error] 106-106: Function is missing a return type annotation [no-untyped-def]


[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]


[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]


[error] 112-112: Unsupported left operand type for + ("object") [operator]


[error] 113-113: Need type annotation for "item" [var-annotated]


[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]


[error] 115-115: Function is missing a type annotation [no-untyped-def]


[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]


[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)

5-8: Imports for new extractor classes are added correctly.

The imports for the new CombinedExtractor and KeyValueExtractor classes follow the existing import pattern and alphabetical ordering, which is good for code organization.


16-25: Updated __all__ list to include new extractor classes.

The new extractor classes are correctly added to the __all__ list, maintaining alphabetical ordering which helps with maintainability.

airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)

25-25: New parameter create_or_update added with sensible default.

The boolean parameter is added with a default of False, which maintains backward compatibility. This parameter will control whether new config paths should be created when setting values during component resolution.


38-38: Matching create_or_update parameter added to ResolvedComponentMappingDefinition.

Good consistency in adding the same parameter with the same default value to both dataclasses.

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

265-265: Good refactoring for cleaner code.

Using the dynamic_streams property instead of directly calling _dynamic_stream_configs improves code readability and follows good OOP principles by encapsulating the implementation details.

airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)

38-45: Clean implementation for combining multiple extractors.

The implementation is concise and effective. It creates iterables from each extractor, zips them together (handling the case where they have different lengths by stopping at the shortest), and merges the records using dict.update().

Note that when merging dictionaries, if the same key exists in multiple records, later records will overwrite values from earlier records. This is the expected behavior of dict.update(), but worth being aware of.

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

3369-3374: Confirm RecordSelector supports new extractors
Great to see CombinedExtractor and KeyValueExtractor added to the extractor union. This aligns with the new definitions and will enable composite extraction patterns.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

octavia-squidington-iii and others added 2 commits May 17, 2025 01:59
@lazebnyi
Copy link
Contributor Author

lazebnyi commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@lazebnyi lazebnyi requested review from bnchrch and brianjlai May 17, 2025 03:23
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant