feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources #452

Open
wants to merge 10 commits into
base: main
from

@brianjlai (Contributor) commented Mar 30, 2025

Closes https://github.com/airbytehq/airbyte-internal-issues/issues/12145

What

Adds support for the QueryProperties concept, which allows low-code connectors to inject either a static list of properties (source-linkedin-ads) or a set of properties retrieved from an API endpoint (source-hubspot) into HttpRequester.request_parameters.

See the spec for more details about what this feature is and how it is used by connectors:
https://docs.google.com/document/d/1B_WcIEHTzctwHX_QjZgONXVF9g0i7BwpKzwrHMIA59s/edit?tab=t.0#heading=h.5dmjlrjauglv

How

This PR has all the usual things associated with new components being added to the low-code framework. That includes updating declarative_component_schema.yaml, regenerating pydantic models, and updating model_to_component_factory.py to parse the models into runtime components.

The part of model parsing I want to call out is a somewhat unusual (and possibly confusing) transformation: at runtime, the HttpRequester.request_parameters.query_properties model definition is moved under SimpleRetriever.additional_query_properties. The best developer experience is to define it under request_parameters, because that is ultimately where it gets injected. However, the SimpleRetriever needs visibility into the QueryProperties component because it orchestrates the requests being made and merges the resulting records back together.

At runtime, the PR can be broken down into the following three pieces of functionality:

Defining or retrieving the complete set of request properties to inject:

The QueryProperties runtime class orchestrates retrieving the list of properties to query, either from the statically defined list or through the PropertiesFromEndpoint class. It then applies PropertyChunking if it is defined; otherwise it returns a single set of properties. This way, the SimpleRetriever processes all QueryProperties output the same way, regardless of whether property chunking is defined.
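A minimal sketch of this orchestration, under simplifying assumptions. `QueryPropertiesSketch`, `fetch_properties`, and `chunker` are hypothetical names standing in for QueryProperties, PropertiesFromEndpoint, and PropertyChunking. The real classes take additional arguments, such as the stream slice, which are omitted here. The sketch shows the key design point: with or without chunking, the caller always receives an iterable of property lists.

```python
from typing import Callable, Iterable, List, Optional


class QueryPropertiesSketch:
    """Illustrative stand-in for QueryProperties (not the CDK class)."""

    def __init__(
        self,
        property_list: Optional[List[str]] = None,
        # Hypothetical hook standing in for PropertiesFromEndpoint.
        fetch_properties: Optional[Callable[[], Iterable[str]]] = None,
        # Hypothetical hook standing in for PropertyChunking.
        chunker: Optional[Callable[[List[str]], Iterable[List[str]]]] = None,
    ) -> None:
        # Assumption for this sketch: the two property sources are mutually exclusive.
        if (property_list is None) == (fetch_properties is None):
            raise ValueError("Define exactly one of property_list or fetch_properties")
        self.property_list = property_list
        self.fetch_properties = fetch_properties
        self.chunker = chunker

    def get_request_property_chunks(self) -> Iterable[List[str]]:
        # 1. Resolve the complete set of properties (static or from an endpoint).
        if self.property_list is not None:
            fields = list(self.property_list)
        else:
            fields = list(self.fetch_properties())
        # 2. Chunk if configured; otherwise emit a single chunk so callers
        #    always receive the same shape (an iterable of lists).
        if self.chunker is not None:
            yield from self.chunker(fields)
        else:
            yield fields


# Usage: a static list with no chunking yields exactly one chunk.
static = QueryPropertiesSketch(property_list=["id", "name", "email"])
print(list(static.get_request_property_chunks()))  # [['id', 'name', 'email']]
```

Because the no-chunking case still yields a one-element sequence, a consumer can loop over chunks unconditionally.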

Some connectors, like source-linkedin-ads, identify records by a unique identifier built from property fields that must be requested, so those fields must be included in every group of properties. The always_include_properties field lets us declare the fields that must always be added. This avoids having to use interpolation to build a complex query field such as: fields: "{{ ','.join(['required'] + stream_partition.extra_fields['query_properties']) }}"

Performing chunking into smaller sets of properties for APIs with restrictions around requests:

The PropertyChunking runtime class decides when to break the complete set of properties into smaller groups, based either on character count or on the number of properties per group. It also supplies the record_merge_strategy, which is only needed when there are multiple groups.
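A hedged sketch of greedy chunking under both limit types, including the always_include_properties behavior described above. The function name `chunk_properties` is hypothetical. Two details are assumptions rather than confirmed PropertyChunking behavior. First, always-included properties are prepended to every chunk and do not count toward the limit. Second, in `characters` mode each property costs its length plus one delimiter character.

```python
from enum import Enum
from typing import Iterable, List, Optional


class PropertyLimitType(Enum):
    # Mirrors the two limit types defined by this PR.
    characters = "characters"
    property_count = "property_count"


def chunk_properties(
    properties: Iterable[str],
    limit: Optional[int],
    limit_type: PropertyLimitType = PropertyLimitType.property_count,
    always_include: Optional[List[str]] = None,
) -> Iterable[List[str]]:
    """Greedily pack properties into chunks that stay within `limit`.

    Assumptions: always-included properties are prepended to every chunk
    and do not count toward the limit; in characters mode each property
    costs len(name) + 1 to account for a delimiter.
    """
    base = list(always_include or [])
    if not limit:
        # No limit configured: emit everything as a single chunk.
        yield base + list(properties)
        return

    chunk, size = list(base), 0
    for prop in properties:
        cost = len(prop) + 1 if limit_type is PropertyLimitType.characters else 1
        # Start a new chunk when this property would exceed the limit,
        # unless the current chunk has no regular properties yet.
        if size + cost > limit and size > 0:
            yield chunk
            chunk, size = list(base), 0
        chunk.append(prop)
        size += cost
    # Emit the final (possibly partially filled) chunk.
    yield chunk


print(list(chunk_properties(["a", "b", "c"], 2, always_include=["id"])))
# [['id', 'a', 'b'], ['id', 'c']]
```

A property that alone exceeds the limit still gets its own chunk instead of being dropped. An API with a hard limit would likely reject that request, so raising an error is a reasonable alternative.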

Loading the query_properties into the outbound API request

This is probably one of the more awkward parts of the code. We need a mechanism for passing the current chunk of query_properties to inject from SimpleRetriever.read_records to the InterpolatedRequestOptionsProvider. The most convenient way to do this without changing the interface is to use the StreamSlice's extra_fields, which are not persisted back as state.

Jinja interpolation could also work, and it would be more consistent in that every request_param would remain a string. However, it feels backwards to convert the properties back into interpolated strings when we can operate on the StreamSlice directly.
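A minimal sketch of the injection step. It assumes that the current chunk travels in the slice's `extra_fields` under a `query_properties` key, as in the interpolation example above, and that the API expects a comma-delimited list. `SliceSketch` and `get_request_params` are hypothetical stand-ins, not the CDK's StreamSlice or InterpolatedRequestOptionsProvider.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping, Optional


@dataclass
class SliceSketch:
    """Hypothetical stand-in for StreamSlice; only extra_fields matters here."""
    partition: Dict[str, Any] = field(default_factory=dict)
    extra_fields: Dict[str, Any] = field(default_factory=dict)


def get_request_params(
    static_params: Mapping[str, Any],
    stream_slice: Optional[SliceSketch],
    query_properties_key: Optional[str],
) -> Dict[str, Any]:
    params = dict(static_params)
    if query_properties_key is None:
        # Query properties not configured: request params are unchanged.
        return params
    if stream_slice is None:
        raise ValueError("stream_slice must not be None when query properties are enabled")
    chunk = stream_slice.extra_fields.get("query_properties")
    if not isinstance(chunk, list):
        raise ValueError("extra_fields['query_properties'] must be a list of property names")
    # Assumption: the API expects a comma-delimited list of property names.
    params[query_properties_key] = ",".join(chunk)
    return params


current = SliceSketch(extra_fields={"query_properties": ["id", "name"]})
print(get_request_params({"limit": 100}, current, "properties"))
# {'limit': 100, 'properties': 'id,name'}
```

Because extra_fields are not written back to state, each chunk only lives for the request it is attached to.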

Summary by CodeRabbit

  • New Features

    • Introduced flexible record merging strategies that allow records to be grouped or emitted in parts.
    • Enabled dynamic property extraction with built-in chunking to accommodate API constraints.
    • Enhanced API request configuration by integrating additional query property options for more granular control.
  • Bug Fixes

    • Improved error handling for invalid stream slices in request options.
  • Tests

    • Expanded test coverage for SimpleRetriever, PropertiesFromEndpoint, PropertyChunking, and QueryProperties to ensure robust functionality and error handling.

@brianjlai marked this pull request as ready for review on April 2, 2025 01:56

coderabbitai bot commented Apr 2, 2025

📝 Walkthrough

This PR introduces several new components and classes to improve record merge strategies and property handling within the declarative framework. Updates span YAML schema definitions, Python models, component factories, requesters, and retrievers. In particular, new strategies such as EmitPartialRecordMergeStrategy and GroupByKeyMergeStrategy are added, along with support for property chunking and dynamic retrieval from endpoints. The changes also include modifications to request options and extensive additions to the test suites to ensure accurate behavior for the new query properties functionality.

Changes

File(s) and change summary:

  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml: Added new components (EmitPartialRecordMergeStrategy, GroupByKeyMergeStrategy, PropertiesFromEndpoint, PropertyChunking, and QueryProperties) for advanced record merging and property management.
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py: Introduced new classes for merge strategies and property handling, a new enum PropertyLimitType, and updated the HttpRequester.request_parameters type for added flexibility.
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py: Added new factory methods to create the above components, extended the mapping to integrate the new models into the component creation process, and modified create_simple_retriever to handle QueryProperties.
  • airbyte_cdk/sources/declarative/requesters/query_properties/**: Created a set of modules and classes (including PropertiesFromEndpoint, PropertyChunking, QueryProperties, and strategies under strategies/) to manage dynamic property retrieval, chunking, and merging via different strategies (EmitPartialRecord and GroupByKey).
  • airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py: Added an optional query_properties_key attribute and modified get_request_params to incorporate and validate query properties from stream slices.
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py: Introduced an additional_query_properties attribute and updated the record reading flow to support property chunking and merging based on the new components.
  • unit_tests/**: Expanded test suites across parsers, requesters, strategies, and retrievers to cover the new functionality, including query properties, property chunking, merge strategies, error handling in request options, and overall retriever behavior.

Sequence Diagram(s)

sequenceDiagram
    participant Caller as Data Requester
    participant SR as SimpleRetriever
    participant QP as QueryProperties
    participant PFE as PropertiesFromEndpoint
    participant PC as PropertyChunking
    participant MS as MergeStrategy

    Caller->>SR: Request records
    SR->>QP: get_request_property_chunks(stream_slice)
    alt Dynamic properties
        QP->>PFE: get_properties_from_endpoint(stream_slice)
        PFE-->>QP: Return property list
    else Static list
        QP->>QP: Use property_list
    end
    opt PropertyChunking defined
        QP->>PC: get_request_property_chunks(properties)
        PC-->>QP: Return property chunks
    end
    QP-->>SR: Return chunk(s)
    alt Multiple chunks found
        SR->>PC: get_merge_key(record)
        PC->>MS: get_group_key(record) (GroupByKey/EmitPartialRecord)
        MS-->>PC: Return group key
        PC-->>SR: Return merge key
        SR->>SR: Merge records based on key
    end
    SR->>Caller: Emit final record(s)

Suggested labels

bug

Suggested reviewers

  • maxi297: Would you be able to review these changes? wdyt?

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (28)
unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (1)

29-34: Would a non-existent key be clearer than an empty string for this test case?

The test is validating the behavior when a key isn't present in the record data, but using [""] (empty string) as the key might be confusing. Perhaps using a key that clearly doesn't exist in the record data (like ["non_existent_key"]) would make the test's intention more obvious? This would more explicitly test that the method returns None when a key isn't found in the record. wdyt?
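For illustration, here is a minimal sketch of the behavior this test exercises. It assumes that GroupByKey returns None when any key component is missing and that composite keys are represented as tuples. Both are assumptions, and `GroupByKeySketch` is a hypothetical name, not the CDK class. It also shows why `[""]` passes: the empty string is simply another key that is absent from the record.

```python
from typing import Any, List, Mapping, Optional, Tuple, Union


class GroupByKeySketch:
    """Illustrative group-by-key merge strategy (not the CDK class)."""

    def __init__(self, key: Union[str, List[str]]) -> None:
        # Normalize a single key into a one-element list so both forms share a code path.
        self.keys = [key] if isinstance(key, str) else list(key)

    def get_group_key(self, record: Mapping[str, Any]) -> Optional[Tuple[Any, ...]]:
        values = []
        for key in self.keys:
            if key not in record:
                # Any missing component means the record cannot be grouped.
                return None
            values.append(record[key])
        # Assumption: composite keys are returned as a tuple of values.
        return tuple(values)


print(GroupByKeySketch(["id", "date"]).get_group_key({"id": 1, "date": "2024-01-01"}))
# (1, '2024-01-01')
```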

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py (1)

87-108: Good validation checks, but could the error messages be more developer-friendly?

The validation logic is thorough and correctly checks for the presence and type of query properties. I like how you're doing extensive validation before trying to use the properties.

One minor suggestion: all error messages include "Please contact Airbyte Support" which might not be necessary for developer-facing errors. Could these messages be more actionable for developers working with the codebase, wdyt?

-                        "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
+                        "stream_slice should not be None if query properties in requests is enabled"

And similar changes for the other error messages.

unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (2)

15-58: Nicely structured parametric tests.
These scenarios comprehensively cover various property chunking constraints and inclusion behaviors. Would you consider adding an extra scenario with a single-element list to confirm the code’s behavior in edge cases, wdyt?


87-99: Check additional record fields in merge logic test?
Testing get_merge_key with a single key is good, but what about verifying how it behaves if the record has no "id" or has nested fields? Might be worth exploring more complex data. wdyt?

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (3)

94-94: Expose optional query properties.
Declaring additional_query_properties as optional is flexible for different user scenarios, which is great. Would you consider clarifying the docstring to detail the typical usage of this attribute, wdyt?


453-465: Conditional logic for additional_query_properties.
This check correctly distinguishes between regular single-chunk and multi-chunk scenarios. Have you considered logging or tracking when multiple property chunks are used for debugging or traceability, wdyt?


469-523: Merging records across property chunks.
The approach to accumulate partial records in merged_records is powerful. One potential improvement: do you want to handle collisions for fields with conflicting data (e.g., if two chunks have different values for the same key), wdyt?
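To make the collision question concrete, here is a sketch of chunk merging with one explicit policy. `merge_chunked_records` is a hypothetical helper, not the SimpleRetriever code. It assumes that records without a group key are emitted immediately as partial records and that the later chunk wins on a conflicting field.

```python
from typing import Any, Callable, Dict, Hashable, Iterable, Iterator, Mapping, Optional


def merge_chunked_records(
    records_per_chunk: Iterable[Iterable[Mapping[str, Any]]],
    get_group_key: Callable[[Mapping[str, Any]], Optional[Hashable]],
) -> Iterator[Dict[str, Any]]:
    """Merge partial records fetched with different property chunks.

    Records whose group key is None cannot be merged and are yielded
    immediately. On a field collision, the later chunk's value wins
    (dict.update); this policy is an assumption for illustration.
    """
    merged: Dict[Hashable, Dict[str, Any]] = {}
    for chunk_records in records_per_chunk:
        for record in chunk_records:
            key = get_group_key(record)
            if key is None:
                yield dict(record)
                continue
            merged.setdefault(key, {}).update(record)
    # Grouped records are only complete once every chunk has been read.
    yield from merged.values()


chunks = [
    [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}],
    [{"id": 1, "email": "a@x.io"}, {"id": 2, "email": "b@x.io"}],
]
print(list(merge_chunked_records(chunks, lambda r: r.get("id"))))
```

Grouped records can only be emitted after every chunk has been read, so memory grows with the number of distinct keys in the slice. Raising on a conflicting value instead of overwriting would be the stricter alternative.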

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)

13-18: Consider expanding docstring details.
The docstring is minimal. Would adding examples or clarifying usage scenarios help future maintainers, wdyt?

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1)

14-20: Could you enhance the enum docstring?

The current docstring for PropertyLimitType is very minimal ("yeah"). Consider replacing it with a more descriptive explanation of what these limit types represent and how they affect property chunking, wdyt?

class PropertyLimitType(Enum):
    """
-   yeah
+   Defines the type of limit to apply when chunking properties.
+   
+   'characters': Limit based on the total number of characters in property names
+   'property_count': Limit based on the number of properties
    """

    characters = "characters"
    property_count = "property_count"
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)

1-2: Adding copyright notice is great! Small year correction needed.

I noticed the copyright year is set as 2025, which is likely a typo. Would it make sense to change this to 2023 or 2024 instead? wdyt?

-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

28-43: Simple suggestion for the property yielding logic

The method implementation looks good! For line 42, could we simplify this slightly? The current yield from [list(fields)] is creating a list within a list, and then yielding from it. Would it be cleaner to just yield list(fields) directly? wdyt?

-            yield from [list(fields)]
+            yield list(fields)
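Here is a quick check that the suggested change preserves behavior. It uses two hypothetical helpers that differ only in that line.

```python
from typing import Iterable, Iterator, List


def chunks_original(fields: Iterable[str]) -> Iterator[List[str]]:
    # Wraps the list in a one-element list, then yields that single element.
    yield from [list(fields)]


def chunks_simplified(fields: Iterable[str]) -> Iterator[List[str]]:
    # Yields the list directly; no intermediate wrapper list is built.
    yield list(fields)


print(list(chunks_original(["a", "b"])), list(chunks_simplified(["a", "b"])))
# [['a', 'b']] [['a', 'b']]
```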
unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (3)

91-118: Well-implemented dynamic endpoint test

I like the use of Mock to test the dynamic endpoint case without actual network calls. One small suggestion - would it be valuable to also verify that get_properties_from_endpoint is called with the correct stream_slice parameter? This would ensure the proper context is passed through. wdyt?

# Add this assertion after line 113
properties_from_endpoint_mock.get_properties_from_endpoint.assert_called_once_with(stream_slice=stream_slice)

120-165: Good parameterized testing for has_multiple_chunks

The parameterized test for has_multiple_chunks provides good coverage. One small detail - both test cases have the same id "test_has_multiple_chunks" (lines 127 and 131). Would it be clearer to use more specific ids like "with_multiple_chunks" and "with_single_chunk" to differentiate them better in test reports? wdyt?

        pytest.param(
            5,
            True,
-           id="test_has_multiple_chunks",
+           id="with_multiple_chunks",
        ),
        pytest.param(
            10,
            False,
-           id="test_has_multiple_chunks",
+           id="with_single_chunk",
        ),

18-166: Consider adding tests for edge cases

The tests look comprehensive for the happy paths! Would it be valuable to add a couple of edge cases?

  1. A test for when property_list is None or an empty list
  2. A test for when property_chunking is None

These would ensure complete coverage of the code paths in the implementation. wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (5)

1047-1059: New Merge Strategy: EmitPartialRecordMergeStrategy
The definition for EmitPartialRecordMergeStrategy is clear and follows our schema pattern well. Have you thought about adding specific test cases to verify how it handles a record split across multiple API requests, especially given that it’s intended for records without a unique identifier? wdyt?


1768-1790: Clarification on GroupByKeyMergeStrategy Implementation
The new GroupByKeyMergeStrategy appears to support grouping records based on a key that can be either a single string or an array. Could you elaborate on how composite keys (arrays) will be processed during record merging? Also, have you added tests to ensure both single-field and multi-field keys behave as expected? wdyt?


3031-3060: Introducing PropertiesFromEndpoint
This block defines a way to dynamically fetch a list of properties via an API endpoint, which is a neat addition. Would you consider enhancing the documentation or examples to cover more complex cases (for instance, when additional parameters are involved)? Also, do you have tests planned to verify that the retriever integration works seamlessly? wdyt?


3061-3090: New Property Chunking Component
The PropertyChunking component clearly outlines how to limit the number of properties per request and the merging strategy. Would it be useful to include an example or note clarifying how different property_limit_type values (such as characters versus property_count) affect the chunking behavior? wdyt?


3091-3121: QueryProperties Component Addition
The QueryProperties component nicely unifies the concept of static property lists and dynamic retrieval via PropertiesFromEndpoint. How do you envision the interplay between always_include_properties and property_chunking in edge cases where API limitations are stringent? Perhaps a few more examples or tests could clarify this behavior. wdyt?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)

350-353: Consider adding a docstring.
Would you like to add a short docstring explaining how EmitPartialRecordMergeStrategy is intended to merge partial records? wdyt?


1215-1235: Address minor grammar nit.
In the field description, "Dictates how to records..." might be missing a word. Maybe "Dictates how records requiring multiple requests..." wdyt?


2325-2337: Consider error handling for missing fields.
Would you like to handle the scenario where property_field_path is absent or doesn't exist in the response? wdyt?
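Here is one possible answer, sketched with a hypothetical `extract_property_names` helper. It walks a list-of-keys path through each response record and skips records where the path is missing. This is an assumption; the actual PropertiesFromEndpoint may resolve property_field_path differently.

```python
from typing import Any, Iterable, Iterator, List, Mapping


def extract_property_names(
    records: Iterable[Mapping[str, Any]],
    property_field_path: List[str],
) -> Iterator[str]:
    """Yield the string found at property_field_path in each record.

    Hypothetical helper: records that do not contain the full path (or
    whose value is not a string) are skipped instead of raising.
    """
    for record in records:
        value: Any = record
        for segment in property_field_path:
            if not isinstance(value, Mapping) or segment not in value:
                value = None
                break
            value = value[segment]
        if isinstance(value, str):
            yield value


records = [{"name": "email"}, {"name": "firstname"}, {"label": "no name"}]
print(list(extract_property_names(records, ["name"])))  # ['email', 'firstname']
```

Skipping silently can hide a misconfigured path. Logging a warning, or raising when no properties are found at all, may be preferable.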

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

4368-4384: Should the pytest.raises statement include the expected exception type?

The test is correctly checking that an invalid property limit type raises an exception, but the pytest.raises statement doesn't specify which exception type is expected. Should this be with pytest.raises(ValueError): or another specific exception type, wdyt?

-    with pytest.raises:
+    with pytest.raises(ValueError):
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

808-813: New create_emit_partial_record method
The method is straightforward. Would you consider adding a small docstring explaining usage? wdyt?


2142-2145: New create_group_by_key method
Implementation looks okay. Do you want to check if model.key is non-empty or handle unexpected values? wdyt?


2151-2151: Adding optional query_properties_key param
This extra parameter is helpful for hooking up query properties. Perhaps document its purpose in a docstring? wdyt?


2886-2920: Handling QueryProperties in request_parameters
The logic to accept only one QueryProperties is clear. Would you like to refactor this into a helper method for readability or plan for multiple definitions later? wdyt?


3041-3051: New _remove_query_properties helper
Implementation is straightforward. Would you consider logging or handling the case of multiple matching parameters? wdyt?
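Here is a hedged sketch of what such a helper could look like, including the multiple-definition check raised in the previous comment. `QueryPropertiesModel` and `split_query_properties` are hypothetical stand-ins for the generated pydantic model and `_remove_query_properties`. The real helper's signature and return shape may differ.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Mapping, Optional, Tuple


@dataclass
class QueryPropertiesModel:
    """Hypothetical stand-in for the QueryProperties pydantic model."""
    property_list: List[str] = field(default_factory=list)


def split_query_properties(
    request_parameters: Mapping[str, Any],
) -> Tuple[Optional[str], Optional[QueryPropertiesModel], Dict[str, Any]]:
    """Separate a QueryProperties definition from ordinary request parameters.

    Returns (key, model, remaining_parameters) and raises if more than one
    QueryProperties is defined.
    """
    matches = [
        (key, value)
        for key, value in request_parameters.items()
        if isinstance(value, QueryPropertiesModel)
    ]
    if len(matches) > 1:
        keys = ", ".join(key for key, _ in matches)
        raise ValueError(f"Only one QueryProperties may be defined in request_parameters; found: {keys}")
    remaining = {
        k: v for k, v in request_parameters.items() if not isinstance(v, QueryPropertiesModel)
    }
    if not matches:
        return None, None, remaining
    key, model = matches[0]
    return key, model, remaining


key, model, rest = split_query_properties(
    {"limit": "100", "properties": QueryPropertiesModel(["id", "name"])}
)
print(key, model.property_list, rest)  # properties ['id', 'name'] {'limit': '100'}
```

Returning the key alongside the model lets the factory pass it on as query_properties_key while handing the remaining parameters to the requester unchanged.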

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between caa1e7d and abbc2a1.

📒 Files selected for processing (21)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (4 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (7 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py (3 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/__init__.py (1 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (1 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py (1 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (1 hunks)
  • unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py (2 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4 hunks)
🧰 Additional context used
🧬 Code Definitions (14)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
  • EmitPartialRecord (13-23)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • GroupByKey (13-33)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)
  • RecordMergeStrategy (11-19)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
  • RecordMergeStrategy (11-19)
  • get_group_key (18-19)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16)
  • Config (132-133)
  • Config (146-147)
  • Config (160-161)
  • Config (174-175)
  • Config (192-193)
  • Config (206-207)
  • Config (220-221)
  • Config (234-235)
  • Config (248-249)
  • Config (262-263)
  • Config (276-277)
  • Config (290-291)
  • Config (306-307)
  • Config (320-321)
  • Config (334-335)
  • Config (373-374)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • get_group_key (25-33)
airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
  • PropertiesFromEndpoint (14-40)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
  • PropertiesFromEndpoint (2325-2336)
  • PropertyChunking (1215-1234)
  • QueryProperties (2339-2356)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1)
  • PropertyChunking (24-68)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
  • QueryProperties (14-51)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
  • get_group_key (22-23)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • get_group_key (25-33)
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (4)
  • PropertyChunking (24-68)
  • PropertyLimitType (14-20)
  • get_request_property_chunks (41-65)
  • get_merge_key (67-68)
unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
  • GroupByKey (13-33)
  • get_group_key (25-33)
unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)
  • test_get_merge_key (87-98)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
  • get_group_key (22-23)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)
  • get_group_key (18-19)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyChunking (24-68)
  • PropertyLimitType (14-20)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • GroupByKey (13-33)
airbyte_cdk/sources/types.py (4)
  • Record (20-63)
  • associated_slice (38-39)
  • StreamSlice (66-160)
  • data (34-35)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (4)
  • SimpleRetriever (54-643)
  • name (106-114)
  • name (117-119)
  • read_records (440-559)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • read_records (440-559)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (3)
  • QueryProperties (14-51)
  • get_request_property_chunks (28-42)
  • has_multiple_chunks (44-51)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • get_request_property_chunks (41-65)
  • get_merge_key (67-68)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (2)
  • PropertiesFromEndpoint (14-40)
  • get_properties_from_endpoint (30-40)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyChunking (24-68)
  • get_request_property_chunks (41-65)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (4)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
  • GroupByKey (13-33)
  • get_group_key (25-33)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
  • RecordMergeStrategy (11-19)
  • get_group_key (18-19)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (16)
  • Config (132-133)
  • Config (146-147)
  • Config (160-161)
  • Config (174-175)
  • Config (192-193)
  • Config (206-207)
  • Config (220-221)
  • Config (234-235)
  • Config (248-249)
  • Config (262-263)
  • Config (276-277)
  • Config (290-291)
  • Config (306-307)
  • Config (320-321)
  • Config (334-335)
  • Config (373-374)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
  • get_request_property_chunks (28-42)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyLimitType (14-20)
  • PropertyChunking (24-68)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • SimpleRetriever (54-643)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
  • PropertyChunking (1215-1234)
  • PropertiesFromEndpoint (2325-2336)
  • QueryProperties (2339-2356)
  • PropertyLimitType (1210-1212)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyChunking (24-68)
  • PropertyLimitType (14-20)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • GroupByKey (13-33)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (6)
  • EmitPartialRecordMergeStrategy (350-352)
  • GroupByKeyMergeStrategy (728-736)
  • PropertiesFromEndpoint (2325-2336)
  • PropertyChunking (1215-1234)
  • PropertyLimitType (1210-1212)
  • QueryProperties (2339-2356)
🔇 Additional comments (57)
unit_tests/sources/declarative/requesters/query_properties/__init__.py (1)

1-1: LGTM! The copyright notice is correctly in place.

Adding just a copyright notice without any imports is typical for an empty __init__.py file to mark a directory as a Python package. The AI summary mentions imports that aren't visible in the provided code, but this might be part of future changes.

airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py (1)

1-13: LGTM! The imports and exports are correctly structured.

The file properly imports the required strategy classes and exports them via __all__, following Python best practices for package organization. This provides a clean interface for importing these strategies from other modules.

airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py (1)

1-13: LGTM! The imports and exports are correctly structured.

The file properly imports the required classes (PropertiesFromEndpoint, PropertyChunking, and QueryProperties) and exports them via __all__, which follows Python best practices for package organization. This provides a clean interface for importing these components from other modules.

unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py (2)

9-36: LGTM! Well-structured parameterized tests.

The test cases cover different scenarios for the get_group_key method, including single keys, multiple keys, and missing keys. The parameterization makes the tests concise and maintainable.


37-41: LGTM! The test implementation is clear and concise.

The test creates a GroupByKey instance with the provided key, calls the get_group_key method with the provided record, and verifies the result matches the expected value.

airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (1)

1-20: Well-designed abstract base class for record merging strategies!

This is a nice, clean implementation of the RecordMergeStrategy abstract base class that establishes a clear interface for how records requiring multiple requests should be merged. The abstract get_group_key method provides a clear contract for subclasses to implement.

The design allows for different merging strategies like EmitPartialRecord and GroupByKey to implement their own grouping logic while sharing a common interface. This will make it easier to extend with additional strategies in the future.
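Here is a sketch of this contract. It assumes get_group_key receives a single record mapping and returns an optional string key. The real signatures in merge_strategy.py and emit_partial_record.py are not shown in this review and may differ.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Mapping, Optional


class RecordMergeStrategy(ABC):
    """Decides which partial records, fetched by separate chunk requests, belong together."""

    @abstractmethod
    def get_group_key(self, record: Mapping[str, Any]) -> Optional[str]:
        """Return a key shared by the partial records to merge, or None to skip merging."""


@dataclass
class EmitPartialRecord(RecordMergeStrategy):
    """Never groups records, so every partial record is emitted on its own."""

    def get_group_key(self, record: Mapping[str, Any]) -> Optional[str]:
        return None
```

Returning None is the "do not merge" signal, so EmitPartialRecord stays trivial.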

airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py (1)

43-43: Clear addition of query_properties_key with proper type annotation.

Good addition of this optional field to support query properties in request options. The type annotation is appropriate here.

airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)

1-24: Clean implementation with good initialization logic.

The GroupByKey class has a nice implementation with appropriate type hints and initialization logic. The __post_init__ method correctly handles both string and list inputs for the key parameter, ensuring consistent internal representation.


25-33: Good implementation of get_group_key, but what about array values?

The method correctly handles missing keys by returning None when any key is missing. This ensures records don't get incorrectly grouped.

One question: how should this handle cases where key_value might be an array/list itself? Currently, it would just append the array as-is to resolved_keys, which might lead to unexpected behavior when joining. Should there be specific handling for this case or is the current behavior intentional, wdyt?
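The following sketch makes the array concern concrete. It is a minimal version of the behavior described above, assuming key components are converted with str() and joined with ",". The actual separator and conversion in group_by_key.py are not shown in this review, and the real class subclasses RecordMergeStrategy.

```python
from dataclasses import dataclass, field
from typing import Any, List, Mapping, Optional, Union


@dataclass
class GroupByKey:
    key: Union[str, List[str]]
    _keys: List[str] = field(init=False, repr=False)

    def __post_init__(self) -> None:
        # Normalize a single key name into a one-element list.
        self._keys = [self.key] if isinstance(self.key, str) else list(self.key)

    def get_group_key(self, record: Mapping[str, Any]) -> Optional[str]:
        resolved_keys = []
        for key in self._keys:
            key_value = record.get(key)
            if key_value is None:
                # Any missing key component means the record cannot be grouped.
                return None
            resolved_keys.append(key_value)
        # ASSUMPTION: components are stringified and joined with ",".
        # A list value is stringified as-is, e.g. "['a', 'b']".
        return ",".join(str(value) for value in resolved_keys)
```

Under this sketch, GroupByKey("id").get_group_key({"id": ["a", "b"]}) returns "['a', 'b']". That is the stringified-list behavior the question is about.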

airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)

1-24: Simple and effective implementation for emitting partial records.

This is a straightforward implementation of RecordMergeStrategy that always returns None from get_group_key, indicating that records should not be grouped. The class name and docstring clearly communicate its purpose of emitting partial records without merging.

This strategy will be useful in scenarios where records don't have suitable primary keys for merging.

unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py (1)

60-86: Clear and concise test for get_request_property_chunks.
Everything looks great here. The iteration and assertions align well with chunked outputs. No immediate concerns.

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)

6-6: Good use of defaultdict for record merging.
Pulling in defaultdict is a neat way to handle the multi-chunk merging logic. No objections here.
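The sketch below shows how a defaultdict can merge partial records across chunk requests. It assumes each chunk yields records that share a merge key. Records without a key (as in the "not_real" test) are emitted as-is rather than dropped, and emitting them immediately is a choice made here. merge_chunked_records is a hypothetical name, and the actual SimpleRetriever logic is not shown in this review.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, Iterator, Mapping, Optional


def merge_chunked_records(
    chunk_results: Iterable[Iterable[Mapping[str, Any]]],
    get_merge_key: Callable[[Mapping[str, Any]], Optional[str]],
) -> Iterator[Dict[str, Any]]:
    """Merge partial records fetched by separate property-chunk requests."""
    merged: Dict[str, Dict[str, Any]] = defaultdict(dict)
    for records in chunk_results:
        for record in records:
            merge_key = get_merge_key(record)
            if merge_key is None:
                # No usable key: emit the partial record instead of dropping it.
                yield dict(record)
            else:
                # Later chunks add their properties to the same accumulated record.
                merged[merge_key].update(record)
    # Emit merged records only after every chunk has been read.
    yield from merged.values()
```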


533-559: Handling Full Refresh for chunk-less scenario.
This logic effectively checks the FULL_REFRESH_SYNC_COMPLETE_KEY. Everything looks well integrated. No issues spotted.

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)

19-28: Check if multiple property_field_path levels are needed.
You handle a list of path segments, which is good. If the path is deeply nested, do you want to confirm that each segment evaluation works properly, wdyt?
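For the nested case, here is a hypothetical helper that walks property_field_path through each record. It makes two assumptions: segments are plain strings that are already resolved (the real PropertiesFromEndpoint also interpolates them, e.g. from config), and records without a value at the path are skipped. It also rejects an empty path up front.

```python
from typing import Any, Iterable, Iterator, List, Mapping


def extract_property_names(
    records: Iterable[Mapping[str, Any]], property_field_path: List[str]
) -> Iterator[str]:
    """Yield the property name found at a (possibly nested) path in each record."""
    if not property_field_path:
        raise ValueError("property_field_path must contain at least one segment")
    for record in records:
        value: Any = record
        for segment in property_field_path:
            if not isinstance(value, Mapping) or segment not in value:
                value = None
                break
            value = value[segment]
        # ASSUMPTION: records without a value at the path are skipped.
        if value is not None:
            yield str(value)
```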

unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py (2)

184-213: Good job on thorough test coverage for query properties validation!

I like how you've covered a comprehensive set of test cases using parameterization, testing both valid cases and various error scenarios. This will ensure robust validation of query properties in stream slices.


214-232: Clear test implementation with appropriate assertions.

The test function implementation is well-structured with clear conditional logic for handling both error and success cases. The assertions properly validate the expected behavior.

unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py (3)

12-54: Well-designed test for basic property extraction!

Good job creating this test with a comprehensive list of expected properties and clear verification. The mock setup is clean and the assertions validate both the count and content of the properties.


57-99: Great test for handling nested properties!

This test nicely validates the ability to extract properties from nested structures using multiple field paths. The test data is well-structured and the assertions are appropriate.


102-145: Excellent test for interpolation functionality!

I appreciate how you've tested the dynamic field path interpolation using a config parameter. This ensures the flexibility of the implementation for real-world usage where paths may be configuration-driven.

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (5)

33-40: Clean imports for the new query properties components.

The imports are well-organized and focused on the new functionality being tested.


1003-1003: Minor name change in test data.

Simple update from "erobertric" to "robert" in the test data. This looks like a straightforward correction or preference change.


1105-1266: Comprehensive test for property chunking across multiple requests!

This test thoroughly validates the core functionality of retrieving and merging records across multiple property chunks. The test data is well-structured with a clear expected outcome, and the test covers the complete workflow from request to record merging.


1269-1443: Good boundary case test for single chunk processing!

I like how you're testing the boundary condition where all properties fit within a single chunk (property_limit of 10). This ensures the chunking logic works correctly even when chunking isn't strictly needed.


1446-1602: Important test for graceful handling of missing merge keys!

This test ensures that records are still emitted even when a valid merge key isn't available (using "not_real" as the key). This is crucial for fault tolerance in real-world scenarios where data might not always match expectations.

airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (3)

23-40: Good class design with sensible defaults!

I like how you've designed the PropertyChunking class with clear attributes and a sensible default for the merge strategy. The class docstring effectively explains the purpose of this component.


41-65: Well-implemented chunking method with comprehensive edge cases!

The get_request_property_chunks method is well-implemented with handling for various scenarios:

  • When no property limit is set
  • Respecting the always_include_properties in each chunk
  • Different sizing based on the limit type

The logic for determining when to create a new chunk is clean and easy to follow.
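The behavior listed above can be sketched as a standalone function that follows the loop quoted later in this thread. It assumes the enum values "characters" and "property_count" and that always-included properties do not count toward chunk_size. The chunk_size > 0 guard is an addition of this sketch: it keeps an oversized first property from producing a chunk with no regular properties.

```python
from enum import Enum
from typing import Iterable, Iterator, List, Optional


class PropertyLimitType(Enum):
    # ASSUMPTION: member values mirror the names used in the schema.
    characters = "characters"
    property_count = "property_count"


def get_request_property_chunks(
    property_fields: Iterable[str],
    property_limit: Optional[int],
    property_limit_type: PropertyLimitType,
    always_include_properties: Optional[List[str]] = None,
) -> Iterator[List[str]]:
    """Split properties into request-sized chunks, repeating always-included ones in each."""
    always_include = list(always_include_properties or [])
    if property_limit is None:
        # No limit configured: everything goes into a single chunk.
        yield always_include + list(property_fields)
        return

    current_chunk = list(always_include)
    chunk_size = 0  # always-included properties are not counted (assumption)
    for property_field in property_fields:
        # characters: count name length; property_count: count 1 per property.
        property_field_size = (
            len(property_field)
            if property_limit_type == PropertyLimitType.characters
            else 1
        )
        # Start a new chunk when this property would exceed the limit,
        # unless the current chunk has no regular properties yet.
        if chunk_size > 0 and chunk_size + property_field_size > property_limit:
            yield current_chunk
            current_chunk = list(always_include)
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk
```

Take fields "one" through "five" with property_limit=2 in property_count mode and always_include_properties=["zero"]. The function yields ["zero", "one", "two"], ["zero", "three", "four"] and ["zero", "five"].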


67-68: Concise delegation to the merge strategy.

The get_merge_key method cleanly delegates to the underlying merge strategy, maintaining good separation of concerns.

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (2)

13-27: This component design looks well-thought out!

The class structure with support for both static property lists and dynamic retrieval is elegant. I particularly like how you've made both the always_include_properties and property_chunking optional, giving implementers flexibility based on their specific API requirements.


44-51: The implementation for checking multiple chunks is clever

Nice approach to checking for multiple chunks by trying to get two chunks from the iterator! This is a clean way to determine if chunking will actually result in multiple API calls without having to materialize all chunks.
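The "take at most two" check can be sketched with itertools.islice, which stops after two items instead of materializing every chunk. This is an illustrative helper, not necessarily how query_properties.py implements it.

```python
from itertools import islice
from typing import Iterable, TypeVar

T = TypeVar("T")


def has_multiple_chunks(chunks: Iterable[T]) -> bool:
    """Return True if the iterable yields at least two items, consuming at most two."""
    return len(list(islice(chunks, 2))) > 1
```

The check consumes up to two items. A caller that still needs the chunks must either rebuild the iterable or keep the consumed items. With PropertiesFromEndpoint, rebuilding may mean fetching the properties again.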

unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (3)

1-17: Good test setup with proper imports

The imports and test setup look good! I like that you're properly importing the necessary components and mocks for testing.


21-54: Great test for chunking with static property list

The test is well-structured and validates the correct behavior of property chunking with a static list. I like the clear assertions that verify both the number of chunks and their contents.


56-89: Thorough testing of always_include_properties functionality

Good job testing the always_include_properties feature! The assertions clearly verify that "zero" is included at the beginning of each chunk, which is exactly what we'd expect.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)

728-737: Validate non-empty key?
Would you consider ensuring that the key field is not empty or None, possibly by adding a Pydantic validator? wdyt?


1210-1213: Looks correct.
These enum entries appear aligned with the intended usage.


2339-2357: Check for duplication with always-include properties.
If property_chunking is used along with always_include_properties, do you want logic to avoid duplicated properties in the final list? wdyt?
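If deduplication is wanted, one option is to filter the fetched properties against always_include_properties before chunking, preserving order. Here is a sketch with a hypothetical helper name:

```python
from typing import Iterable, List, Optional


def dedupe_against_always_included(
    property_fields: Iterable[str], always_include_properties: Optional[List[str]]
) -> List[str]:
    """Drop properties already in always_include_properties (and repeats), keeping order."""
    seen = set(always_include_properties or [])
    result: List[str] = []
    for field in property_fields:
        if field not in seen:
            seen.add(field)
            result.append(field)
    return result
```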

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (7)

75-75: LGTM! Added import for the new PropertyChunking model.

This import is needed for the new test cases that verify property chunking functionality.


128-137: LGTM! Added imports for query properties components.

These imports are necessary for the new test cases that verify the QueryProperties, PropertiesFromEndpoint, PropertyChunking, PropertyLimitType, and GroupByKey components.


4012-4107: LGTM! Well-structured test for SimpleRetriever with QueryProperties.

This test validates that:

  1. The QueryProperties object is correctly created with property list and always_include_properties
  2. PropertyChunking is configured correctly with property_count limit type
  3. GroupByKey merge strategy is set up correctly
  4. The request_options_provider properly handles the QueryProperties

The test aligns with the PR objective of supporting static property lists like in source-linkedin-ads.


4108-4203: LGTM! Good test coverage for SimpleRetriever with PropertiesFromEndpoint.

This test ensures that properties can be dynamically fetched from an endpoint, aligning with the PR objective of supporting the pattern used in source-hubspot. The test validates:

  1. Properties can be fetched from an endpoint
  2. The nested SimpleRetriever is properly configured
  3. Property chunking is correctly set up

4204-4261: LGTM! Good validation test for incorrect query property types.

This test verifies that an error is raised when something that is not a QueryProperties object is used in the request_parameters. Using a ListPartitionRouter as the test case provides good coverage for type safety.


4263-4346: LGTM! Proper error handling for multiple QueryProperties.

This test ensures that an error is raised when multiple QueryProperties objects are provided in the request_parameters, preventing potential misconfigurations.


4348-4366: LGTM! Good test for character-based property chunking.

This test verifies that PropertyChunking can be created with the "characters" limit type, which is useful for APIs that have character-length limitations on property lists.

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (15)

2-2: Check copyright year.
This updated year seems correct for 2025. Would you consider verifying it's in sync with your project’s timeline? wdyt?


228-230: New import for EmitPartialRecordMergeStrategyModel
The import aligns well with its usage in the constructor map below. wdyt?


240-242: New import for GroupByKeyMergeStrategyModel
This matches the new create_group_by_key function. Looks good. wdyt?


333-335: Imported PropertiesFromEndpointModel
Including this import is consistent with its usage in create_properties_from_endpoint. wdyt?


336-338: Imported PropertyChunkingModel
No issues; it’s properly referenced in create_property_chunking. wdyt?


339-341: Imported PropertyLimitTypeModel
This addition is consistent with your property limit logic. wdyt?


342-344: Imported QueryPropertiesModel
No concerns here; it supports the new query properties feature. wdyt?


453-457: Runtime class imports for QueryProperties
Bringing these in from query_properties complements the pydantic model imports. wdyt?


458-460: Imported PropertyLimitType
Consistently follows your property handling logic. wdyt?


461-464: Imported EmitPartialRecord and GroupByKey
No issues with these additions. wdyt?


626-626: Mapping EmitPartialRecordMergeStrategyModel
This entry ensures the factory can build the correct merge strategy. wdyt?


630-630: Mapping GroupByKeyMergeStrategyModel
This binds the new create_group_by_key method. All good. wdyt?


2184-2184: Passing query_properties_key to the request options
Matches the newly introduced parameter and looks fine. wdyt?


2925-2931: Creating the requester with query_properties_key
Ties into the new parameter seamlessly. wdyt?


3037-3037: Supplying additional_query_properties to SimpleRetriever
This references the property set for query chunking. Looks consistent. wdyt?

@brianjlai changed the title [Low-Code CDK Property Chunking] Allow fetching query properties and property chunking for low-code sources feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources Apr 2, 2025
@brianjlai changed the title feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources ✨ feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources Apr 2, 2025
@brianjlai changed the title ✨ feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources feat(Low-Code CDK Property Chunking): Allow fetching query properties and property chunking for low-code sources Apr 2, 2025
f"Each element of request_parameters should be of type str or QueryProperties, but received {request_parameter.get('type')}"
)

if len(query_properties_definitions) > 1:
Contributor Author

For simplicity, it's easier to only ever worry about a single QueryProperties component per requester; I don't currently see a use case for more than one. Unfortunately, we also can't rely on automatic validation, because as far as I know JSON schema can't enforce a check this granular (that a specific element type appears at most once).

Contributor

Should we improve the error message to describe what the alternative should be?

Also, this might be far fetched but isn't it a case we implicitly support if PropertiesFromEndpoint returns only one property? Is this a problem?

Contributor Author

Should we improve the error message to describe what the alternative should be?

I'm not sure we have an alternative? But I also don't currently see a use case for needing to pull properties from two endpoints, load them into two different query parameters, and needing to perform property chunking on both. And if that's a thing, I'm okay waiting until we see this to address it.

Also, this might be far fetched but isn't it a case we implicitly support if PropertiesFromEndpoint returns only one property? Is this a problem?

This should already be supported if PropertiesFromEndpoint returns only one property. The len(query_properties_definitions) > 1 restriction refers specifically to the case where more than one field in request_parameters is defined as a set of properties to inject.

So this would be invalid:

request_parameters:
  request_field_1:
    type: QueryProperties
    ...
  request_field_2:
    type: QueryProperties

But if there were only request_field_1, and it returned just one property, that would work fine. Unless I misunderstood your question?
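The rule is that there may be at most one QueryProperties entry, and every other value must be a string. The function below sketches it, reconstructed from the fragments quoted in this thread. find_query_properties and the wording of the second error message are hypothetical; the real check appears to live in model_to_component_factory.py.

```python
from typing import Any, List, Mapping, Optional, Tuple


def find_query_properties(
    request_parameters: Mapping[str, Any],
) -> Tuple[Optional[str], Optional[Mapping[str, Any]]]:
    """Return (key, definition) of the single QueryProperties entry, or (None, None)."""
    # The JSON schema to Pydantic conversion types these values as Any, so the
    # allowed types (a string or a QueryProperties mapping) are checked at runtime.
    query_properties_key: Optional[str] = None
    definitions: List[Mapping[str, Any]] = []
    for key, request_parameter in request_parameters.items():
        if isinstance(request_parameter, Mapping) and request_parameter.get("type") == "QueryProperties":
            query_properties_key = key
            definitions.append(request_parameter)
        elif not isinstance(request_parameter, str):
            # Only mappings have a "type"; fall back to the Python type name otherwise.
            received = (
                request_parameter.get("type")
                if isinstance(request_parameter, Mapping)
                else type(request_parameter).__name__
            )
            raise ValueError(
                "Each element of request_parameters should be of type str or "
                f"QueryProperties, but received {received}"
            )
    if len(definitions) > 1:
        raise ValueError(
            "Only one QueryProperties definition is supported per requester, "
            f"but received {len(definitions)}"
        )
    return query_properties_key, (definitions[0] if definitions else None)
```

Under this sketch, the YAML example above raises ValueError. A single request_field_1 is accepted no matter how many properties it later resolves to.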

query_properties_key = key
query_properties_definitions.append(request_parameter)
elif not isinstance(request_parameter, str):
raise ValueError(
Contributor Author

@brianjlai Apr 2, 2025

I couldn't get the JSON schema to Pydantic model conversion to enforce str or QueryProperties. Regardless of the combination I tried, the field was always rendered as Any once we included both a $ref and a concrete type.

Contributor

I think we can add this comment directly to the code, because I would be surprised to see this validation and would probably go searching for the explanation.

Contributor

@maxi297 left a comment

I just need a bit more information and then I'll review this again. What I can definitely say is that this YAML interface is clean and I really like it.

@@ -1044,6 +1044,18 @@ definitions:
$parameters:
type: object
additionalProperties: true
EmitPartialRecordMergeStrategy:
title: Emit Partial Record
description: Record merge strategy where in the case where multiple requests are needed to retrieve all properties, properties are not consolidated back into a single record and are instead emitted as separate groups of properties. This strategy should only be used when records do not have a unique identifier like a primary key.
Contributor

Have we checked the streams that use this strategy in HubSpot? Checking quickly on CompaniesPropertyHistory, it seems like there is a primary key (see image), but I remember us saying that this strategy was used.

image

Therefore it would either seem like:

  • I'm wrong and CompaniesPropertyHistory does not use this strategy
  • It is not only a question of having a PK or not and we should improve this description to clarify that
  • There is a bug in source-hubspot

Contributor Author

I think we had talked about this before: it isn't just about the primary key, but also about whether denormalize_records is True; if it is, we do not perform record merging. Only the property history streams change this value. See https://github.com/airbytehq/airbyte/blob/01c9c1a76bd4c4330e18185e789421ccdbd3090f/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py#L2018

Ultimately, I don't think I fully know the context of why we denormalize records for these streams. So if you don't want us to support this yet, I would be okay taking out this merge strategy and adding it back once we get to these streams in HubSpot.

@dataclass
class PropertiesFromEndpoint:
"""
tbd
Contributor

tbd

]

def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
response_properties = self.retriever.read_records(
Contributor

Should we force caching on this? Otherwise, it feels like we will perform the HTTP request to retrieve properties every time we make a request to retrieve records.

While we are at it, will we also make this request elsewhere for schema discovery purposes? Should we make sure that use_cache and the name are the same so that the cache is shared between both?

Contributor Author

It shouldn't be every time. We get the set of properties once, at the beginning of processing a slice. But we do perform this fetch for every slice, so it still makes sense to look into caching this request.

This might be ignorance on my part, but how do we enable caching? Is it through the HttpRequester.use_cache model field? I see how it enables the SQLite cache on the HttpClient, but as far as I can tell the default is off, and we have manifests that enable it.

Are you saying that we should set use_cache=True under the hood for both dynamic schema discovery and property retrieval? Or am I completely off base, and there's something I can do to turn it on via annotations or new Python code?
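HTTP-level caching via use_cache is one option. Separately from it, the fetched list could be memoized in memory so that later slices skip the request. The sketch below assumes the property list does not depend on the stream slice. CachedPropertyFetcher is hypothetical and not a CDK class.

```python
from typing import Callable, Iterable, List, Optional


class CachedPropertyFetcher:
    """Hypothetical wrapper: fetch the property list once and reuse it for every slice."""

    def __init__(self, fetch: Callable[[], Iterable[str]]) -> None:
        self._fetch = fetch
        self._properties: Optional[List[str]] = None

    def get_properties(self) -> List[str]:
        if self._properties is None:
            # Only the first call hits the endpoint; later slices reuse the result.
            self._properties = list(self._fetch())
        return self._properties
```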


class PropertyLimitType(Enum):
"""
yeah
Contributor

yeah

parameters: InitVar[Mapping[str, Any]]

def get_request_property_chunks(
self, stream_slice: Optional[StreamSlice] = None
Contributor

In which cases does the list of properties depend on the stream_slice?

Contributor Author

This is for consistency, so that the SimpleRetriever, HttpRequester, etc. defined to fetch properties from the API endpoint have access to the stream slice of the current partition. HubSpot specifically doesn't need access to the slice.

If we don't supply it and a future connector tries to use Jinja interpolation on the slice, as we do for query params, the value would resolve to None. So even though we don't currently have a use case, I would advocate keeping this, since it doesn't hurt for it to exist.

else:
yield from [list(fields)]

def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
Contributor

nit: It feels like we could leave that to the consumer of the query properties (in our case, SimpleRetriever) so that we don't need to maintain this new method. Do we know of any other case where we would need this method? I'm mostly worried about maintaining multiple methods that might issue the same HTTP requests multiple times.

Contributor Author

I think that's fair: it does give the impression that this is a "cheap" method, when in reality it makes an API call. The main reason I ended up adding it was that we needed to introspect the iterable to detect whether there were at least two elements, and we've been burned in the past by tee() not deallocating slices properly.

But I think we can move this method up into the retriever. Combined with caching get_request_property_chunks(), that should require fewer outbound API calls. Thanks for the suggestion!
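The retriever could check for multiple chunks without tee(), and without re-running the chunk generator (and so without repeating the property fetch). One way is to pull the first two items and chain them back in front of the remainder. This is a minimal sketch; peek_multiple is a hypothetical helper.

```python
from itertools import chain, islice
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def peek_multiple(items: Iterable[T]) -> Tuple[bool, Iterator[T]]:
    """Report whether there are at least two items, without losing the ones consumed."""
    iterator = iter(items)
    head: List[T] = list(islice(iterator, 2))
    # Put the peeked items back in front of the untouched remainder.
    return len(head) > 1, chain(head, iterator)
```

Unlike tee(), this buffers at most two items, and the returned iterator replays them before continuing with the original source.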

@github-actions bot added the enhancement (New feature or request) label Apr 2, 2025
Contributor

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (8)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)

1-1: Copyright year is set to 2025

I noticed the copyright year is set to 2025, which seems to be in the future. Should this be updated to the current year instead? wdyt?

- # Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+ # Copyright (c) 2023 Airbyte, Inc., all rights reserved.

52-66: Consider handling case when a single property exceeds the limit

What happens if a single property's character length exceeds the property_limit? Currently, it would still be added to a chunk, which might exceed the API's limit. Should there be special handling for this case? Perhaps a warning log or raising an exception? wdyt?

            property_field_size = (
                len(property_field)
                if self.property_limit_type == PropertyLimitType.characters
                else 1
            )
+           # Handle case where a single property exceeds the limit
+           if property_field_size > self.property_limit and len(current_chunk) == (len(always_include_properties) if always_include_properties else 0):
+               logger.warning(f"Property '{property_field}' with size {property_field_size} exceeds the limit of {self.property_limit}")
            if chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
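Instead of the compound inline condition, here is a hypothetical alternative for the characters limit type: scan the properties once before chunking and warn about any name that alone exceeds the limit. find_oversized_properties is not a CDK function, and whether to warn or raise is still the open question in the comment.

```python
import logging
from typing import Iterable, List

logger = logging.getLogger(__name__)


def find_oversized_properties(property_fields: Iterable[str], property_limit: int) -> List[str]:
    """Return (and log) properties whose name alone exceeds a character limit."""
    oversized = [field for field in property_fields if len(field) > property_limit]
    for field in oversized:
        logger.warning(
            "Property '%s' with size %d exceeds the limit of %d", field, len(field), property_limit
        )
    return oversized
```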
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)

25-30: Consider validating an empty property_field_path.

If property_field_path is empty, would it cause unexpected behavior elsewhere? Maybe raise a warning or handle that scenario, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)

2-2: Would you consider referencing this new year in docstrings too?
I see the project year updated here. If this is the only file referencing the year, that might be slightly inconsistent with other placeholders. wdyt?


808-813: Implementation of create_emit_partial_record is concise.
Would you consider adding a short docstring or minimal validation (e.g., if config is empty)? wdyt?


2142-2145: create_group_by_key also looks straightforward.
Should we add a quick assertion if the provided key is empty? wdyt?


2886-2927: Potential concern with in-place modification of request_parameters.
Might it introduce side effects for subsequent usage? Would a shallow copy be safer? wdyt?


3044-3054: _remove_query_properties logic is clean but might need a small guard check.
For example, verifying 'type' is present before comparing to "QueryProperties". wdyt?
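The sketch below combines both suggestions. It builds a shallow copy instead of mutating request_parameters in place, and it only reads "type" from values that are mappings. remove_query_properties is a hypothetical standalone version; the signature of the factory's _remove_query_properties may differ.

```python
from typing import Any, Dict, Mapping


def remove_query_properties(request_parameters: Mapping[str, Any]) -> Dict[str, Any]:
    """Return a shallow copy of request_parameters without QueryProperties entries."""
    return {
        key: value
        for key, value in request_parameters.items()
        # Guard: only mappings can carry a "type"; plain strings are always kept.
        if not (isinstance(value, Mapping) and value.get("type") == "QueryProperties")
    }
```

Because the copy is shallow, nested mappings are still shared with the original. That is fine as long as neither side mutates them later.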

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between abbc2a1 and 63062d9.

📒 Files selected for processing (5)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (14 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (1 hunks)
  • airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1 hunks)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py (0 hunks)
💤 Files with no reviewable changes (1)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py
🚧 Files skipped from review as they are similar to previous changes (1)
  • airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
🧰 Additional context used
🧬 Code Definitions (3)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • read_records (440-559)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (3)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (2)
  • GroupByKey (13-33)
  • get_group_key (25-33)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py (2)
  • RecordMergeStrategy (11-19)
  • get_group_key (18-19)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18)
  • Config (132-133)
  • Config (146-147)
  • Config (160-161)
  • Config (174-175)
  • Config (192-193)
  • Config (206-207)
  • Config (220-221)
  • Config (234-235)
  • Config (248-249)
  • Config (262-263)
  • Config (276-277)
  • Config (290-291)
  • Config (306-307)
  • Config (320-321)
  • Config (334-335)
  • Config (373-374)
  • PropertyLimitType (1210-1212)
  • PropertyChunking (1215-1234)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (22)
  • EmitPartialRecordMergeStrategy (350-352)
  • GroupByKeyMergeStrategy (728-736)
  • PropertiesFromEndpoint (2325-2336)
  • PropertyChunking (1215-1234)
  • PropertyLimitType (1210-1212)
  • QueryProperties (2339-2356)
  • Config (132-133)
  • Config (146-147)
  • Config (160-161)
  • Config (174-175)
  • Config (192-193)
  • Config (206-207)
  • Config (220-221)
  • Config (234-235)
  • Config (248-249)
  • Config (262-263)
  • Config (276-277)
  • Config (290-291)
  • Config (306-307)
  • Config (320-321)
  • Config (334-335)
  • Config (373-374)
airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (1)
  • PropertiesFromEndpoint (14-40)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyChunking (25-69)
  • PropertyLimitType (14-21)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
  • QueryProperties (14-51)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/emit_partial_record.py (1)
  • EmitPartialRecord (13-23)
airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py (1)
  • GroupByKey (13-33)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
🔇 Additional comments (19)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (5)

14-22: LGTM! Well-defined enum for property limit types

The PropertyLimitType enum clearly defines two approaches for chunking properties - by character count or by property count. The docstring explains the purpose well.


24-36: Well-structured PropertyChunking class

The PropertyChunking class is well-defined with clear parameters and types. I appreciate the detailed docstring explaining its purpose.


37-40: Default merge strategy is sensible

Using GroupByKey with "id" as the default makes sense as a reasonable fallback when no specific merge strategy is provided.
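To make the fallback concrete, here is a minimal sketch of what a key-based merge strategy with an "id" default could look like. `GroupByKeySketch` is a hypothetical stand-in, not the CDK's `GroupByKey`. Converting the key to `str` and returning `None` for records without the key are assumptions chosen to match the `MutableMapping[str, Any]` used for merged records.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class GroupByKeySketch:
    """Hypothetical stand-in for the GroupByKey merge strategy under review."""

    key: str = "id"  # mirrors the reviewed default of grouping on "id"

    def get_group_key(self, record: Mapping[str, Any]) -> Optional[str]:
        # Return None when the record lacks the key so the caller can fall
        # back to emitting the record as-is instead of merging it.
        value = record.get(self.key)
        return str(value) if value is not None else None
```

A `None` result lines up with the retriever branch that yields `stream_data` directly when no merge key is found.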


68-69: LGTM! Clean delegation to the merge strategy

This method cleanly delegates to the underlying merge strategy, maintaining good separation of concerns.


42-51:

❓ Verification inconclusive

Consider handling the edge case of empty property lists

The method correctly handles the case when no property limit is set, but what about when both property_fields and always_include_properties are empty? It would yield an empty list. Is this the expected behavior or should there be a validation check? wdyt?


🏁 Script executed:

#!/bin/bash
# Check if there are any tests that verify the behavior with empty lists
rg -A 10 -B 5 "def test.*empty.*property" --glob "test_*.py"
# Or any tests for the PropertyChunking class
rg -A 10 "class Test.*PropertyChunking" --glob "test_*.py" 

Length of output: 123


Edge Case Handling in get_request_property_chunks

It looks like when both property_fields and always_include_properties are empty, the method yields an empty list without any explicit validation. Since our search didn't surface any tests or existing documentation addressing this scenario, could you please confirm whether returning an empty list is indeed the expected behavior or if we should introduce an additional validation check? wdyt?
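To make the edge case easier to reason about, here is a minimal sketch of chunking by property count or by characters. It is not the CDK's `get_request_property_chunks`. The limit-type strings, the rule that always-included fields do not count toward the limit, and ignoring delimiter overhead are all assumptions. In this sketch both code paths emit a single empty chunk when there are no properties, which would translate into one request with no properties at all.

```python
from typing import Iterable, Iterator, List, Optional


def chunk_properties(
    property_fields: Iterable[str],
    always_include: Optional[List[str]] = None,
    limit: Optional[int] = None,
    limit_type: str = "property_count",  # or "characters" (assumed names)
) -> Iterator[List[str]]:
    """Sketch of property chunking; not the CDK's actual implementation."""
    base = list(always_include or [])
    if limit is None:
        # No limit configured: everything goes into one chunk.
        yield base + list(property_fields)
        return

    chunk, size = list(base), 0
    for name in property_fields:
        cost = len(name) if limit_type == "characters" else 1
        # Start a new chunk once adding this property would exceed the limit,
        # but never emit a chunk that holds only the always-included fields.
        if size + cost > limit and size > 0:
            yield chunk
            chunk, size = list(base), 0
        chunk.append(name)
        size += cost
    yield chunk
```

If an empty chunk should instead be treated as a configuration error, the check would go before the first `yield`.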

airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py (7)

1-2: License header looks good.

No concerns here.


3-5: Imports from dataclasses and typing look standard.

No issues.


6-7: Confirm pinned dpath version?

We’re importing dpath here. Sometimes pinned versions are required to avoid unexpected behavior. Would you consider verifying or pinning the version in your dependencies, wdyt?


8-11: Imports from internal modules look appropriate.

No immediate concerns.


13-19: Docstring is clear and comprehensive.

This docstring nicely explains the usage and intent of dynamically retrieving properties.


20-24: All fields seem well-defined.

Everything is properly typed and complements the docstring description.


31-41: Potential caching and error handling enhancements for get_properties_from_endpoint.

  1. If this method is repeatedly fetching properties for each slice, would you consider enabling caching at the Retriever level, wdyt?
  2. Also, referencing , a previous reviewer suggested ensuring robust error handling/logging when the dpath.get path is missing, especially if that indicates unexpected data from the API. Right now a default [] is returned; do we need to raise or log a warning in that scenario, wdyt?
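As a sketch of the logging option, the helper below walks a nested path in each record and logs a warning when the path is missing instead of silently producing a default. `_lookup` is a hypothetical pure-Python stand-in for `dpath.get`. The one-property-name-per-record shape is an assumption about what the endpoint returns.

```python
import logging
from typing import Any, Iterable, Iterator, Mapping, Sequence

logger = logging.getLogger(__name__)


def _lookup(record: Mapping[str, Any], path: Sequence[str]) -> Any:
    # Walk nested mappings; return None if any segment is missing.
    node: Any = record
    for segment in path:
        if not isinstance(node, Mapping) or segment not in node:
            return None
        node = node[segment]
    return node


def property_names(records: Iterable[Mapping[str, Any]], path: Sequence[str]) -> Iterator[str]:
    """Sketch: yield one property name per record, warning when the path misses."""
    for record in records:
        name = _lookup(record, path)
        if name is None:
            # Surfacing the miss makes an unexpected API response visible
            # instead of silently contributing an empty value.
            logger.warning("No value at path %s in record with keys %s", "/".join(path), list(record))
            continue
        yield str(name)
```

Raising instead of warning would be a one-line change, if a missing path should fail the sync.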
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)

228-230: No issues spotted in the new import.
It appears neatly added for the EmitPartialRecordMergeStrategyModel. wdyt?


240-242: The new import for GroupByKey seems coherent.
Seems like you’ve aligned with the naming from the rest of the code. wdyt?


333-335: Importing PropertiesFromEndpointModel looks straightforward.
Everything appears in order. wdyt?


337-339: Adding PropertyChunkingModel also looks good.
No concerns, wdyt?


340-341: Importing PropertyLimitTypeModel is consistent.
No further issues here, wdyt?


342-344: Introducing QueryPropertiesModel.
This import is well-aligned with the rest of your new property management logic. wdyt?


453-464: All new imports from query_properties seem to align with your approach for property handling.
Would you consider grouping them on fewer lines, or do you prefer the brevity here? wdyt?

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (8)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (5)

2-2: Update copyright year to 2023 or 2024?

The copyright year is set to 2025, which appears to be in the future. Should this be updated to the current year instead?

-# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

453-463: Consider extracting property chunk initialization logic

The initialization logic for property chunks could be extracted to a separate method for better readability. This would also simplify the read_records method which is becoming quite complex.

 def read_records(
     self,
     records_schema: Mapping[str, Any],
     stream_slice: Optional[StreamSlice] = None,
 ) -> Iterable[StreamData]:
     """
     Fetch a stream's records from an HTTP API source

     :param records_schema: json schema to describe record
     :param stream_slice: The stream slice to read data for
     :return: The records read from the API source
     """

+    property_chunks, has_multiple_chunks = self._initialize_property_chunks(stream_slice)
+    merged_records: MutableMapping[str, Any] = defaultdict(dict)
+    _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
     most_recent_record_from_slice = None

+    # Rest of the method...
+
+def _initialize_property_chunks(self, stream_slice: Optional[StreamSlice]) -> Tuple[List[List[str]], bool]:
     if self.additional_query_properties:
         property_chunks = list(
             self.additional_query_properties.get_request_property_chunks(
                 stream_slice=stream_slice
             )
         )
         has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice)
     else:
         property_chunks = [[""]]
         has_multiple_chunks = False
-    merged_records: MutableMapping[str, Any] = defaultdict(dict)
-    _slice = stream_slice or StreamSlice(partition={}, cursor_slice={})  # None-check
+    return property_chunks, has_multiple_chunks

464-512: Consider splitting property chunking logic into a separate method

The handling of property chunks and record merging makes the method quite complex. Consider extracting this logic into a separate method for better readability and maintainability. This would also make unit testing easier.

 if self.additional_query_properties:
-    for properties in property_chunks:
-        _slice = StreamSlice(
-            partition=_slice.partition or {},
-            cursor_slice=_slice.cursor_slice or {},
-            extra_fields={"query_properties": properties},
-        )  # None-check
-
-        record_generator = partial(
-            self._parse_records,
-            stream_slice=_slice,
-            stream_state=self.state or {},
-            records_schema=records_schema,
-        )
-
-        for stream_data in self._read_pages(record_generator, self.state, _slice):
-            current_record = self._extract_record(stream_data, _slice)
-            if self.cursor and current_record:
-                self.cursor.observe(_slice, current_record)
-
-            # Latest record read, not necessarily within slice boundaries.
-            # TODO Remove once all custom components implement `observe` method.
-            # https://github.com/airbytehq/airbyte-internal-issues/issues/6955
-            most_recent_record_from_slice = self._get_most_recent_record(
-                most_recent_record_from_slice, current_record, _slice
-            )
-
-            # Record merging should only be done if there are multiple property chunks. Otherwise,
-            # yielding immediately is more efficient so records can be emitted immediately
-            if (
-                has_multiple_chunks
-                and self.additional_query_properties.property_chunking
-                and current_record
-            ):
-                merge_key = (
-                    self.additional_query_properties.property_chunking.get_merge_key(
-                        current_record
-                    )
-                )
-                if merge_key:
-                    merged_records[merge_key].update(current_record)
-                else:
-                    yield stream_data
-            else:
-                yield stream_data
+    most_recent_record_from_slice, merged_records = yield from self._process_property_chunks(
+        property_chunks,
+        has_multiple_chunks,
+        _slice,
+        most_recent_record_from_slice,
+        merged_records,
+        records_schema,
+    )
     if self.cursor:
         self.cursor.close_slice(_slice, most_recent_record_from_slice)

Then add a new method:

def _process_property_chunks(
    self,
    property_chunks: List[List[str]],
    has_multiple_chunks: bool,
    base_slice: StreamSlice,
    most_recent_record: Optional[Record],
    merged_records: MutableMapping[str, Any],
    records_schema: Mapping[str, Any]
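) -> Generator[StreamData, None, Tuple[Optional[Record], MutableMapping[str, Any]]]:
    """
    Process each property chunk and handle record merging if needed.

    Records that are not merged are yielded as they are read. Because this method
    is a generator, the caller must invoke it with `yield from` to receive the
    return value.

    Returns:
        Tuple containing the most recent record and the merged records dictionary
    """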
    for properties in property_chunks:
        _slice = StreamSlice(
            partition=base_slice.partition or {},
            cursor_slice=base_slice.cursor_slice or {},
            extra_fields={"query_properties": properties},
        )

        record_generator = partial(
            self._parse_records,
            stream_slice=_slice,
            stream_state=self.state or {},
            records_schema=records_schema,
        )

        for stream_data in self._read_pages(record_generator, self.state, _slice):
            current_record = self._extract_record(stream_data, _slice)
            if self.cursor and current_record:
                self.cursor.observe(_slice, current_record)

            most_recent_record = self._get_most_recent_record(
                most_recent_record, current_record, _slice
            )

            # Record merging should only be done if there are multiple property chunks
            if (
                has_multiple_chunks
                and self.additional_query_properties.property_chunking
                and current_record
            ):
                merge_key = (
                    self.additional_query_properties.property_chunking.get_merge_key(
                        current_record
                    )
                )
                if merge_key:
                    merged_records[merge_key].update(current_record)
                else:
                    yield stream_data
            else:
                yield stream_data
                
    return most_recent_record, merged_records
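One subtlety with this extraction: because `_process_property_chunks` contains `yield`, it is a generator, so its `return` value only reaches the caller through `yield from`. Plain assignment would bind an unstarted generator, and unpacking it would consume the yielded records. The sketch below shows the pattern with hypothetical names (`process_chunks`, `read_records`). Merging on an assumed `"id"` key stands in for the configured merge strategy.

```python
from collections import defaultdict
from typing import Any, Dict, Generator, Iterable, Iterator, List, MutableMapping, Optional, Tuple

RecordDict = Dict[str, Any]


def process_chunks(
    chunk_results: Iterable[List[RecordDict]],
    merge: bool,
) -> Generator[RecordDict, None, Tuple[Optional[RecordDict], MutableMapping[str, RecordDict]]]:
    """Hypothetical helper: yields unmerged records, returns (last record, merged)."""
    most_recent: Optional[RecordDict] = None
    merged: MutableMapping[str, RecordDict] = defaultdict(dict)
    for records in chunk_results:
        for record in records:
            most_recent = record
            key = record.get("id")  # assumed merge key
            if merge and key is not None:
                merged[str(key)].update(record)
            else:
                yield record
    # A generator's return value travels in StopIteration.value,
    # which `yield from` hands back to the caller.
    return most_recent, merged


def read_records(chunk_results: Iterable[List[RecordDict]], merge: bool) -> Iterator[RecordDict]:
    # `yield from` re-emits every record the helper yields AND captures its
    # return value; plain assignment would just bind an unstarted generator.
    most_recent, merged = yield from process_chunks(chunk_results, merge)
    yield from merged.values()
    # In the retriever, `most_recent` would be handed to cursor.close_slice() here.
```

Merged records are only emitted after every chunk has been read, while records without a merge key stream out immediately.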

599-612: Use existing method from QueryProperties

The _has_multiple_chunks method duplicates logic that already exists in the QueryProperties class. Consider reusing the existing method instead of reimplementing the same logic, wdyt?

 def _has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
     if not self.additional_query_properties:
         return False
-
-    property_chunks = iter(
-        self.additional_query_properties.get_request_property_chunks(stream_slice=stream_slice)
-    )
-    try:
-        next(property_chunks)
-        next(property_chunks)
-        return True
-    except StopIteration:
-        return False
+    
+    return self.additional_query_properties.has_multiple_chunks(stream_slice)
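Whichever class owns the check, two cheap variants are sketched below. If `get_request_property_chunks` re-fetches properties from the endpoint on each call (an assumption worth verifying for `PropertiesFromEndpoint`), reusing the already materialized `property_chunks` list avoids a second pass. Otherwise, `itertools.islice` checks a lazy iterable without exhausting it. Both function names are hypothetical.

```python
from itertools import islice
from typing import Iterable, List, TypeVar

T = TypeVar("T")


def has_at_least_two(items: Iterable[T]) -> bool:
    # islice stops after pulling two items, so a lazy generator of chunks is
    # only advanced as far as needed rather than exhausted.
    return len(list(islice(items, 2))) == 2


def has_multiple_chunks(chunks: List[List[str]]) -> bool:
    # When the chunks were already materialized with list(...), checking the
    # length avoids calling get_request_property_chunks a second time.
    return len(chunks) > 1
```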

453-557: Add logging for debugging property chunking

For operations that involve multiple API requests and potentially complex data merging, adding detailed logging would help with debugging and understanding the flow. Consider adding debug-level logs to track the property chunking and record merging process.

 if self.additional_query_properties:
+    logger.debug("Stream %s: Processing property chunks for stream slice %s", self.name, stream_slice)
     property_chunks = list(
         self.additional_query_properties.get_request_property_chunks(
             stream_slice=stream_slice
         )
     )
     has_multiple_chunks = self._has_multiple_chunks(stream_slice=stream_slice)
+    logger.debug("Stream %s: Found %d property chunks. Multiple chunks: %s", self.name, len(property_chunks), has_multiple_chunks)
 else:
     property_chunks = [[""]]
     has_multiple_chunks = False

And when merging records:

 if merge_key:
+    logger.debug("Stream %s: Merging record with key %s", self.name, merge_key)
     merged_records[merge_key].update(current_record)
 else:
+    logger.debug("Stream %s: No merge key found, yielding record immediately", self.name)
     yield stream_data
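If these debug logs are added on the per-record path, passing arguments to `logger.debug` rather than pre-formatting with f-strings defers string formatting until a record is actually emitted. The sketch demonstrates this with a hypothetical `CountingSlice` whose `__str__` counts calls. With the logger at INFO, the f-string version formats the slice anyway, while the lazy version never does.

```python
import logging


class CountingSlice:
    """Hypothetical stand-in for a stream slice whose str() we want to observe."""

    def __init__(self) -> None:
        self.str_calls = 0

    def __str__(self) -> str:
        self.str_calls += 1
        return "StreamSlice(...)"


logger = logging.getLogger("simple_retriever_sketch")
logger.setLevel(logging.INFO)  # DEBUG records are filtered out

eager = CountingSlice()
logger.debug(f"Processing property chunks for stream slice {eager}")  # formats immediately

lazy = CountingSlice()
logger.debug("Processing property chunks for stream slice %s", lazy)  # never formatted
```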
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)

4203-4260: Good validation of type checking!

This test confirms proper error handling when a field expected to be QueryProperties is actually a different type. It's a valuable negative test case that helps ensure robust validation.

Could we possibly enhance this by checking the specific error message content to make the test even more precise, wdyt?

-    with pytest.raises(ValueError):
+    with pytest.raises(ValueError, match="Expected QueryProperties type but got"):
         factory.create_component(
             model_type=DeclarativeStreamModel,
             component_definition=stream_manifest,
             config=input_config,
         )

4262-4345: Excellent test for multiple query properties validation!

This test ensures we correctly reject configurations with multiple QueryProperties objects in request parameters, which could lead to ambiguity. Nice error case coverage!

Similarly to the previous comment, verifying the specific error message might add an extra layer of clarity to what's being tested:

-    with pytest.raises(ValueError):
+    with pytest.raises(ValueError, match="Multiple QueryProperties"):
         factory.create_component(
             model_type=DeclarativeStreamModel,
             component_definition=stream_manifest,
             config=input_config,
         )

4367-4382: Nice validation error test!

The test appropriately verifies that invalid property_limit_type values trigger validation errors. This is important for helping users identify configuration issues early.

Since this is a pydantic ValidationError, would checking the specific validation message be valuable to ensure the error is precisely about the property limit type? For example:

-    with pytest.raises(ValidationError):
+    with pytest.raises(ValidationError) as exc_info:
         connector_builder_factory.create_component(
             model_type=PropertyChunkingModel,
             component_definition=property_chunking_model,
             config={},
         )
+    assert "property_limit_type" in str(exc_info.value)
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 63062d9 and 93f83ca.

📒 Files selected for processing (4)
  • airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1 hunks)
  • airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (6 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4 hunks)
  • unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py
  • airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py
🧰 Additional context used
🧬 Code Definitions (2)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (2)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (3)
  • QueryProperties (14-52)
  • get_request_property_chunks (28-42)
  • has_multiple_chunks (45-52)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • get_request_property_chunks (42-66)
  • get_merge_key (68-69)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5)
  • PropertyChunking (1215-1234)
  • PropertiesFromEndpoint (2325-2336)
  • QueryProperties (2339-2356)
  • PropertyLimitType (1210-1212)
  • SimpleRetriever (2375-2429)
airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py (2)
  • PropertyChunking (25-69)
  • PropertyLimitType (14-21)
airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py (1)
  • QueryProperties (14-52)
🔇 Additional comments (10)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (7)

6-6: LGTM - defaultdict added for record merging

This import is necessary for the new record merging functionality for property chunking.


16-16: LGTM - MutableMapping added for type hinting

Adding MutableMapping for proper type hinting of the defaultdict.


36-36: LGTM - Import for QueryProperties

This import connects the new query properties functionality to the SimpleRetriever.


94-94: LGTM - QueryProperties attribute added

This new attribute will store the query properties configuration, if any, and is correctly defined as Optional.


515-519: LGTM - Record merging after processing chunks

This logic correctly yields the merged records after all chunks have been processed. The merged_records dictionary collects records with the same merge key, and this code yields these combined records.


520-557: LGTM - Original non-chunking approach preserved

The original approach for handling records without property chunking is correctly preserved with minimal modifications. This maintains backward compatibility with existing connectors.


462-462: Why use [""] as default property chunk?

When additional_query_properties is None, you're using [[""]] as the default property_chunks. Consider using a single empty chunk ([[]]) instead, since some APIs may interpret an empty-string property differently from no property at all. What do you think?
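Whether `[""]` and `[]` differ on the wire depends on how a chunk is serialized into the request, which this thread does not show. The sketch below compares two hypothetical encodings for an assumed `properties` parameter. As a repeated parameter, the two defaults differ. Joined with commas, they produce the same query string.

```python
from typing import List
from urllib.parse import urlencode


def encode_repeated(chunk: List[str]) -> str:
    # One `properties=` pair per element of the chunk.
    return urlencode({"properties": chunk}, doseq=True)


def encode_joined(chunk: List[str]) -> str:
    # All elements joined into a single comma-separated value.
    return urlencode({"properties": ",".join(chunk)})
```

With `doseq=True`, `[""]` yields `properties=` while `[]` yields an empty string. Both encode to `properties=` when joined. This only matters if the default chunk actually reaches the request in the non-chunking path.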

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)

4010-4096: Nice implementation of the query properties test!

The test effectively validates that a SimpleRetriever correctly processes static query properties, along with property chunking and merge strategy configuration. It's particularly good to see the verification that fields defined in request_parameters.query_properties are correctly placed under SimpleRetriever.additional_query_properties.

Would adding a docstring to clarify the purpose of this test be helpful for future developers, wdyt?
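For readers of that test, the manifest-level move can be pictured with plain dicts: find the single `QueryProperties` entry in `request_parameters`, reject more than one, and hand the definition to the retriever. `extract_query_properties` is a hypothetical helper. The real factory works on pydantic models, and whether it keeps the parameter name alongside the definition is an assumption here.

```python
from typing import Any, Dict, Optional, Tuple


def extract_query_properties(
    request_parameters: Dict[str, Any],
) -> Tuple[Dict[str, Any], Optional[Tuple[str, Dict[str, Any]]]]:
    """Sketch: split at most one QueryProperties definition out of request_parameters.

    Returns the remaining parameters plus (parameter_name, definition), or None.
    """
    matches = [
        (name, value)
        for name, value in request_parameters.items()
        if isinstance(value, dict) and value.get("type") == "QueryProperties"
    ]
    if len(matches) > 1:
        # Ambiguous: the retriever could not know which one to chunk over.
        raise ValueError("Multiple QueryProperties definitions found in request_parameters")
    found = matches[0] if matches else None
    remaining = {k: v for k, v in request_parameters.items() if found is None or k != found[0]}
    return remaining, found
```

The returned definition plays the role of `SimpleRetriever.additional_query_properties`, while the remaining parameters stay on the requester.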


4106-4202: Great test for dynamic properties from endpoint!

This thoroughly validates the PropertiesFromEndpoint functionality, including proper configuration of the nested retriever for fetching properties. The test verifies all the important aspects of the dynamic property retrieval feature.


4347-4365: Good test for character-based property chunking!

This test validates that a PropertyChunking instance with character-based limits is created correctly. It verifies both the chunking type and limit value.

@@ -627,7 +627,6 @@ def test_source_missing_checker_fails_validation(self):
},
}
],
"check": {"type": "CheckStream"},
Contributor Author

I was getting test failures here. My suspicion is that it's because stream_names is no longer a required field, since CheckStream also supports dynamic stream check configs. That change was made in #450.

So this component is now valid and no longer raises the ValidationError. Removing the component entirely makes validation fail, as the test expects.

What I don't understand is why the tests didn't fail on the above PR. Nonetheless, removing this should fix my build.

Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants