Skip to content

feat(cdk): add KeyValueExtractor #552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

lazebnyi
Copy link
Contributor

@lazebnyi lazebnyi commented May 17, 2025

What

Resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/13047

Adds a KeyValueExtractor to support extracting structured records from APIs that return related keys and values in parallel arrays. This is useful for handling responses where field names and their corresponding values are returned separately — e.g., ["name", "age"] and ["Alice", 30"] -> { "name": "Alice", "age": 30 }.

How

  • Introduced a KeyValueExtractor class.
  • It takes two sub-extractors: keys_extractor and values_extractor.
  • Both extractors run on the same response.
  • Their outputs are zipped together into dictionaries using dict(zip(keys, values)).

⚠️ UNIT TESTS TO BE ADDED LATER

Summary by CodeRabbit

  • New Features
    • Introduced new record extractors: KeyValueExtractor enabling more flexible and composable record extraction strategies.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@github-actions github-actions bot added the enhancement New feature or request label May 17, 2025
Copy link
Contributor

coderabbitai bot commented May 17, 2025

Warning

Rate limit exceeded

@octavia-the-approver has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 0 minutes and 43 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 50f729a and c2c1db3.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
📝 Walkthrough

Walkthrough

This update introduces two new record extractors—KeyValueExtractor and CombinedExtractor—across schema, implementation, and factory layers. It enhances schema filtering in dynamic schema loaders, adds a create_or_update flag to component mapping definitions, allows multiple stream configs in configuration resolution, and updates related data models and factories accordingly.

Changes

File(s) Change Summary
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Schema updated: Added KeyValueExtractor and CombinedExtractor definitions, schema_filter to dynamic schema loader and stream retriever, create_or_update to component mapping, default_values to stream config, and support for multiple stream configs.
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
airbyte_cdk/sources/declarative/extractors/combined_extractor.py
Added new extractor classes: KeyValueExtractor (combines keys and values from separate extractors) and CombinedExtractor (merges outputs from multiple extractors).
airbyte_cdk/sources/declarative/extractors/__init__.py Imported and exported KeyValueExtractor and CombinedExtractor in the extractors package.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Factory extended to create and map new extractor types, handle schema filters, create_or_update, default_values, and multiple stream configs.
airbyte_cdk/sources/declarative/resolvers/components_resolver.py Added create_or_update boolean field to ComponentMappingDefinition and ResolvedComponentMappingDefinition.
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py StreamConfig dataclass now supports default_values. ConfigComponentsResolver supports multiple stream configs, YAML parsing for values, and uses create_or_update for path creation.
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py Added optional schema_filter to DynamicSchemaLoader and applied it in schema extraction.
airbyte_cdk/sources/declarative/manifest_declarative_source.py Internal refactor to use the dynamic_streams property instead of direct method call for dynamic stream configs.

Sequence Diagram(s)

sequenceDiagram
  participant User
  participant ManifestDeclarativeSource
  participant ModelToComponentFactory
  participant KeyValueExtractor
  participant CombinedExtractor
  participant DynamicSchemaLoader
  participant ConfigComponentsResolver

  User->>ManifestDeclarativeSource: Request streams
  ManifestDeclarativeSource->>ManifestDeclarativeSource: Retrieve static stream configs
  ManifestDeclarativeSource->>ManifestDeclarativeSource: Retrieve dynamic_streams
  ManifestDeclarativeSource->>ModelToComponentFactory: Build extractors (KeyValueExtractor/CombinedExtractor)
  ModelToComponentFactory->>KeyValueExtractor: Instantiate with keys_extractor, values_extractor
  ModelToComponentFactory->>CombinedExtractor: Instantiate with list of extractors
  ManifestDeclarativeSource->>DynamicSchemaLoader: Load schema with optional schema_filter
  DynamicSchemaLoader->>DynamicSchemaLoader: Apply schema_filter to properties
  ManifestDeclarativeSource->>ConfigComponentsResolver: Resolve stream configs (single or multiple)
  ConfigComponentsResolver->>ConfigComponentsResolver: Enumerate and merge configs, apply default_values
  ConfigComponentsResolver->>ConfigComponentsResolver: Set values, parse YAML if needed, create paths if create_or_update
Loading

Would you like a more detailed breakdown of how the new extractors interact with each other or with other components, wdyt?

✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)

34-42: Consider handling mismatched key-value counts.

The implementation is clean, but what happens if there are fewer values than keys in a chunk? The current implementation will create a dictionary with missing keys.

Would you consider adding handling for this case? Perhaps:

while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break
-   yield dict(zip(keys, chunk))
+   # Ensure all keys have values, even if there are fewer values than keys
+   result = {}
+   for i, key in enumerate(keys):
+       result[key] = chunk[i] if i < len(chunk) else None
+   yield result

Or alternatively, if it's preferable to keep only the keys that have values:

while True:
    chunk = list(islice(values, len(keys)))
    if not chunk:
        break
-   yield dict(zip(keys, chunk))
+   # Only include keys that have corresponding values
+   yield dict(zip(keys[:len(chunk)], chunk))

What do you think?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)

1744-1769: Normalize enum formatting and enrich descriptions?

The enum for KeyValueExtractor is defined as [ KeyValueExtractor ] (with extra whitespace) and the description and field descriptions are placeholders. Could we trim the whitespace ([KeyValueExtractor]) and provide more meaningful descriptions (e.g., explain that it extracts keys and values from records using the provided extractors)? wdyt?


1769-1791: Normalize enum formatting and enrich descriptions?

Similarly, the CombinedExtractor uses [ CombinedExtractor ] with extraneous spaces and has placeholder descriptions. Would you consider adjusting the enum to [CombinedExtractor] and expanding the description (e.g., “Combines multiple record extractors to merge results into a single list of records”)? wdyt?


2369-2375: Provide a meaningful description?

The schema_filter property currently has description: placeholder. Can we clarify its purpose (e.g., “Filters extracted schema properties based on a predicate before schema transformation”) to improve schema docs? wdyt?


3372-3374: Add examples for new extractor types?

We’ve extended the extractor in RecordSelector to include CombinedExtractor and KeyValueExtractor but there are no examples demonstrating their usage. Could we add sample entries under examples to illustrate these new extractor types? wdyt?


4122-4127: Add metadata and verify code support for multiple configs?

The stream_config property has no title or description, and now allows both a single StreamConfig and an array. Have we updated the resolver implementation to handle both formats, and could we add descriptive metadata for clarity? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

1-4: Check copyright year

The copyright year is set to 2025, which is in the future. This is likely an oversight.

-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

What do you think?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)

1481-1482: Should this field really be Optional?

Declaring the property as Optional[bool] = False means None is still a valid value, yet the default suggests you only expect a boolean. Would switching to a plain bool = False, or at least documenting the meaning of None, make the intent clearer, wdyt?


2436-2438: schema_filter placeholder lacks guidance.

The new schema_filter option is great 👍. Could you add at least a one-line purpose sentence (e.g. “Filter dynamic schema records before type-inference is applied”) to help spec readers, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting

📥 Commits

Reviewing files that changed from the base of the PR and between fa8d54d and 50f729a.

📒 Files selected for processing (10)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (6 hunks)
  • airbyte_cdk/sources/declarative/extractors/__init__.py (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (7 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
  • airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2 hunks)
  • airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6 hunks)
  • airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (4)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (19-44)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
  • CombinedExtractor (1858-1863)
  • DpathExtractor (696-709)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/http_selector.py (1)
  • HttpSelector (13-37)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (3)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
  • RecordExtractor (12-27)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • KeyValueExtractor (1847-1855)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • extract_records (37-44)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • StreamConfig (1485-1494)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (2)
  • ConfigComponentsResolver (42-171)
  • StreamConfig (25-37)
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
  • KeyValueExtractor (15-42)
airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (1)
  • DpathExtractor (18-86)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
  • CustomRecordExtractor (3745-3750)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
  • CombinedExtractor (19-44)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

[error] 182-182: Argument 1 to "filter_records" of "RecordFilter" has incompatible type "ItemsView[str, Any]"; expected "Iterable[Mapping[str, Any]]" [arg-type]


[error] 183-183: Invalid index type "int" for "Mapping[str, Any]"; expected type "str" [index]

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

[error] 97-97: Function is missing a return type annotation [no-untyped-def]


[error] 98-98: Function is missing a type annotation [no-untyped-def]


[error] 103-103: Function is missing a type annotation [no-untyped-def]


[error] 106-106: Function is missing a return type annotation [no-untyped-def]


[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]


[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]


[error] 112-112: Unsupported left operand type for + ("object") [operator]


[error] 113-113: Need type annotation for "item" [var-annotated]


[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]


[error] 115-115: Function is missing a type annotation [no-untyped-def]


[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]


[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]

⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (26)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)

5-8: New extractors properly imported!

Both CombinedExtractor and KeyValueExtractor are properly imported. This makes sense as they complement the existing extractor functionality.


23-24: Exports properly added to all

The new extractor classes are correctly exposed in the __all__ list, ensuring they're accessible when the package is imported.

airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)

25-25: New create_or_update flag added correctly

Adding the optional boolean flag with a default value of False to ComponentMappingDefinition is a backward-compatible change. This flag enables conditional creation or updating of component mappings, which enhances flexibility. The default value ensures existing code won't break.


38-38: Mirrored create_or_update flag in ResolvedComponentMappingDefinition

The same flag is consistently added to ResolvedComponentMappingDefinition with the same default value, maintaining consistency between the two related dataclasses.

airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)

37-44: Good implementation of extract_records method

The implementation correctly collects record iterables from all extractors, zips them together to align corresponding records, and merges them into single dictionaries to yield. This is an elegant way to combine records from multiple sources.

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

265-265: Clean refactoring to use dynamic_streams property

This change simplifies the code by using the dynamic_streams property instead of directly calling _dynamic_stream_configs. It improves readability while maintaining the same behavior.

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)

13-13: Good addition of the RecordFilter import.

The new import aligns perfectly with the schema filtering capability you're adding.


130-130: Love the optional schema filtering feature.

This is a nice extension of the DynamicSchemaLoader functionality. The optional nature means existing code won't break while providing new filtering capabilities.


156-156: LGTM: Filtering before transformation.

Applying the filter before transformations is the right approach, as it reduces the work transformations need to do.

airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (2)

1-12: LGTM: Clean imports and structure.

The imports are clean and appropriate for this class. Using islice is a good choice for taking chunks of an iterator.


14-33: Nice declarative class with clear documentation.

The class definition and docstring are excellent. The example is particularly helpful for understanding the extractor's purpose.

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6)

32-32: Adding default_values to StreamConfig makes sense.

This allows providing fallback configurations, which is a nice convenience.


53-53: Good change to support multiple stream configs.

Changing from a single stream_config to a list of stream_configs provides more flexibility.


87-87: Adding create_or_update property passes through important behavior.

Preserving this flag in the resolved component is important for proper handling later.


154-155: LGTM: Parsing YAML values is a nice enhancement.

This is a good improvement - converting string YAML to native Python types before setting makes configurations more flexible.


157-158: Good handling for create_or_update paths.

Using dpath.new when dpath.set fails and create_or_update is true provides a nice way to handle missing paths.


162-171: LGTM: Clean YAML parsing helper.

The static method for YAML parsing is well-implemented with proper error handling.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)

86-86: New extractors added to imports - looks good!

The imports for the new CombinedExtractor and KeyValueExtractor along with their corresponding models have been correctly added to the file.

Also applies to: 88-88, 147-149, 312-314


652-654: Constructor mappings added for new extractors - LGTM!

You've properly registered the new extractors in the PYDANTIC_MODEL_TO_CONSTRUCTOR dictionary so they can be instantiated from the models.


2231-2246: KeyValueExtractor factory method implementation looks good

The implementation correctly creates a KeyValueExtractor by instantiating both the keys and values extractors from their models.


2247-2260: CombinedExtractor factory method implementation looks good

The implementation correctly creates extractors from the provided list and instantiates the CombinedExtractor with them.


2482-2482: New schema_filter parameter added to DynamicSchemaLoader initialization - LGTM

The schema_filter parameter is now correctly passed to the DynamicSchemaLoader constructor.


3644-3644: Added create_or_update flag to ComponentMappingDefinition

The new create_or_update parameter enhances flexibility when dealing with component mappings.


3691-3691: Added default_values to StreamConfig

The default_values parameter adds support for default stream values, which is a good addition.


3698-3707: Improved handling of multiple stream configs

The code now properly handles both single and multiple stream configs by normalizing to a list and creating multiple stream config components.


3721-3721: ConfigComponentsResolver now takes a list of stream_configs

This change allows the resolver to handle multiple stream configs, which provides more flexibility.

@lazebnyi
Copy link
Contributor Author

lazebnyi commented May 17, 2025

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.

@lazebnyi lazebnyi requested review from bnchrch and brianjlai May 17, 2025 02:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant