feat(cdk): update ConfigComponentsResolver to support list of stream_config #553
📝 Walkthrough
The changes expand the flexibility of declarative source schemas and their Python models.
Sequence Diagram(s)
sequenceDiagram
participant User
participant ManifestDeclarativeSource
participant ConfigComponentsResolver
participant StreamConfig
User->>ManifestDeclarativeSource: Request streams(config)
ManifestDeclarativeSource->>ManifestDeclarativeSource: Get static streams
ManifestDeclarativeSource->>ManifestDeclarativeSource: Get dynamic_streams property
ManifestDeclarativeSource->>ConfigComponentsResolver: Resolve components (with stream_configs)
ConfigComponentsResolver->>StreamConfig: Enumerate configs (with default_values)
ConfigComponentsResolver->>ConfigComponentsResolver: Merge configs (Cartesian product)
ConfigComponentsResolver->>ManifestDeclarativeSource: Return resolved stream configs
ManifestDeclarativeSource->>User: Return concatenated static + dynamic streams
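The merge step in the diagram above can be pictured with a small sketch. It assumes that each stream_config resolves to a list of entries from the source config, that default_values are appended to that list, and that one merged config is produced per combination. The function name `merge_stream_configs` and the sample data are hypothetical and are not the CDK's actual API.

```python
from itertools import product
from typing import Any, Dict, List, Optional


def merge_stream_configs(
    resolved: List[List[Dict[str, Any]]],
    default_values: Optional[List[List[Dict[str, Any]]]] = None,
) -> List[Dict[str, Any]]:
    """Combine one entry from every stream_config into a single merged config.

    resolved[i] holds the entries found for the i-th stream_config;
    default_values[i] (if given) is appended to that list before combining.
    """
    defaults = default_values or [[] for _ in resolved]
    extended = [entries + extra for entries, extra in zip(resolved, defaults)]

    merged: List[Dict[str, Any]] = []
    for combo in product(*extended):  # Cartesian product across stream_configs
        config: Dict[str, Any] = {}
        for entry in combo:
            config.update(entry)  # later stream_configs win on key clashes
        merged.append(config)
    return merged


# Hypothetical sample data: two tables, one configured mode, one default mode.
tables = [{"table": "users"}, {"table": "orders"}]
modes = [{"sync_mode": "full_refresh"}]
result = merge_stream_configs(
    [tables, modes],
    default_values=[[], [{"sync_mode": "incremental"}]],
)
```

With two tables and two modes (one configured, one default), the sketch yields four merged configs, one per table and mode pair.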
sequenceDiagram
participant ConfigComponentsResolver
participant dpath
participant yaml
loop For each mapping in resolved_component_mappings
ConfigComponentsResolver->>yaml: Try to parse value as YAML
alt parse successful
ConfigComponentsResolver->>dpath: set(config, path, parsed_value)
else parse fails
ConfigComponentsResolver->>dpath: set(config, path, original_value)
end
alt dpath.set fails and create_or_update is True
ConfigComponentsResolver->>dpath: new(config, path, value)
end
end
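The mapping loop in the diagram above can be sketched in plain Python. This version is deliberately simplified so that it is self-contained: it uses `json.loads` from the standard library as a stand-in for the YAML parsing step, and a hand-written path walker instead of dpath. The helper names `parse_if_possible` and `set_value` are hypothetical.

```python
import json
from typing import Any, Dict, List


def parse_if_possible(value: Any) -> Any:
    """Return the parsed form of a string, or the original value if parsing fails."""
    if not isinstance(value, str):
        return value
    try:
        return json.loads(value)  # stand-in for the YAML parse in the diagram
    except json.JSONDecodeError:
        return value


def set_value(
    config: Dict[str, Any],
    path: List[str],
    value: Any,
    create_or_update: bool = False,
) -> bool:
    """Set config[path] to value; create missing paths only if create_or_update is True."""
    node = config
    for key in path[:-1]:
        if key not in node or not isinstance(node[key], dict):
            if not create_or_update:
                return False  # path is missing and we are not allowed to create it
            node[key] = {}
        node = node[key]
    if path[-1] not in node and not create_or_update:
        return False
    node[path[-1]] = parse_if_possible(value)
    return True


template = {"retriever": {"requester": {"path": "/placeholder"}}}
updated = set_value(template, ["retriever", "requester", "path"], "/users")
skipped = set_value(template, ["primary_key"], '["id"]')
created = set_value(template, ["primary_key"], '["id"]', create_or_update=True)
```

The existing path is updated in both modes, while the missing `primary_key` entry is only written once `create_or_update` is set.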
Actionable comments posted: 6
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/resolvers/components_resolver.py (1)
1-58: 💡 Verification agent 🧩 Analysis chain
Pipeline failure in docs generation.
There's a pipeline failure related to importing airbyte_cdk due to a NameError in declarative_component_schema.py. This suggests the schema file may need to be updated to match the new additions.
🏁 Script executed:
#!/bin/bash
# Check if there are changes to declarative_component_schema.py in this PR
fd -t f declarative_component_schema.py
# If it exists, check if it contains the new create_or_update field
if [ $? -eq 0 ]; then
  rg -A 3 "create_or_update" ./airbyte_cdk/sources/declarative/declarative_component_schema.yaml
fi
Length of output: 313
I’d like to confirm that the Python schema models have the new create_or_update field defined. Let me inspect those classes:
🏁 Script executed:
#!/bin/bash
echo "Searching for create_or_update occurrences:"
rg -n "create_or_update" airbyte_cdk/sources/declarative/models/declarative_component_schema.py || echo "No occurrences found"
echo -e "\nInspecting ComponentMappingDefinition in schema:"
rg -n "class ComponentMappingDefinition" -A5 airbyte_cdk/sources/declarative/models/declarative_component_schema.py
echo -e "\nInspecting ResolvedComponentMappingDefinition in schema:"
rg -n "class ResolvedComponentMappingDefinition" -A5 airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Length of output: 1040
I want to confirm that both Pydantic models declare the new create_or_update field. Could we inspect their full definitions? wdyt?
🏁 Script executed:
#!/bin/bash
rg -n "class ComponentMappingDefinition" -A50 airbyte_cdk/sources/declarative/models/declarative_component_schema.py
rg -n "class ResolvedComponentMappingDefinition" -A50 airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Length of output: 2602
Let’s check the imports at the top of the Pydantic schema to see if anything (like Field or ValueType) is missing:
🏁 Script executed:
#!/bin/bash
echo "Top 50 lines of declarative_component_schema.py:"
head -n50 airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Length of output: 1858
Could we confirm that the YAML schema defines ResolvedComponentMappingDefinition (so that the Pydantic models get generated correctly)? wdyt?
🏁 Script executed:
#!/bin/bash
echo "Checking declarative_component_schema.yaml for ResolvedComponentMappingDefinition:"
rg -n "ResolvedComponentMappingDefinition" -A3 airbyte_cdk/sources/declarative/declarative_component_schema.yaml || echo ">> Not found in YAML schema"
Length of output: 352
Docs generation NameError in declarative_component_schema.py
It looks like the Pydantic model in airbyte_cdk/sources/declarative/models/declarative_component_schema.py is referencing ValueType but never imports or defines it, which causes the NameError during Sphinx docs builds. Also, I noticed there’s no entry for ResolvedComponentMappingDefinition in your YAML schema (so no generated model for it); is that intentional or should it be exposed for config validation?
• In airbyte_cdk/sources/declarative/models/declarative_component_schema.py:
– Add the missing import for ValueType (e.g. from airbyte_cdk.sources.declarative.interpolation or wherever it’s declared)
• In airbyte_cdk/sources/declarative/declarative_component_schema.yaml:
– Confirm whether ResolvedComponentMappingDefinition belongs here (if it should be part of the JSON schema, add its definition so datamodel-codegen will generate the class)
Wdyt?
🧰 Tools
🪛 GitHub Actions: Generate Docs
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
2311-2316: Provide a descriptive description for schema_filter
The new schema_filter property currently has a placeholder description. Could we elaborate its intended role in filtering schema records, clarify how it interacts with the dynamic loader, and include examples for both RecordFilter and CustomRecordFilter? wdyt?
4003-4005: Add documentation for create_or_update flag
The create_or_update boolean is introduced without title/description context. Could we document what this flag controls in component mappings (e.g., whether to create missing fields or update existing ones), and provide usage examples? wdyt?
4072-4076: Enhance examples for multi stream_config support
Since stream_config now accepts either a single object or an array, could we augment the schema with illustrative examples showcasing both a single StreamConfig and a list of them to guide users? wdyt?
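As a rough illustration of the two accepted shapes, the sketch below normalizes either form into a list, which is how the review describes the factory handling stream_config. The helper name `normalize_stream_config` is hypothetical, and the field names in the sample dictionaries (such as `configs_pointer`) are assumptions used only for illustration.

```python
from typing import Any, Dict, List, Union

StreamConfigDict = Dict[str, Any]


def normalize_stream_config(
    stream_config: Union[StreamConfigDict, List[StreamConfigDict]],
) -> List[StreamConfigDict]:
    """Wrap a single stream_config in a list; return a shallow copy of a list input."""
    if isinstance(stream_config, list):
        return list(stream_config)
    return [stream_config]


# Shape 1: a single StreamConfig object (field names are assumed for illustration).
single = {"type": "StreamConfig", "configs_pointer": ["tables"]}

# Shape 2: a list of StreamConfig objects, one of them carrying default_values.
several = [
    {"type": "StreamConfig", "configs_pointer": ["tables"]},
    {
        "type": "StreamConfig",
        "configs_pointer": ["sync_modes"],
        "default_values": [{"mode": "full_refresh"}],
    },
]

normalized_single = normalize_stream_config(single)
normalized_several = normalize_stream_config(several)
```

Normalizing at the boundary means downstream code only ever has to handle the list form.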
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (7)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (4 hunks)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1 hunks)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4 hunks)
airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2 hunks)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (6 hunks)
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (2)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
StreamConfig (1485-1494)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (1)
DynamicSchemaLoader (120-297)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (17)
DynamicSchemaLoader (2410-2439)
Config (134-135), Config (148-149), Config (162-163), Config (176-177), Config (194-195), Config (208-209), Config (222-223), Config (236-237), Config (250-251), Config (264-265), Config (278-279), Config (292-293), Config (308-309), Config (322-323), Config (336-337), Config (370-371)
🪛 GitHub Actions: Generate Docs
airbyte_cdk/sources/declarative/manifest_declarative_source.py
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/resolvers/components_resolver.py
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 1-1: RuntimeError: Error importing airbyte_cdk due to NameError in declarative_component_schema.py.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 2755-2755: NameError: name 'CombinedExtractor' is not defined. This error causes multiple import failures across various modules that depend on this model.
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
[error] 182-182: Argument 1 to "filter_records" of "RecordFilter" has incompatible type "ItemsView[str, Any]"; expected "Iterable[Mapping[str, Any]]" [arg-type]
[error] 183-183: Invalid index type "int" for "Mapping[str, Any]"; expected type "str" [index]
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2436-2436: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]
[error] 2755-2755: Name "CombinedExtractor" is not defined [name-defined]
🪛 GitHub Actions: PyTest Matrix
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 2755-2755: NameError: name 'CombinedExtractor' is not defined at line 2755. This caused the pytest run to fail with exit code 4.
🪛 GitHub Actions: Pytest (Fast)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 2755-2755: NameError: name 'CombinedExtractor' is not defined at line 2755. This caused the pytest run to fail with exit code 4.
🔇 Additional comments (23)
airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)
25-25: New flag added for component mapping configuration.
The new create_or_update flag looks good. This will allow for conditional creation of config paths when setting values during component resolution.
38-38: Consistent propagation of the create_or_update flag.
Good to see the create_or_update flag also added to the resolved component mapping definition, maintaining consistency between the two classes.
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
265-265: Code refactoring to use property instead of direct method call.
Good refactoring! Using the dynamic_streams property instead of directly calling _dynamic_stream_configs makes the code more modular and easier to maintain.
229-235: Well-designed property that encapsulates dynamic stream config generation.
The dynamic_streams property is well implemented, abstracting away the details of how dynamic stream configs are generated while still providing the same functionality.
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (4)
13-13: New import for schema filtering capability.
The import of RecordFilter supports the new schema filtering functionality. Looks good.
130-130: Addition of optional schema filter.
Adding an optional schema_filter parameter allows for more flexible schema definitions. Good enhancement.
156-156: Updated to apply filter before transformation.
Applying the filter before transformations ensures that we don't waste processing on properties that will be filtered out later. Good optimization.
162-162: Using filtered and transformed properties in schema.
The filtered and transformed properties are now used in the schema. This is consistent with the addition of the filtering step.
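The filter-then-transform ordering described in these comments can be sketched as follows. This is a minimal illustration that uses plain callables. It does not use the CDK's RecordFilter or its transformation classes, and the function name `build_properties` and the record shape (`name` and `type` keys) are assumptions.

```python
from typing import Any, Callable, Dict, Iterable, Mapping, Optional

Property = Mapping[str, Any]


def build_properties(
    records: Iterable[Property],
    schema_filter: Optional[Callable[[Property], bool]] = None,
    transform: Callable[[Property], Property] = lambda record: record,
) -> Dict[str, Any]:
    """Drop unwanted property records first, then transform only the survivors."""
    kept = (r for r in records if schema_filter is None or schema_filter(r))
    properties: Dict[str, Any] = {}
    for record in kept:
        transformed = transform(record)  # never runs for filtered-out records
        properties[transformed["name"]] = {"type": transformed["type"]}
    return properties


records = [
    {"name": "id", "type": "integer"},
    {"name": "_internal", "type": "string"},
    {"name": "email", "type": "string"},
]
props = build_properties(
    records,
    schema_filter=lambda r: not r["name"].startswith("_"),
    transform=lambda r: {**r, "name": r["name"].upper()},
)
```

Because the filter runs first, the transformation is never applied to `_internal`, which is the saving the review comment points to.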
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
3604-3604: LGTM! Support for the create_or_update flag
The addition of the create_or_update parameter to ComponentMappingDefinition looks good. This allows for more flexibility in how components are created during the mapping process.
3651-3651: LGTM! Support for default_values in StreamConfig
Adding default_values to StreamConfig allows for setting default values for components created from the stream configuration. This is a nice enhancement!
3658-3667: LGTM! Support for handling a list of stream_config objects
This change effectively transforms the stream_config handling to support either a single stream config or a list of configs by normalizing into a list either way. This gives users the flexibility to define configs in whichever format is more convenient for their use case.
3681-3681: LGTM! Updated ConfigComponentsResolver constructor parameter name
Changed the parameter name from stream_config to stream_configs to reflect that it now takes a list instead of a single item. Good attention to detail!
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (7)
7-8
: Good addition of required imports.The import of
product
fromitertools
and the extended typing imports will support the new functionality for handling multiple stream configs. This looks well thought out.
32-32
: Nice addition of the default_values field to StreamConfig.The new optional field will allow for specifying default values for stream configurations, improving flexibility. This aligns well with the model changes in the schema file.
53-53
: Now supporting multiple stream configs instead of a single one.This change from
stream_config
tostream_configs
enables handling multiple stream configurations, which is a great enhancement for flexibility. Well done on maintaining backward compatibility through the schema changes.
87-87
: Good addition of the create_or_update flag to resolved components.Preserving the
create_or_update
flag during component resolution is important for the enhanced functionality added later in this PR.
154-158
: Enhanced value setting with YAML parsing and fallback creation.Great enhancement to the
resolve_components
method to parse string values as YAML and conditionally create paths when they don't exist. The conditional creation based on thecreate_or_update
flag is a nice touch that allows for more flexible configurations.
162-171
: Well-structured static method for YAML parsing.The
_parse_yaml_if_possible
method provides a clean way to attempt YAML parsing with proper error handling. This improves the robustness of the code.
97-125: ❓ Verification inconclusive
Comprehensive rewrite of the _stream_config property, but missing type annotations.
The new implementation handles multiple stream configs and includes normalization, flattening, and merging functionality. This is a significant improvement in flexibility.
However, the function lacks proper type annotations, which is causing pipeline failures.
Could you add type annotations to the methods in the _stream_config property? Here are the missing types that should be added:
- def _stream_config(self):
+ def _stream_config(self) -> List[Dict[str, Any]]:
- def resolve_path(pointer):
+ def resolve_path(pointer: List[Union[InterpolatedString, str]]) -> List[str]:
- def normalize_configs(configs):
+ def normalize_configs(configs: Any) -> List[Any]:
- def prepare_streams():
+ def prepare_streams() -> Iterable[List[Tuple[int, Any]]]:
- def merge_combination(combo):
+ def merge_combination(combo: Tuple[Tuple[int, Any], ...]) -> Dict[str, Any]:
🏁 Script executed:
#!/bin/bash
# Verify type annotations are causing failures by checking the error log
rg "no-untyped-def" -A 2 -B 2
Length of output: 29
Type annotation lint errors not found: need actual pipeline logs?
I ran rg "no-untyped-def" across the repo and didn’t spot any failures in airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py. Could you share the exact pipeline error or logs showing where the missing annotations are reported? wdyt?
🧰 Tools
🪛 GitHub Actions: Linters
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
1481-1481: Good addition of create_or_update flag to ComponentMappingDefinition.
Adding the optional create_or_update field with a default value of False aligns well with the implementation in the resolver. This will provide flexibility when creating or updating component mappings.
1499-1499: Great update to support both single and multiple stream configs.
Changing the type of stream_config to a union of a single StreamConfig or a list of StreamConfig is a well-designed enhancement that supports the new functionality while maintaining backward compatibility.
2167-2167: Improved schema loader description.
The updated description for schema_loader clearly explains how multiple schema loaders work together, with precedence rules. This enhancement to the documentation is valuable for users.
2417-2419: Added schema_filter to DynamicSchemaLoader.
The addition of the optional schema_filter field to DynamicSchemaLoader that can be either a RecordFilter or a CustomRecordFilter improves filtering capabilities.
What
Resolved: https://github.com/airbytehq/airbyte-internal-issues/issues/13048
This PR adds a create_or_update flag to indicate whether a value should be created if it is not originally present in the stream template. This can happen when certain values must be present due to schema validation.
It also introduces a default_values list that extends the iteration sequence in the source config, providing a more flexible way to define default values for the ConfigComponentsResolver. In addition, ConfigComponentsResolver is updated to support a list of stream_config entries.
How
- ConfigComponentsResolver now supports iterating over a list of stream_config entries.
- The create_or_update flag is applied during the value resolution process.
- default_values are added to the iteration sequence to ensure fallback values are correctly processed.