-
Notifications
You must be signed in to change notification settings - Fork 20
🚨🚨[DON'T MERGE] all CDK changes for google analytics migration🚨🚨 #554
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
🚨🚨[DON'T MERGE] all CDK changes for google analytics migration🚨🚨 #554
Conversation
📝 WalkthroughWalkthroughThis update introduces two new record extractors— Changes
Sequence Diagram(s)sequenceDiagram
participant HTTPResponse as HTTP Response
participant KeyExtractor as keys_extractor
participant ValueExtractor as values_extractor
participant KeyValueExtractor
participant Caller
Caller->>KeyValueExtractor: extract_records(response)
KeyValueExtractor->>KeyExtractor: extract_records(response)
KeyValueExtractor->>ValueExtractor: extract_records(response)
KeyValueExtractor->>KeyValueExtractor: Zip keys and values
KeyValueExtractor-->>Caller: Yield key-value mapped records
sequenceDiagram
participant HTTPResponse as HTTP Response
participant Extractor1
participant Extractor2
participant CombinedExtractor
participant Caller
Caller->>CombinedExtractor: extract_records(response)
CombinedExtractor->>Extractor1: extract_records(response)
CombinedExtractor->>Extractor2: extract_records(response)
CombinedExtractor->>CombinedExtractor: Zip and merge records
CombinedExtractor-->>Caller: Yield merged records
sequenceDiagram
participant DynamicSchemaLoader
participant RecordFilter
participant SchemaTransformer
participant Caller
Caller->>DynamicSchemaLoader: get_json_schema()
DynamicSchemaLoader->>DynamicSchemaLoader: Extract properties
alt schema_filter exists
DynamicSchemaLoader->>RecordFilter: filter(properties)
RecordFilter-->>DynamicSchemaLoader: filtered properties
end
DynamicSchemaLoader->>SchemaTransformer: transform(filtered properties)
SchemaTransformer-->>DynamicSchemaLoader: transformed schema
DynamicSchemaLoader-->>Caller: Return schema
Would you like to see a diagram for the multi-stream config merging process as well, or is this sufficient for your needs? Wdyt? Note ⚡️ AI Code Reviews for VS Code, Cursor, WindsurfCodeRabbit now has a plugin for VS Code, Cursor and Windsurf. This brings AI code reviews directly in the code editor. Each commit is reviewed immediately, finding bugs before the PR is raised. Seamless context handoff to your AI code agent ensures that you can easily incorporate review feedback. Note ⚡️ Faster reviews with cachingCodeRabbit now supports caching for code and dependencies, helping speed up reviews. This means quicker feedback, reduced wait times, and a smoother review experience overall. Cached data is encrypted and stored securely. This feature will be automatically enabled for all accounts on May 16th. To opt out, configure ✨ Finishing Touches
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
/autofix
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🔭 Outside diff range comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
3698-3723
: 🛠️ Refactor suggestion
stream_config
can beNone
– guard against it before iteratingWhen
stream_config
is missing the current logic constructs[None]
, and the subsequent_create_component_from_model(...)
call will crash type-checking (and possibly at runtime).- model_stream_configs = ( - model.stream_config if isinstance(model.stream_config, list) else [model.stream_config] - ) + if model.stream_config is None: + model_stream_configs = [] + elif isinstance(model.stream_config, list): + model_stream_configs = model.stream_config + else: + model_stream_configs = [model.stream_config]This preserves the new list-normalisation behaviour while keeping the code safe when the field is omitted.
Would you like to adopt this guard?
🧹 Nitpick comments (9)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (7)
1744-1769
: Could we improve the KeyValueExtractor metadata?
The currentdescription
is just a placeholder and theenum
has an extra space ([ KeyValueExtractor ]
). To make this self‐documenting, perhaps we could update to something like:title: Key Value Extractor description: Extracts records as key/value pairs by applying separate extractors for keys and values. enum: [KeyValueExtractor]and replace the placeholder descriptions under
keys_extractor
andvalues_extractor
with meaningful text or examples. wdyt?
1770-1791
: Could we refine the CombinedExtractor schema?
Thedescription
is currently incomplete ("Record extractor that extract with .") and theextractors
property lacks guidance. Maybe we can update to:title: Combined Extractor description: Sequentially applies multiple extractors and merges their outputs into a single record list. properties: extractors: description: List of extractor components to run in order; results are concatenated.This will help users understand when to use it. wdyt?
2369-2374
: Could we flesh out the DynamicSchemaLoaderschema_filter
?
Thedescription: placeholder
doesn’t convey its purpose. How about:title: Schema Filter description: Filter applied to schema properties after extraction; only fields matching this filter will be retained.This aligns with the new filtering feature. wdyt?
3372-3373
: Shall we update the RecordSelector docs to include new extractors?
We addedCombinedExtractor
andKeyValueExtractor
underextractor.anyOf
—should we also adjust thedescription
here to mention these new options for clarity? wdyt?
4053-4056
: Can we add metadata for thecreate_or_update
flag?
The new boolean has notitle
ordescription
. Perhaps:title: Create or Update description: If true, existing component mappings are updated instead of only creating new ones.Adding this will improve discoverability. wdyt?
4107-4110
: Could we complete the StreamConfigdefault_values
definition?
Thedescription: placeholder
and missingitems
schema make it unclear. For example:title: Default Values description: Fallback values injected into each stream config when no explicit value is provided. type: array items: anyOf: - type: string - type: numberThis will make usage obvious. wdyt?
4122-4127
: Shall we clarify that ConfigComponentsResolver accepts single or multiple configs?
Thestream_config.anyOf
now allows an array or singleStreamConfig
. Could we update the surrounding description to:description: Accepts one or many StreamConfig objects to resolve components from config.to explicitly call this out? wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1847-1856
: Improve docstrings forKeyValueExtractor
Both
keys_extractor
andvalues_extractor
currently say"placeholder"
, which surfaces in generated docs.
Shall we add a brief description & example so connector authors understand how to use the component? wdyt?
1858-1864
: Guard the self-referential annotationThe recursive union references
CombinedExtractor
directly. Thanks tofrom __future__ import annotations
this works today, but removing that import later would raise aNameError
.
Would quoting the inner reference ("CombinedExtractor"
) add a bit more safety for future refactors?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (10)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
(6 hunks)airbyte_cdk/sources/declarative/extractors/__init__.py
(2 hunks)airbyte_cdk/sources/declarative/extractors/combined_extractor.py
(1 hunks)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
(1 hunks)airbyte_cdk/sources/declarative/manifest_declarative_source.py
(1 hunks)airbyte_cdk/sources/declarative/models/declarative_component_schema.py
(7 hunks)airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
(9 hunks)airbyte_cdk/sources/declarative/resolvers/components_resolver.py
(2 hunks)airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
(6 hunks)airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
(4 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (3)
airbyte_cdk/sources/declarative/extractors/__init__.py (4)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (1)
CombinedExtractor
(19-44)airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
CombinedExtractor
(1858-1863)DpathExtractor
(696-709)KeyValueExtractor
(1847-1855)airbyte_cdk/sources/declarative/extractors/http_selector.py (1)
HttpSelector
(13-37)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
KeyValueExtractor
(19-46)
airbyte_cdk/sources/declarative/extractors/combined_extractor.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (17)
Config
(134-135)Config
(148-149)Config
(162-163)Config
(176-177)Config
(194-195)Config
(208-209)Config
(222-223)Config
(236-237)Config
(250-251)Config
(264-265)Config
(278-279)Config
(292-293)Config
(308-309)Config
(322-323)Config
(336-337)Config
(370-371)CombinedExtractor
(1858-1863)airbyte_cdk/sources/declarative/extractors/record_extractor.py (1)
RecordExtractor
(12-27)airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (1)
InterpolatedString
(13-79)airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (1)
extract_records
(38-46)
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
StreamConfig
(1485-1494)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
[error] 183-183: Need type annotation for "filtered_properties" (hint: "filtered_properties: dict[, ] = ...") [var-annotated]
airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2476-2476: Argument 1 to "_create_component_from_model" of "ModelToComponentFactory" has incompatible type "RecordFilter | CustomRecordFilter | None"; expected "BaseModel" [arg-type]
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
[error] 1493-1493: Missing type parameters for generic type "List" [type-arg]
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: Check: 'source-hardcoded-records' (skip=false)
- GitHub Check: Check: 'source-amplitude' (skip=false)
- GitHub Check: Check: 'source-pokeapi' (skip=false)
- GitHub Check: Check: 'source-shopify' (skip=false)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
🔇 Additional comments (18)
airbyte_cdk/sources/declarative/extractors/__init__.py (2)
5-8
: Excellent addition of new extractor classes!I see you've added imports for two new extractor classes (
CombinedExtractor
andKeyValueExtractor
) that provide flexible record extraction strategies. The imports are appropriately placed alongside other extractors.
23-24
: Good job updating the all listThe new extractor classes have been properly added to the
__all__
list to make them part of the public API.airbyte_cdk/sources/declarative/resolvers/components_resolver.py (2)
25-25
: Good addition of create_or_update flagAdding the
create_or_update
flag toComponentMappingDefinition
with a default value ofFalse
provides nice flexibility for component resolution. This will support the broader enhancement of conditional creation/updating of configuration paths.
38-38
: Consistency maintained between related dataclassesThe same
create_or_update
flag has been properly added toResolvedComponentMappingDefinition
as well, maintaining consistency between these related dataclasses.airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
265-265
: Nice refactoring to use the property accessorYou've replaced the direct method call with the property accessor, which is cleaner and more maintainable. The
dynamic_streams
property properly encapsulates the implementation details.airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py (3)
13-13
: Good addition of RecordFilter importThis import supports the new filtering capability you're adding to the schema loader.
130-130
: Well-designed optional schema_filter parameterAdding the optional schema filter parameter extends the DynamicSchemaLoader's capabilities while maintaining backward compatibility.
156-156
: Good implementation of the filtering workflowYou've modified the property transformation flow to apply filtering before transformation, which is a logical sequence.
airbyte_cdk/sources/declarative/extractors/key_value_extractor.py (2)
5-15
: Trim the unused imports to calm the linters, wdyt?
dpath
,Decoder
,JsonDecoder
,InterpolatedString
,InitVar
,field
(and evenMapping
,List
,Union
) are not referenced anywhere in the file. Keeping them around will trigger static-analysis noise and slightly lengthen import time.-from dataclasses import InitVar, dataclass, field -from itertools import islice -from typing import Any, Iterable, List, Mapping, MutableMapping, Union - -import dpath -import requests - -from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder -from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor -from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString -from airbyte_cdk.sources.types import Config +from dataclasses import dataclass +from itertools import islice +from typing import Any, Iterable, MutableMapping + +import requests + +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractorThis should silence the “unused import” warnings without altering behaviour.
[ suggest_nitpick ]
38-46
: Guard against “no-keys” & type-mismatch corner cases, wdyt?
- If
keys_extractor
yields an empty list, we enter an infinitewhile True
loop that breaks immediately but still performs a useless iteration. A short-circuit test would be clearer.- We assume each item yielded by
keys_extractor
is hashable and represents a single key. If a downstream user returns a dict (a common pattern for other extractors) we’ll end up with unintuitivedict_keys
objects inzip
. A tiny validation layer could prevent surprises.keys = list(self.keys_extractor.extract_records(response)) - values = self.values_extractor.extract_records(response) - - while True: - chunk = list(islice(values, len(keys))) - if not chunk: - break - yield dict(zip(keys, chunk)) + if not keys: + return # nothing to pair – early exit + + if any(isinstance(k, MutableMapping) for k in keys): + raise ValueError("keys_extractor must yield hashable scalar values, got mapping(s)") + + values = self.values_extractor.extract_records(response) + + while True: + chunk = list(islice(values, len(keys))) + if not chunk: + break + yield dict(zip(keys, chunk))Early-exit + validation should make the extractor more robust while keeping the algorithm intact.
[ suggest_optional_refactor ]airbyte_cdk/sources/declarative/extractors/combined_extractor.py (3)
5-15
: Pare back unused imports for clarity, wdyt?As in the sibling file,
dpath
,Decoder
,JsonDecoder
,InterpolatedString
,InitVar
,field
,Mapping
,Union
,List
are never used and can be removed to stop linter complaints.[ suggest_nitpick ]
18-33
: Doc-string talks about keys/values but this extractor merges arbitrary recordsThe current documentation is a copy-paste from
KeyValueExtractor
. It can mislead future maintainers into thinking the class requires exactly two extractors that return parallel key/value arrays. Could we replace it with something like:""" Merges the outputs of N extractors record-by-record. Each extractor is invoked with the same HTTP response; the first record from each extractor is merged together (later extractors override keys set by earlier ones), then the second record from each extractor and so on until the shortest extractor is exhausted. """
Updating the narrative now will avoid confusion later, wdyt?
[ suggest_nitpick ]
37-44
: Handle uneven extractor lengths or preferzip_longest
, wdyt?Using
zip(*iterables)
stops at the shortest iterable, silently discarding surplus records from longer extractors. Would adoptingitertools.zip_longest
(with e.g.{}
as the fill value) make the behaviour clearer and safer?-from records in zip(*extractors_records): +from records in itertools.zip_longest(*extractors_records, fillvalue={})You may also want to skip or log when a filler value is encountered to surface data-alignment issues.
[ suggest_optional_refactor ]airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py (3)
97-125
: Missing type hints are breaking CI – extract helpers & annotate them, wdyt?
_stream_config
defines several nested functions without annotations; mypy/ruff are flagging them (no-untyped-def
,arg-type
, etc.). CI is currently red. Splitting them into private methods and adding signatures will both appease the linters and improve readability.Example sketch:
- def _stream_config(self): - def resolve_path(pointer): + def _stream_config(self) -> List[Dict[str, Any]]: + def _resolve_path(pointer: List[InterpolatedString | str]) -> List[str]: ... - - def normalize_configs(configs): + def _normalize_configs(configs: Any) -> List[Any]: ... - - def prepare_streams(): + def _prepare_streams() -> List[List[tuple[int, Any]]]: ... - - def merge_combination(combo): + def _merge_combination(combo: tuple[tuple[int, Any], ...]) -> Dict[str, Any]: ... - - all_indexed_streams = list(prepare_streams()) - return [merge_combination(combo) for combo in product(*all_indexed_streams)] + all_indexed_streams = list(_prepare_streams()) + return [_merge_combination(combo) for combo in product(*all_indexed_streams)]This change should resolve the reported linter errors (97-125) and make the intent of each helper explicit.
[ raise_critical_issue ]🧰 Tools
🪛 GitHub Actions: Linters
[error] 97-97: Function is missing a return type annotation [no-untyped-def]
[error] 98-98: Function is missing a type annotation [no-untyped-def]
[error] 103-103: Function is missing a type annotation [no-untyped-def]
[error] 106-106: Function is missing a return type annotation [no-untyped-def]
[error] 108-108: Call to untyped function "resolve_path" in typed context [no-untyped-call]
[error] 110-110: Call to untyped function "normalize_configs" in typed context [no-untyped-call]
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
[error] 115-115: Function is missing a type annotation [no-untyped-def]
[error] 124-124: Call to untyped function "prepare_streams" in typed context [no-untyped-call]
[error] 125-125: Call to untyped function "merge_combination" in typed context [no-untyped-call]
111-113
:stream_configs += default_values
mutates the original listBecause
dpath.get
may return a direct reference to a sub-list insideself.config
, in-place+=
could unintentionally mutate the user-supplied configuration. Would making a shallow copy before appending be safer?- stream_configs = dpath.get(dict(self.config), path, default=[]) - stream_configs = normalize_configs(stream_configs) - if stream_config.default_values: - stream_configs += stream_config.default_values + raw = dpath.get(dict(self.config), path, default=[]) + stream_configs = normalize_configs(list(raw)) # copy + if stream_config.default_values: + stream_configs.extend(stream_config.default_values)This avoids side-effects on
self.config
, wdyt?
[ suggest_optional_refactor ]🧰 Tools
🪛 GitHub Actions: Linters
[error] 112-112: Unsupported left operand type for + ("object") [operator]
[error] 113-113: Need type annotation for "item" [var-annotated]
[error] 113-113: Argument 1 to "enumerate" has incompatible type "object"; expected "Iterable[Never]" [arg-type]
153-159
: Graceful path creation: shoulddpath.new
respectcreate_or_update=False
?Right now we attempt
dpath.set
, and if it fails we always fall back todpath.new
providedcreate_or_update
is true. All good. Ifcreate_or_update
is false the value is silently dropped. Would logging or raising a descriptive error help users debug why a mapping hasn’t been applied, wdyt?
[ suggest_nitpick ]airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1499-1500
: Potential naming mismatch withstream_configs
Elsewhere in the PR the corresponding resolver expects
stream_configs
; here the field isstream_config
.
Would accepting the plural form via an alias (or renaming the attribute) help avoid runtime KeyErrors? wdyt?- stream_config: Union[List[StreamConfig], StreamConfig] + stream_config: Union[List[StreamConfig], StreamConfig] = Field( + ..., + alias="stream_configs", + )
2436-2438
: Clarify execution order betweenschema_filter
andschema_transformations
When both are provided, which one runs first? A short note in the field description—or a
root_validator
enforcing the order—could prevent subtle bugs. What do you think?
/autofix
|
🚨🚨DON'T MERGE THIS PR🚨🚨
This PR contains changes to the CDK that are intended for pre-release. It is used to run CI tests and avoid blocking the migration while CDK changes are under review.
USED IN airbytehq/airbyte#60342
Summary by CodeRabbit
New Features
Enhancements