diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
index 46a202b19..2f09579ba 100644
--- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
+++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -341,7 +341,7 @@ definitions:
     properties:
       type:
         type: string
-        enum: [ DynamicStreamCheckConfig ]
+        enum: [DynamicStreamCheckConfig]
       dynamic_stream_name:
         title: Dynamic Stream Name
         description: The dynamic stream name.
@@ -1752,6 +1752,30 @@ definitions:
       $parameters:
         type: object
         additionalProperties: true
+  GroupByKeyMergeStrategy:
+    title: Group by Key
+    description: Record merge strategy that combines records according to fields on the record.
+    required:
+      - type
+      - key
+    properties:
+      type:
+        type: string
+        enum: [GroupByKeyMergeStrategy]
+      key:
+        title: Key
+        description: The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.
+        anyOf:
+          - type: string
+          - type: array
+            items:
+              type: string
+        examples:
+          - "id"
+          - ["parent_id", "end_date"]
+      $parameters:
+        type: object
+        additionalProperties: true
   SessionTokenAuthenticator:
     type: object
     required:
@@ -1971,7 +1995,9 @@ definitions:
           - type: string
           - type: object
             additionalProperties:
-              type: string
+              anyOf:
+                - type: string
+                - "$ref": "#/definitions/QueryProperties"
     interpolation_context:
       - next_page_token
       - stream_interval
@@ -2989,6 +3015,96 @@ definitions:
       examples:
         - id
         - ["code", "type"]
+  PropertiesFromEndpoint:
+    title: Properties from Endpoint
+    description: Defines the behavior for fetching the list of properties from an API that will be loaded into the requests to extract records.
+    type: object
+    required:
+      - type
+      - property_field_path
+      - retriever
+    properties:
+      type:
+        type: string
+        enum: [PropertiesFromEndpoint]
+      property_field_path:
+        description: Describes the path to the field that should be extracted
+        type: array
+        items:
+          type: string
+        examples:
+          - ["name"]
+        interpolation_context:
+          - config
+          - parameters
+      retriever:
+        description: Requester component that describes how to fetch the properties to query from a remote API endpoint.
+        anyOf:
+          - "$ref": "#/definitions/CustomRetriever"
+          - "$ref": "#/definitions/SimpleRetriever"
+      $parameters:
+        type: object
+        additionalProperties: true
+  PropertyChunking:
+    title: Property Chunking
+    description: For APIs with restrictions on the number of properties that can be requested per request, property chunking can be applied to make multiple requests with a subset of the properties.
+    type: object
+    required:
+      - type
+      - property_limit_type
+    properties:
+      type:
+        type: string
+        enum: [PropertyChunking]
+      property_limit_type:
+        title: Property Limit Type
+        description: The type used to determine the maximum number of properties per chunk
+        enum:
+          - characters
+          - property_count
+      property_limit:
+        title: Property Limit
+        description: The maximum number of properties that can be retrieved per request according to the limit type.
+        type: integer
+      record_merge_strategy:
+        title: Record Merge Strategy
+        description: Dictates how records that require multiple requests to get all properties should be emitted to the destination
+        "$ref": "#/definitions/GroupByKeyMergeStrategy"
+      $parameters:
+        type: object
+        additionalProperties: true
+  QueryProperties:
+    title: Query Properties
+    description: For APIs that require explicit specification of the properties to query for, this component specifies which property fields to query for and how they are supplied to outbound requests.
+    type: object
+    required:
+      - type
+      - property_list
+    properties:
+      type:
+        type: string
+        enum: [QueryProperties]
+      property_list:
+        title: Property List
+        description: The set of properties that will be queried for in the outbound request. This can either be statically defined or dynamic based on an API endpoint
+        anyOf:
+          - type: array
+            items:
+              type: string
+          - "$ref": "#/definitions/PropertiesFromEndpoint"
+      always_include_properties:
+        title: Always Include Properties
+        description: The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.
+        type: array
+        items:
+          type: string
+      property_chunking:
+        title: Property Chunking
+        description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
+        "$ref": "#/definitions/PropertyChunking"
+      $parameters:
+        type: object
+        additionalProperties: true
   RecordFilter:
     title: Record Filter
     description: Filter applied on a list of records.
diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
index c67cd958b..3566abef4 100644
--- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
+++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1,3 +1,5 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+
 # generated by datamodel-codegen:
 #   filename:  declarative_component_schema.yaml

@@ -49,7 +51,7 @@ class DynamicStreamCheckConfig(BaseModel):
     )
     stream_count: Optional[int] = Field(
         0,
-        description="Numbers of the streams to try reading from when running a check operation.",
+        description="The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.",
         title="Stream Count",
     )

@@ -718,6 +720,17 @@ class ExponentialBackoffStrategy(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class GroupByKeyMergeStrategy(BaseModel):
+    type: Literal["GroupByKeyMergeStrategy"]
+    key: Union[str, List[str]] = Field(
+        ...,
+        description="The name of the field on the record whose value will be used to group properties that were retrieved through multiple API requests.",
+        examples=["id", ["parent_id", "end_date"]],
+        title="Key",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class SessionTokenRequestBearerAuthenticator(BaseModel):
     type: Literal["Bearer"]

@@ -1189,6 +1202,31 @@ class PrimaryKey(BaseModel):
     )


+class PropertyLimitType(Enum):
+    characters = "characters"
+    property_count = "property_count"
+
+
+class PropertyChunking(BaseModel):
+    type: Literal["PropertyChunking"]
+    property_limit_type: PropertyLimitType = Field(
+        ...,
+        description="The type used to determine the maximum number of properties per chunk",
+        title="Property Limit Type",
+    )
+    property_limit: Optional[int] = Field(
+        None,
+        description="The maximum number of properties that can be retrieved per request according to the limit type.",
+        title="Property Limit",
+    )
+    record_merge_strategy: Optional[GroupByKeyMergeStrategy] = Field(
+        None,
+        description="Dictates how records that require multiple requests to get all properties should be emitted to the destination",
+        title="Record Merge Strategy",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
 class RecordFilter(BaseModel):
     type: Literal["RecordFilter"]
     condition: Optional[str] = Field(
@@ -2187,7 +2225,7 @@ class HttpRequester(BaseModel):
         examples=[{"Output-Format": "JSON"}, {"Version": "{{ config['version'] }}"}],
         title="Request Headers",
     )
-    request_parameters: Optional[Union[str, Dict[str, str]]] = Field(
+    request_parameters: Optional[Union[str, Dict[str, Union[str, Any]]]] = Field(
         None,
         description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
         examples=[
@@ -2277,6 +2315,40 @@ class ParentStreamConfig(BaseModel):
     parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


+class PropertiesFromEndpoint(BaseModel):
+    type: Literal["PropertiesFromEndpoint"]
+    property_field_path: List[str] = Field(
+        ...,
+        description="Describes the path to the field that should be extracted",
+        examples=[["name"]],
+    )
+    retriever: Union[CustomRetriever, SimpleRetriever] = Field(
+        ...,
+        description="Requester component that describes how to fetch the properties to query from a remote API endpoint.",
+    )
+    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
+
+
+class QueryProperties(BaseModel):
+    type: Literal["QueryProperties"]
+    property_list: Union[List[str], PropertiesFromEndpoint] = Field(
+        ...,
+        description="The set of properties that will be queried for in the outbound request. 
This can either be statically defined or dynamic based on an API endpoint", + title="Property List", + ) + always_include_properties: Optional[List[str]] = Field( + None, + description="The list of properties that should be included in every set of properties when multiple chunks of properties are being requested.", + title="Always Include Properties", + ) + property_chunking: Optional[PropertyChunking] = Field( + None, + description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.", + title="Property Chunking", + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] name: str = Field(..., description="The stream name.", example=["Users"], title="Name") @@ -2525,5 +2597,6 @@ class DynamicDeclarativeStream(BaseModel): SessionTokenAuthenticator.update_forward_refs() DynamicSchemaLoader.update_forward_refs() ParentStreamConfig.update_forward_refs() +PropertiesFromEndpoint.update_forward_refs() SimpleRetriever.update_forward_refs() AsyncRetriever.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4f4638190..25840f06f 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # from __future__ import annotations @@ -234,6 +234,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( FlattenFields as FlattenFieldsModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + GroupByKeyMergeStrategy as GroupByKeyMergeStrategyModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( GroupingPartitionRouter as GroupingPartitionRouterModel, ) @@ -324,6 +327,18 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ParentStreamConfig as ParentStreamConfigModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertiesFromEndpoint as PropertiesFromEndpointModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyChunking as PropertyChunkingModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + PropertyLimitType as PropertyLimitTypeModel, +) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + QueryProperties as QueryPropertiesModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Rate as RateModel, ) @@ -432,6 +447,17 @@ PageIncrement, StopConditionPaginationStrategyDecorator, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + PropertiesFromEndpoint, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import ( + GroupByKey, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.request_options import ( DatetimeBasedRequestOptionsProvider, @@ 
-596,6 +622,7 @@ def _init_mappings(self) -> None: ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, + GroupByKeyMergeStrategyModel: self.create_group_by_key, HttpRequesterModel: self.create_http_requester, HttpResponseFilterModel: self.create_http_response_filter, InlineSchemaLoaderModel: self.create_inline_schema_loader, @@ -625,6 +652,9 @@ def _init_mappings(self) -> None: OffsetIncrementModel: self.create_offset_increment, PageIncrementModel: self.create_page_increment, ParentStreamConfigModel: self.create_parent_stream_config, + PropertiesFromEndpointModel: self.create_properties_from_endpoint, + PropertyChunkingModel: self.create_property_chunking, + QueryPropertiesModel: self.create_query_properties, RecordFilterModel: self.create_record_filter, RecordSelectorModel: self.create_record_selector, RemoveFieldsModel: self.create_remove_fields, @@ -2083,8 +2113,8 @@ def create_dpath_extractor( parameters=model.parameters or {}, ) + @staticmethod def create_response_to_file_extractor( - self, model: ResponseToFileExtractorModel, **kwargs: Any, ) -> ResponseToFileExtractor: @@ -2098,11 +2128,17 @@ def create_exponential_backoff_strategy( factor=model.factor or 5, parameters=model.parameters or {}, config=config ) + @staticmethod + def create_group_by_key(model: GroupByKeyMergeStrategyModel, config: Config) -> GroupByKey: + return GroupByKey(model.key, config=config, parameters=model.parameters or {}) + def create_http_requester( self, model: HttpRequesterModel, config: Config, decoder: Decoder = JsonDecoder(parameters={}), + query_properties_key: Optional[str] = None, + use_cache: Optional[bool] = None, *, name: str, ) -> HttpRequester: @@ -2135,6 +2171,7 @@ def create_http_requester( request_body_json=model.request_body_json, request_headers=model.request_headers, request_parameters=model.request_parameters, + query_properties_key=query_properties_key, config=config, parameters=model.parameters or {}, ) @@ -2142,7 +2179,7 @@ def create_http_requester( assert model.use_cache is not None # for mypy assert model.http_method is not None # for mypy - use_cache = model.use_cache and not self._disable_cache + should_use_cache = (model.use_cache or bool(use_cache)) and not self._disable_cache return HttpRequester( name=name, @@ -2157,7 +2194,7 @@ def create_http_requester( disable_retries=self._disable_retries, parameters=model.parameters or {}, message_repository=self._message_repository, - use_cache=use_cache, + use_cache=should_use_cache, decoder=decoder, stream_response=decoder.is_stream_response() if decoder else False, ) @@ -2261,10 +2298,11 @@ def create_dynamic_schema_loader( retriever = self._create_component_from_model( model=model.retriever, config=config, - name="", + name="dynamic_properties", primary_key=None, stream_slicer=combined_slicers, transformations=[], + use_cache=True, ) schema_type_identifier = self._create_component_from_model( model.schema_type_identifier, config=config, parameters=model.parameters or {} @@ -2602,6 +2640,79 @@ def create_parent_stream_config( lazy_read_pointer=model_lazy_read_pointer, ) + def create_properties_from_endpoint( + self, model: PropertiesFromEndpointModel, config: Config, **kwargs: Any + ) -> PropertiesFromEndpoint: + retriever = self._create_component_from_model( + model=model.retriever, + config=config, + name="dynamic_properties", + primary_key=None, + stream_slicer=None, + 
transformations=[],
+            use_cache=True,  # Enable caching on the HttpRequester/HttpClient because the properties endpoint will be called for every slice being processed, and it is highly unlikely for the response to differ
+        )
+        return PropertiesFromEndpoint(
+            property_field_path=model.property_field_path,
+            retriever=retriever,
+            config=config,
+            parameters=model.parameters or {},
+        )
+
+    def create_property_chunking(
+        self, model: PropertyChunkingModel, config: Config, **kwargs: Any
+    ) -> PropertyChunking:
+        record_merge_strategy = (
+            self._create_component_from_model(
+                model=model.record_merge_strategy, config=config, **kwargs
+            )
+            if model.record_merge_strategy
+            else None
+        )
+
+        property_limit_type: PropertyLimitType
+        match model.property_limit_type:
+            case PropertyLimitTypeModel.property_count:
+                property_limit_type = PropertyLimitType.property_count
+            case PropertyLimitTypeModel.characters:
+                property_limit_type = PropertyLimitType.characters
+            case _:
+                raise ValueError(f"Invalid PropertyLimitType {model.property_limit_type}")
+
+        return PropertyChunking(
+            property_limit_type=property_limit_type,
+            property_limit=model.property_limit,
+            record_merge_strategy=record_merge_strategy,
+            config=config,
+            parameters=model.parameters or {},
+        )
+
+    def create_query_properties(
+        self, model: QueryPropertiesModel, config: Config, **kwargs: Any
+    ) -> QueryProperties:
+        if isinstance(model.property_list, list):
+            property_list = model.property_list
+        else:
+            property_list = self._create_component_from_model(
+                model=model.property_list, config=config, **kwargs
+            )
+
+        property_chunking = (
+            self._create_component_from_model(
+                model=model.property_chunking, config=config, **kwargs
+            )
+            if model.property_chunking
+            else None
+        )
+
+        return QueryProperties(
+            property_list=property_list,
+            always_include_properties=model.always_include_properties,
+            property_chunking=property_chunking,
+            config=config,
+            parameters=model.parameters or {},
+        )
+
     @staticmethod
     def create_record_filter(
         model: RecordFilterModel, config: Config, **kwargs: Any
@@ -2747,6 +2858,7 @@ def create_simple_retriever(
             IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
         ]
     ] = None,
+        use_cache: Optional[bool] = None,
         **kwargs: Any,
     ) -> SimpleRetriever:
         decoder = (
@@ -2754,9 +2866,6 @@
             if model.decoder
             else JsonDecoder(parameters={})
         )
-        requester = self._create_component_from_model(
-            model=model.requester, decoder=decoder, config=config, name=name
-        )
         record_selector = self._create_component_from_model(
             model=model.record_selector,
             name=name,
@@ -2765,6 +2874,57 @@
             transformations=transformations,
             client_side_incremental_sync=client_side_incremental_sync,
         )
+
+        query_properties: Optional[QueryProperties] = None
+        query_properties_key: Optional[str] = None
+        if (
+            hasattr(model.requester, "request_parameters")
+            and model.requester.request_parameters
+            and isinstance(model.requester.request_parameters, Mapping)
+        ):
+            query_properties_definitions = []
+            for key, request_parameter in model.requester.request_parameters.items():
+                # When translating JSON schema into Pydantic models, fields that allow both concrete
+                # strings and complex object definitions like QueryProperties get resolved to Union[str, Any].
+ # This adds the extra validation that we couldn't get for free in Pydantic model generation + if ( + isinstance(request_parameter, Mapping) + and request_parameter.get("type") == "QueryProperties" + ): + query_properties_key = key + query_properties_definitions.append(request_parameter) + elif not isinstance(request_parameter, str): + raise ValueError( + f"Each element of request_parameters should be of type str or QueryProperties, but received {request_parameter.get('type')}" + ) + + if len(query_properties_definitions) > 1: + raise ValueError( + f"request_parameters only supports defining one QueryProperties field, but found {len(query_properties_definitions)} usages" + ) + + if len(query_properties_definitions) == 1: + query_properties = self.create_component( + model_type=QueryPropertiesModel, + component_definition=query_properties_definitions[0], + config=config, + ) + + # Removes QueryProperties components from the interpolated mappings because it will be resolved in + # the provider from the slice directly instead of through jinja interpolation + if isinstance(model.requester.request_parameters, Mapping): + model.requester.request_parameters = self._remove_query_properties( + model.requester.request_parameters + ) + + requester = self._create_component_from_model( + model=model.requester, + decoder=decoder, + name=name, + query_properties_key=query_properties_key, + use_cache=use_cache, + config=config, + ) url_base = ( model.requester.url_base if hasattr(model.requester, "url_base") @@ -2870,9 +3030,21 @@ def create_simple_retriever( cursor=cursor, config=config, ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests, + additional_query_properties=query_properties, parameters=model.parameters or {}, ) + @staticmethod + def _remove_query_properties( + request_parameters: Mapping[str, Union[Any, str]], + ) -> Mapping[str, Union[Any, str]]: + return { + parameter_field: request_parameter + for parameter_field, request_parameter in request_parameters.items() + if not isinstance(request_parameter, Mapping) + or not request_parameter.get("type") == "QueryProperties" + } + def create_state_delegating_stream( self, model: StateDelegatingStreamModel, diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py new file mode 100644 index 000000000..65c741cf5 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import ( + PropertiesFromEndpoint, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyChunking, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import ( + QueryProperties, +) + +__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py new file mode 100644 index 000000000..1e294bc8e --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/properties_from_endpoint.py @@ -0,0 +1,40 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+
+from dataclasses import InitVar, dataclass
+from typing import Any, Iterable, List, Mapping, Optional
+
+import dpath
+
+from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
+from airbyte_cdk.sources.declarative.retrievers import Retriever
+from airbyte_cdk.sources.types import Config, StreamSlice
+
+
+@dataclass
+class PropertiesFromEndpoint:
+    """
+    Component that defines the behavior around how to dynamically retrieve a set of request properties from an
+    API endpoint. The set retrieved can then be injected into the requests to extract records from an API source.
+    """
+
+    property_field_path: List[str]
+    retriever: Retriever
+    config: Config
+    parameters: InitVar[Mapping[str, Any]]
+
+    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
+        self._property_field_path = [
+            InterpolatedString(string=property_field, parameters=parameters)
+            for property_field in self.property_field_path
+        ]
+
+    def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
+        response_properties = self.retriever.read_records(
+            records_schema={}, stream_slice=stream_slice
+        )
+        for property_obj in response_properties:
+            path = [
+                node.eval(self.config) if not isinstance(node, str) else node
+                for node in self._property_field_path
+            ]
+            yield dpath.get(property_obj, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
new file mode 100644
index 000000000..53f387775
--- /dev/null
+++ b/airbyte_cdk/sources/declarative/requesters/query_properties/property_chunking.py
@@ -0,0 +1,69 @@
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
+
+from dataclasses import InitVar, dataclass
+from enum import Enum
+from typing import Any, Iterable, List, Mapping, Optional
+
+from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
+from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
+    RecordMergeStrategy,
+)
+from airbyte_cdk.sources.types import Config, Record
+
+
+class PropertyLimitType(Enum):
+    """
+    The heuristic that determines the maximum size of the current chunk of properties and when a new
+    one should be started.
+    """
+
+    characters = "characters"
+    property_count = "property_count"
+
+
+@dataclass
+class PropertyChunking:
+    """
+    Defines the behavior for how the complete list of properties to query for is broken down into smaller groups
+    that will be used for multiple requests to the target API.
+ """ + + property_limit_type: PropertyLimitType + property_limit: Optional[int] + record_merge_strategy: Optional[RecordMergeStrategy] + parameters: InitVar[Mapping[str, Any]] + config: Config + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._record_merge_strategy = self.record_merge_strategy or GroupByKey( + key="id", config=self.config, parameters=parameters + ) + + def get_request_property_chunks( + self, property_fields: Iterable[str], always_include_properties: Optional[List[str]] + ) -> Iterable[List[str]]: + if not self.property_limit: + single_property_chunk = list(property_fields) + if always_include_properties: + single_property_chunk.extend(always_include_properties) + yield single_property_chunk + return + current_chunk = list(always_include_properties) if always_include_properties else [] + chunk_size = 0 + for property_field in property_fields: + # If property_limit_type is not defined, we default to property_count which is just an incrementing count + property_field_size = ( + len(property_field) + if self.property_limit_type == PropertyLimitType.characters + else 1 + ) + if chunk_size + property_field_size > self.property_limit: + yield current_chunk + current_chunk = list(always_include_properties) if always_include_properties else [] + chunk_size = 0 + current_chunk.append(property_field) + chunk_size += property_field_size + yield current_chunk + + def get_merge_key(self, record: Record) -> Optional[str]: + return self._record_merge_strategy.get_group_key(record=record) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py new file mode 100644 index 000000000..4dd7bced8 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py @@ -0,0 +1,58 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass +from typing import Any, Iterable, List, Mapping, Optional, Union + +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + PropertiesFromEndpoint, + PropertyChunking, +) +from airbyte_cdk.sources.types import Config, StreamSlice + + +@dataclass +class QueryProperties: + """ + Low-code component that encompasses the behavior to inject additional property values into the outbound API + requests. Property values can be defined statically within the manifest or dynamically by making requests + to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of + properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved + """ + + property_list: Optional[Union[List[str], PropertiesFromEndpoint]] + always_include_properties: Optional[List[str]] + property_chunking: Optional[PropertyChunking] + config: Config + parameters: InitVar[Mapping[str, Any]] + + def get_request_property_chunks( + self, stream_slice: Optional[StreamSlice] = None + ) -> Iterable[List[str]]: + """ + Uses the defined property_list to fetch the total set of properties dynamically or from a static list + and based on the resulting properties, performs property chunking if applicable. + :param stream_slice: The StreamSlice of the current partition being processed during the sync. 
This is included + because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object + """ + fields: Union[Iterable[str], List[str]] + if isinstance(self.property_list, PropertiesFromEndpoint): + fields = self.property_list.get_properties_from_endpoint(stream_slice=stream_slice) + else: + fields = self.property_list if self.property_list else [] + + if self.property_chunking: + yield from self.property_chunking.get_request_property_chunks( + property_fields=fields, always_include_properties=self.always_include_properties + ) + else: + yield list(fields) + + # delete later, but leaving this to keep the discussion thread on the PR from getting hidden + def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool: + property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice)) + try: + next(property_chunks) + next(property_chunks) + return True + except StopIteration: + return False diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py new file mode 100644 index 000000000..34d0e4e2e --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/__init__.py @@ -0,0 +1,10 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import ( + GroupByKey, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) + +__all__ = ["GroupByKey", "RecordMergeStrategy"] diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py new file mode 100644 index 000000000..e470e5521 --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/group_by_key.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from dataclasses import InitVar, dataclass +from typing import Any, List, Mapping, Optional, Union + +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( + RecordMergeStrategy, +) +from airbyte_cdk.sources.types import Config, Record + + +@dataclass +class GroupByKey(RecordMergeStrategy): + """ + Record merge strategy that combines records together according to values on the record for one or many keys. + """ + + key: Union[str, List[str]] + parameters: InitVar[Mapping[str, Any]] + config: Config + + def __post_init__(self, parameters: Mapping[str, Any]) -> None: + self._keys = [self.key] if isinstance(self.key, str) else self.key + + def get_group_key(self, record: Record) -> Optional[str]: + resolved_keys = [] + for key in self._keys: + key_value = record.data.get(key) + if key_value: + resolved_keys.append(key_value) + else: + return None + return ",".join(resolved_keys) diff --git a/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py new file mode 100644 index 000000000..f77b5ba0c --- /dev/null +++ b/airbyte_cdk/sources/declarative/requesters/query_properties/strategies/merge_strategy.py @@ -0,0 +1,19 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from typing import Optional
+
+from airbyte_cdk.sources.types import Record
+
+
+@dataclass
+class RecordMergeStrategy(ABC):
+    """
+    Describes the interface for how records that require multiple requests to get the complete set of fields
+    should be merged back into a single record.
+    """
+
+    @abstractmethod
+    def get_group_key(self, record: Record) -> Optional[str]:
+        pass
diff --git a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py
index e14c64de0..2e0038730 100644
--- a/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py
+++ b/airbyte_cdk/sources/declarative/requesters/request_options/interpolated_request_options_provider.py
@@ -1,9 +1,9 @@
 #
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 #

 from dataclasses import InitVar, dataclass, field
-from typing import Any, Mapping, MutableMapping, Optional, Union
+from typing import Any, List, Mapping, MutableMapping, Optional, Union

 from airbyte_cdk.sources.declarative.interpolation.interpolated_nested_mapping import NestedMapping
 from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_nested_request_input_provider import (
@@ -40,6 +40,7 @@ class InterpolatedRequestOptionsProvider(RequestOptionsProvider):
     request_headers: Optional[RequestInput] = None
     request_body_data: Optional[RequestInput] = None
     request_body_json: Optional[NestedMapping] = None
+    query_properties_key: Optional[str] = None

     def __post_init__(self, parameters: Mapping[str, Any]) -> None:
         if self.request_parameters is None:
@@ -83,6 +84,28 @@ def get_request_params(
             valid_value_types=ValidRequestTypes,
         )
         if isinstance(interpolated_value, dict):
+            if self.query_properties_key:
+                if not stream_slice:
+                    raise ValueError(
+                        "stream_slice should not be None if query properties in requests is enabled. Please contact Airbyte Support"
+                    )
+                elif (
+                    "query_properties" not in stream_slice.extra_fields
+                    or stream_slice.extra_fields.get("query_properties") is None
+                ):
+                    raise ValueError(
+                        "QueryProperties component is defined but stream_partition does not contain query_properties. Please contact Airbyte Support"
+                    )
+                elif not isinstance(stream_slice.extra_fields.get("query_properties"), List):
+                    raise ValueError(
+                        "QueryProperties component is defined but stream_slice.extra_fields.query_properties is not a List. Please contact Airbyte Support"
+                    )
+                interpolated_value = {
+                    **interpolated_value,
+                    self.query_properties_key: ",".join(
+                        stream_slice.extra_fields.get("query_properties")  # type: ignore # Earlier type checks validate query_properties type
+                    ),
+                }
             return interpolated_value
         return {}
diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
index 65aa5d406..b339aaedf 100644
--- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
+++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py
@@ -1,8 +1,9 @@
 #
-# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
# import json +from collections import defaultdict from dataclasses import InitVar, dataclass, field from functools import partial from itertools import islice @@ -12,6 +13,7 @@ Iterable, List, Mapping, + MutableMapping, Optional, Set, Tuple, @@ -31,6 +33,7 @@ ) from airbyte_cdk.sources.declarative.requesters.paginators.no_pagination import NoPagination from airbyte_cdk.sources.declarative.requesters.paginators.paginator import Paginator +from airbyte_cdk.sources.declarative.requesters.query_properties import QueryProperties from airbyte_cdk.sources.declarative.requesters.request_options import ( DefaultRequestOptionsProvider, RequestOptionsProvider, @@ -88,6 +91,7 @@ class SimpleRetriever(Retriever): ) cursor: Optional[DeclarativeCursor] = None ignore_stream_slicer_parameters_on_paginated_requests: bool = False + additional_query_properties: Optional[QueryProperties] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._paginator = self.paginator or NoPagination(parameters=parameters) @@ -445,43 +449,110 @@ def read_records( :param stream_slice: The stream slice to read data for :return: The records read from the API source """ - _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check - most_recent_record_from_slice = None - record_generator = partial( - self._parse_records, - stream_slice=stream_slice, - stream_state=self.state or {}, - records_schema=records_schema, + property_chunks = ( + list( + self.additional_query_properties.get_request_property_chunks( + stream_slice=stream_slice + ) + ) + if self.additional_query_properties + else [] ) + records_without_merge_key = [] + merged_records: MutableMapping[str, Any] = defaultdict(dict) - if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): - stream_state = self.state - - # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to - # fetch more records. The platform deletes stream state for full refresh streams before starting a - # new job, so we don't need to worry about this value existing for the initial attempt - if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): - return + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check + most_recent_record_from_slice = None - yield from self._read_single_page(record_generator, stream_state, _slice) - else: - for stream_data in self._read_pages(record_generator, self.state, _slice): - current_record = self._extract_record(stream_data, _slice) - if self.cursor and current_record: - self.cursor.observe(_slice, current_record) - - # Latest record read, not necessarily within slice boundaries. - # TODO Remove once all custom components implement `observe` method. 
- # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 - most_recent_record_from_slice = self._get_most_recent_record( - most_recent_record_from_slice, current_record, _slice + if self.additional_query_properties: + for properties in property_chunks: + _slice = StreamSlice( + partition=_slice.partition or {}, + cursor_slice=_slice.cursor_slice or {}, + extra_fields={"query_properties": properties}, + ) # None-check + + record_generator = partial( + self._parse_records, + stream_slice=_slice, + stream_state=self.state or {}, + records_schema=records_schema, ) - yield stream_data + for stream_data in self._read_pages(record_generator, self.state, _slice): + current_record = self._extract_record(stream_data, _slice) + if self.cursor and current_record: + self.cursor.observe(_slice, current_record) + + # Latest record read, not necessarily within slice boundaries. + # TODO Remove once all custom components implement `observe` method. + # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 + most_recent_record_from_slice = self._get_most_recent_record( + most_recent_record_from_slice, current_record, _slice + ) + + if current_record and self.additional_query_properties.property_chunking: + merge_key = ( + self.additional_query_properties.property_chunking.get_merge_key( + current_record + ) + ) + if merge_key: + merged_records[merge_key].update(current_record) + else: + # We should still emit records even if the record did not have a merge key + records_without_merge_key.append(current_record) + else: + yield stream_data if self.cursor: self.cursor.close_slice(_slice, most_recent_record_from_slice) - return + + if len(merged_records) > 0: + yield from [ + Record(data=merged_record, stream_name=self.name, associated_slice=stream_slice) + for merged_record in merged_records.values() + ] + if len(records_without_merge_key) > 0: + yield from records_without_merge_key + else: + _slice = stream_slice or StreamSlice(partition={}, cursor_slice={}) # None-check + + most_recent_record_from_slice = None + record_generator = partial( + self._parse_records, + stream_slice=stream_slice, + stream_state=self.state or {}, + records_schema=records_schema, + ) + + if self.cursor and isinstance(self.cursor, ResumableFullRefreshCursor): + stream_state = self.state + + # Before syncing the RFR stream, we check if the job's prior attempt was successful and don't need to + # fetch more records. The platform deletes stream state for full refresh streams before starting a + # new job, so we don't need to worry about this value existing for the initial attempt + if stream_state.get(FULL_REFRESH_SYNC_COMPLETE_KEY): + return + + yield from self._read_single_page(record_generator, stream_state, _slice) + else: + for stream_data in self._read_pages(record_generator, self.state, _slice): + current_record = self._extract_record(stream_data, _slice) + if self.cursor and current_record: + self.cursor.observe(_slice, current_record) + + # Latest record read, not necessarily within slice boundaries. + # TODO Remove once all custom components implement `observe` method. 
+ # https://github.com/airbytehq/airbyte-internal-issues/issues/6955 + most_recent_record_from_slice = self._get_most_recent_record( + most_recent_record_from_slice, current_record, _slice + ) + yield stream_data + + if self.cursor: + self.cursor.close_slice(_slice, most_recent_record_from_slice) + return def _get_most_recent_record( self, diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 9d462f330..f628eeb3b 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -9,6 +9,7 @@ import freezegun import pytest import requests +from pydantic.v1 import ValidationError from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import ( @@ -72,6 +73,7 @@ from airbyte_cdk.sources.declarative.models import JwtAuthenticator as JwtAuthenticatorModel from airbyte_cdk.sources.declarative.models import ListPartitionRouter as ListPartitionRouterModel from airbyte_cdk.sources.declarative.models import OAuthAuthenticator as OAuthAuthenticatorModel +from airbyte_cdk.sources.declarative.models import PropertyChunking as PropertyChunkingModel from airbyte_cdk.sources.declarative.models import RecordSelector as RecordSelectorModel from airbyte_cdk.sources.declarative.models import SimpleRetriever as SimpleRetrieverModel from airbyte_cdk.sources.declarative.models import Spec as SpecModel @@ -124,6 +126,15 @@ PageIncrement, StopConditionPaginationStrategyDecorator, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + PropertiesFromEndpoint, + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, RequestOptionType, @@ -151,9 +162,7 @@ ClampingEndProvider, DayClampingStrategy, MonthClampingStrategy, - NoClamping, WeekClampingStrategy, - Weekday, ) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( @@ -3997,3 +4006,377 @@ def test_create_grouping_partition_router_substream_with_request_option(): component_definition=partition_router_manifest, config=input_config, ) + + +def test_simple_retriever_with_query_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ 
config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.property_list == [ + "first_name", + "last_name", + "status", + "organization", + "created_at", + ] + assert query_properties.always_include_properties == ["id"] + + property_chunking = stream.retriever.additional_query_properties.property_chunking + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.property_count + assert property_chunking.property_limit == 3 + + merge_strategy = ( + stream.retriever.additional_query_properties.property_chunking.record_merge_strategy + ) + assert isinstance(merge_strategy, GroupByKey) + assert merge_strategy.key == ["id"] + + request_options_provider = stream.retriever.requester.request_options_provider + assert isinstance(request_options_provider, InterpolatedRequestOptionsProvider) + # For a better developer experience we allow QueryProperties to be defined on the requester.request_parameters, + # but it actually is leveraged by the SimpleRetriever which is why it is not included in the RequestOptionsProvider + assert request_options_provider.query_properties_key == "fields" + assert "fields" not in request_options_provider.request_parameters + assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}" + + +def test_simple_retriever_with_properties_from_endpoint(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.hubapi.com" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: QueryProperties + property_list: + type: PropertiesFromEndpoint + property_field_path: [ "name" ] + retriever: + type: SimpleRetriever + requester: + type: HttpRequester + url_base: https://api.hubapi.com + path: "/properties/v2/dynamics/properties" + http_method: GET + record_selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: [] + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + dynamic_properties_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "dynamics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) 
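+    # The inline YAML above runs through the same parse -> resolve references -> propagate
+    # $parameters pipeline that a published connector manifest goes through before components are built.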
+ resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["dynamic_properties_stream"], {} + ) + + stream = factory.create_component( + model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config + ) + + query_properties = stream.retriever.additional_query_properties + assert isinstance(query_properties, QueryProperties) + assert query_properties.always_include_properties is None + + properties_from_endpoint = stream.retriever.additional_query_properties.property_list + assert isinstance(properties_from_endpoint, PropertiesFromEndpoint) + assert properties_from_endpoint.property_field_path == ["name"] + + properties_from_endpoint_retriever = ( + stream.retriever.additional_query_properties.property_list.retriever + ) + assert isinstance(properties_from_endpoint_retriever, SimpleRetriever) + + properties_from_endpoint_requester = ( + stream.retriever.additional_query_properties.property_list.retriever.requester + ) + assert isinstance(properties_from_endpoint_requester, HttpRequester) + assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com" + assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties" + + property_chunking = stream.retriever.additional_query_properties.property_chunking + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.property_count + assert property_chunking.property_limit == 3 + + +def test_request_parameters_raise_error_if_not_of_type_query_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: "adAnalytics" + request_parameters: + nonary: "{{config['nonary'] }}" + fields: + type: ListPartitionRouter + values: "{{config['repos']}}" + cursor_field: repository + request_option: + type: RequestOption + inject_into: body_json + field_path: ["repository", "id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + +def test_create_simple_retriever_raise_error_if_multiple_request_properties(): + content = """ + selector: + type: RecordSelector + extractor: + type: DpathExtractor + field_path: ["extractor_path"] + record_filter: + type: RecordFilter + condition: "{{ record['id'] > stream_state['id'] }}" + requester: + type: HttpRequester + name: "{{ parameters['name'] }}" + url_base: "https://api.linkedin.com/rest/" + http_method: "GET" + path: 
"adAnalytics" + request_parameters: + first_query_properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + nonary: "{{config['nonary'] }}" + invalid_extra_query_properties: + type: QueryProperties + property_list: + - first_name + - last_name + - status + - organization + - created_at + always_include_properties: + - id + property_chunking: + type: PropertyChunking + property_limit_type: property_count + property_limit: 3 + record_merge_strategy: + type: GroupByKeyMergeStrategy + key: ["id"] + analytics_stream: + type: DeclarativeStream + incremental_sync: + type: DatetimeBasedCursor + $parameters: + datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z" + start_datetime: "{{ config['start_time'] }}" + cursor_field: "created" + retriever: + type: SimpleRetriever + name: "{{ parameters['name'] }}" + requester: + $ref: "#/requester" + record_selector: + $ref: "#/selector" + $parameters: + name: "analytics" + """ + + parsed_manifest = YamlDeclarativeSource._parse(content) + resolved_manifest = resolver.preprocess_manifest(parsed_manifest) + stream_manifest = transformer.propagate_types_and_parameters( + "", resolved_manifest["analytics_stream"], {} + ) + + with pytest.raises(ValueError): + factory.create_component( + model_type=DeclarativeStreamModel, + component_definition=stream_manifest, + config=input_config, + ) + + +def test_create_property_chunking_characters(): + property_chunking_model = { + "type": "PropertyChunking", + "property_limit_type": "characters", + "property_limit": 100, + "record_merge_strategy": {"type": "GroupByKeyMergeStrategy", "key": ["id"]}, + } + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + property_chunking = connector_builder_factory.create_component( + model_type=PropertyChunkingModel, + component_definition=property_chunking_model, + config={}, + ) + + assert isinstance(property_chunking, PropertyChunking) + assert property_chunking.property_limit_type == PropertyLimitType.characters + assert property_chunking.property_limit == 100 + + +def test_create_property_chunking_invalid_property_limit_type(): + property_chunking_model = { + "type": "PropertyChunking", + "property_limit_type": "nope", + "property_limit": 20, + "record_merge_strategy": {"type": "GroupByKeyMergeStrategy", "key": ["id"]}, + } + + connector_builder_factory = ModelToComponentFactory(emit_connector_builder_messages=True) + + with pytest.raises(ValidationError): + connector_builder_factory.create_component( + model_type=PropertyChunkingModel, + component_definition=property_chunking_model, + config={}, + ) diff --git a/unit_tests/sources/declarative/requesters/query_properties/__init__.py b/unit_tests/sources/declarative/requesters/query_properties/__init__.py new file mode 100644 index 000000000..58b636bf9 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/__init__.py @@ -0,0 +1 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py new file mode 100644 index 000000000..a2142cc1b --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_group_by_key.py @@ -0,0 +1,41 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +import pytest + +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.types import Record + + +@pytest.mark.parametrize( + "key,record,expected_merge_key", + [ + pytest.param( + ["id"], + Record( + stream_name="test", + data={"id": "0", "first_name": "Belinda", "last_name": "Lindsey"}, + ), + "0", + id="test_get_merge_key_single", + ), + pytest.param( + ["last_name", "first_name"], + Record( + stream_name="test", data={"id": "1", "first_name": "Zion", "last_name": "Lindsey"} + ), + "Lindsey,Zion", + id="test_get_merge_key_single_multiple", + ), + pytest.param( + [""], + Record(stream_name="test", data={}), + None, + id="test_get_merge_key_not_present", + ), + ], +) +def test_get_merge_key(key, record, expected_merge_key): + group_by_key = GroupByKey(key=key, config={}, parameters={}) + + merge_key = group_by_key.get_group_key(record=record) + assert merge_key == expected_merge_key diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py new file mode 100644 index 000000000..f94925d7d --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_properties_from_endpoint.py @@ -0,0 +1,145 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
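+# These tests exercise PropertiesFromEndpoint.get_properties_from_endpoint against a mocked
+# SimpleRetriever, so no HTTP requests are issued.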
+ +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.requesters.query_properties import PropertiesFromEndpoint +from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever +from airbyte_cdk.sources.types import Record, StreamSlice + +CONFIG = {} + + +def test_get_properties_from_endpoint(): + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "name": "gentarou"}), + Record(stream_name="players", data={"id": "snake", "name": "light"}), + Record(stream_name="players", data={"id": "santa", "name": "aoi"}), + Record(stream_name="players", data={"id": "clover", "name": "clover"}), + Record(stream_name="players", data={"id": "junpei", "name": "junpei"}), + Record(stream_name="players", data={"id": "june", "name": "akane"}), + Record(stream_name="players", data={"id": "seven", "name": "unknown"}), + Record(stream_name="players", data={"id": "lotus", "name": "hazuki"}), + Record(stream_name="players", data={"id": "nine", "name": "teruaki"}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["name"], + config=CONFIG, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties + + +def test_get_properties_from_endpoint_with_multiple_field_paths(): + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}), + Record(stream_name="players", data={"id": "snake", "names": {"first_name": "light"}}), + Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}), + Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}), + Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}), + Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}), + Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}), + Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}), + Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["names", "first_name"], + config=CONFIG, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties + + +def test_get_properties_from_endpoint_with_interpolation(): + config = {"top_level_field": "names"} + expected_properties = [ + "gentarou", + "light", + "aoi", + "clover", + "junpei", + "akane", + "unknown", + "hazuki", + "teruaki", + ] + + retriever = Mock(spec=SimpleRetriever) + retriever.read_records.return_value = iter( + [ + Record(stream_name="players", data={"id": "ace", "names": {"first_name": "gentarou"}}), + Record(stream_name="players", data={"id": 
"snake", "names": {"first_name": "light"}}), + Record(stream_name="players", data={"id": "santa", "names": {"first_name": "aoi"}}), + Record(stream_name="players", data={"id": "clover", "names": {"first_name": "clover"}}), + Record(stream_name="players", data={"id": "junpei", "names": {"first_name": "junpei"}}), + Record(stream_name="players", data={"id": "june", "names": {"first_name": "akane"}}), + Record(stream_name="players", data={"id": "seven", "names": {"first_name": "unknown"}}), + Record(stream_name="players", data={"id": "lotus", "names": {"first_name": "hazuki"}}), + Record(stream_name="players", data={"id": "nine", "names": {"first_name": "teruaki"}}), + ] + ) + + properties_from_endpoint = PropertiesFromEndpoint( + retriever=retriever, + property_field_path=["{{ config['top_level_field'] }}", "first_name"], + config=config, + parameters={}, + ) + + properties = list( + properties_from_endpoint.get_properties_from_endpoint( + stream_slice=StreamSlice(cursor_slice={}, partition={}) + ) + ) + + assert len(properties) == 9 + assert properties == expected_properties diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py new file mode 100644 index 000000000..d05c66df6 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_property_chunking.py @@ -0,0 +1,98 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +import pytest + +from airbyte_cdk.sources.declarative.requesters.query_properties import PropertyChunking +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyLimitType, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.types import Record + +CONFIG = {} + + +@pytest.mark.parametrize( + "property_fields,always_include_properties,property_limit_type,property_limit,expected_property_chunks", + [ + pytest.param( + ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"], + None, + PropertyLimitType.property_count, + 2, + [["rick", "chelsea"], ["victoria", "tim"], ["saxon", "lochlan"], ["piper"]], + id="test_property_chunking", + ), + pytest.param( + ["rick", "chelsea", "victoria", "tim"], + ["mook", "gaitok"], + PropertyLimitType.property_count, + 2, + [["mook", "gaitok", "rick", "chelsea"], ["mook", "gaitok", "victoria", "tim"]], + id="test_property_chunking_with_always_include_fields", + ), + pytest.param( + ["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"], + None, + PropertyLimitType.property_count, + None, + [["rick", "chelsea", "victoria", "tim", "saxon", "lochlan", "piper"]], + id="test_property_chunking_no_limit", + ), + pytest.param( + ["kate", "laurie", "jaclyn"], + None, + PropertyLimitType.characters, + 10, + [["kate", "laurie"], ["jaclyn"]], + id="test_property_chunking_limit_characters", + ), + pytest.param( + [], + None, + PropertyLimitType.property_count, + 5, + [[]], + id="test_property_chunking_no_properties", + ), + ], +) +def test_get_request_property_chunks( + property_fields, + always_include_properties, + property_limit_type, + property_limit, + expected_property_chunks, +): + property_fields = iter(property_fields) + property_chunking = PropertyChunking( + property_limit_type=property_limit_type, + property_limit=property_limit, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) 
+ + property_chunks = list( + property_chunking.get_request_property_chunks( + property_fields=property_fields, always_include_properties=always_include_properties + ) + ) + + assert len(property_chunks) == len(expected_property_chunks) + for i, expected_property_chunk in enumerate(expected_property_chunks): + assert property_chunks[i] == expected_property_chunk + + +def test_get_merge_key(): + record = Record(stream_name="test", data={"id": "0"}) + property_chunking = PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=10, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ) + + merge_key = property_chunking.get_merge_key(record=record) + assert merge_key == "0" diff --git a/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py new file mode 100644 index 000000000..e67383951 --- /dev/null +++ b/unit_tests/sources/declarative/requesters/query_properties/test_query_properties.py @@ -0,0 +1,115 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. + +from unittest.mock import Mock + +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + PropertiesFromEndpoint, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + PropertyChunking, + PropertyLimitType, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey +from airbyte_cdk.sources.types import StreamSlice + +CONFIG = {} + + +def test_get_request_property_chunks_static_list_with_chunking(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["ace", "snake", "santa"] + assert property_chunks[1] == ["clover", "junpei", "june"] + assert property_chunks[2] == ["seven", "lotus", "nine"] + + +def test_get_request_property_chunks_static_list_with_always_include_properties(): + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + query_properties = QueryProperties( + property_list=[ + "ace", + "snake", + "santa", + "clover", + "junpei", + "june", + "seven", + "lotus", + "nine", + ], + always_include_properties=["zero"], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=3, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 3 + assert property_chunks[0] == ["zero", "ace", "snake", "santa"] + assert property_chunks[1] == ["zero", "clover", "junpei", "june"] + assert property_chunks[2] == ["zero", "seven", "lotus", "nine"] + + +def test_get_request_property_chunks_dynamic_endpoint(): 
+ stream_slice = StreamSlice(cursor_slice={}, partition={}) + + properties_from_endpoint_mock = Mock(spec=PropertiesFromEndpoint) + properties_from_endpoint_mock.get_properties_from_endpoint.return_value = iter( + ["alice", "clover", "dio", "k", "luna", "phi", "quark", "sigma", "tenmyouji"] + ) + + query_properties = QueryProperties( + property_list=properties_from_endpoint_mock, + always_include_properties=None, + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=5, + record_merge_strategy=GroupByKey(key="id", config=CONFIG, parameters={}), + config=CONFIG, + parameters={}, + ), + config=CONFIG, + parameters={}, + ) + + property_chunks = list(query_properties.get_request_property_chunks(stream_slice=stream_slice)) + + assert len(property_chunks) == 2 + assert property_chunks[0] == ["alice", "clover", "dio", "k", "luna"] + assert property_chunks[1] == ["phi", "quark", "sigma", "tenmyouji"] diff --git a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py index 2c646cf43..c116511a3 100644 --- a/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py +++ b/unit_tests/sources/declarative/requesters/request_options/test_interpolated_request_options_provider.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import pytest @@ -7,6 +7,7 @@ from airbyte_cdk.sources.declarative.requesters.request_options.interpolated_request_options_provider import ( InterpolatedRequestOptionsProvider, ) +from airbyte_cdk.sources.types import StreamSlice state = {"date": "2021-01-01"} stream_slice = {"start_date": "2020-01-01"} @@ -178,3 +179,54 @@ def test_error_on_create_for_both_request_json_and_data(): request_body_data=request_data, parameters={}, ) + + +@pytest.mark.parametrize( + "incoming_stream_slice,expected_query_params,expected_error", + [ + pytest.param( + StreamSlice( + cursor_slice={}, partition={}, extra_fields={"query_properties": ["id", "name"]} + ), + {"predicate": "OPTION", "properties": "id,name"}, + None, + id="test_include_query_properties", + ), + pytest.param(None, None, ValueError, id="test_raise_error_on_no_stream_slice"), + pytest.param( + StreamSlice(cursor_slice={}, partition={}, extra_fields={}), + None, + ValueError, + id="test_raise_error_on_no_query_properties", + ), + pytest.param( + StreamSlice(cursor_slice={}, partition={}, extra_fields={"query_properties": None}), + None, + ValueError, + id="test_raise_error_on_query_properties_is_none", + ), + pytest.param( + StreamSlice(cursor_slice={}, partition={}, extra_fields={"query_properties": 404}), + None, + ValueError, + id="test_raise_error_on_query_properties_is_not_a_list_of_properties", + ), + ], +) +def test_property_error_on_invalid_stream_slice( + incoming_stream_slice, expected_query_params, expected_error +): + request_options_provider = InterpolatedRequestOptionsProvider( + request_parameters={"predicate": "{{ config['option'] }}"}, + query_properties_key="properties", + config=config, + parameters={}, + ) + if expected_error: + with pytest.raises(expected_error): + request_options_provider.get_request_params(stream_slice=incoming_stream_slice) + else: + request_parameters = request_options_provider.get_request_params( + stream_slice=incoming_stream_slice + ) + assert 
request_parameters == expected_query_params diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index 0b5778b7b..442f8fc22 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. # import json @@ -30,6 +30,14 @@ CursorPaginationStrategy, PageIncrement, ) +from airbyte_cdk.sources.declarative.requesters.query_properties import ( + PropertyChunking, + QueryProperties, +) +from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( + GroupByKey, + PropertyLimitType, +) from airbyte_cdk.sources.declarative.requesters.request_option import RequestOptionType from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod from airbyte_cdk.sources.declarative.retrievers.simple_retriever import ( @@ -992,7 +1000,7 @@ def test_retriever_is_stateless(): {"id": "0", "first_name": "eric", "last_name": "tao"}, {"id": "1", "first_name": "rishi", "last_name": "ramdani"}, {"id": "2", "first_name": "harper", "last_name": "stern"}, - {"id": "3", "first_name": "erobertric", "last_name": "spearing"}, + {"id": "3", "first_name": "robert", "last_name": "spearing"}, {"id": "4", "first_name": "yasmin", "last_name": "kara-hanani"}, ] } @@ -1092,3 +1100,503 @@ def mock_send_request( assert actual_records[5] == Record( data={"id": "5", "first_name": "daria", "last_name": "greenock"}, stream_name="employees" ) + + +def test_simple_retriever_with_additional_query_properties(): + stream_name = "stream_name" + expected_records = [ + Record( + { + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "b", + "first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": "a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + 
data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="id", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 5 + assert actual_records == expected_records + + +def test_simple_retriever_with_additional_query_properties_single_chunk(): + stream_name = "stream_name" + expected_records = [ + Record( + { + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "b", + "first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + { + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + {"id": "f", "first_name": "carlos", "nonary": "decision", "bracelet": "c"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={ + "id": "a", + "first_name": "gentarou", + "last_name": "hongou", + "nonary": "second", + "bracelet": "1", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "b", + 
"first_name": "clover", + "last_name": "field", + "nonary": "ambidex", + "bracelet": "green", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "c", + "first_name": "akane", + "last_name": "kurashiki", + "nonary": "second", + "bracelet": "6", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "d", + "first_name": "sigma", + "last_name": "klim", + "nonary": "ambidex", + "bracelet": "red", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={ + "id": "e", + "first_name": "light", + "last_name": "field", + "nonary": "second", + "bracelet": "2", + }, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "f", "first_name": "carlos", "nonary": "decision", "bracelet": "c"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=10, + record_merge_strategy=GroupByKey(key="id", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 6 + assert actual_records == expected_records + + +def test_simple_retriever_still_emit_records_if_no_merge_key(): + stream_name = "stream_name" + expected_records = [ + Record( + data={"id": "a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ] + + stream_slice = StreamSlice(cursor_slice={}, partition={}) + + response = requests.Response() + response.status_code = 200 + response._content = json.dumps({"data": [record.data for record in expected_records]}).encode( + "utf-8" + ) + + requester = MagicMock() + requester.send_request.side_effect = [ + response, + response, + ] + + record_selector = MagicMock() + record_selector.select_records.side_effect = [ + [ + Record( + data={"id": 
"a", "first_name": "gentarou", "last_name": "hongou"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "first_name": "clover", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "first_name": "akane", "last_name": "kurashiki"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "first_name": "sigma", "last_name": "klim"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "e", "first_name": "light", "last_name": "field"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + [ + Record( + data={"id": "e", "nonary": "second", "bracelet": "2"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "d", "nonary": "ambidex", "bracelet": "red"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "c", "nonary": "second", "bracelet": "6"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "b", "nonary": "ambidex", "bracelet": "green"}, + associated_slice=None, + stream_name=stream_name, + ), + Record( + data={"id": "a", "nonary": "second", "bracelet": "1"}, + associated_slice=None, + stream_name=stream_name, + ), + ], + ] + + query_properties = QueryProperties( + property_list=["first_name", "last_name", "nonary", "bracelet"], + always_include_properties=[], + property_chunking=PropertyChunking( + property_limit_type=PropertyLimitType.property_count, + property_limit=2, + record_merge_strategy=GroupByKey(key="not_real", config=config, parameters={}), + config=config, + parameters={}, + ), + config=config, + parameters={}, + ) + + retriever = SimpleRetriever( + name=stream_name, + primary_key=primary_key, + requester=requester, + record_selector=record_selector, + additional_query_properties=query_properties, + parameters={}, + config={}, + ) + + actual_records = [ + r for r in retriever.read_records(records_schema={}, stream_slice=stream_slice) + ] + + assert len(actual_records) == 10 + assert actual_records == expected_records diff --git a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py index 4d9af8667..d69ca335f 100644 --- a/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py +++ b/unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py @@ -11,6 +11,9 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory, +) from airbyte_cdk.sources.declarative.schema import DynamicSchemaLoader, SchemaTypeIdentifier from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse @@ -347,7 +350,13 @@ def test_dynamic_schema_loader_with_type_conditions(): }, } source = ConcurrentDeclarativeSource( - source_config=_MANIFEST_WITH_TYPE_CONDITIONS, config=_CONFIG, catalog=None, state=None + source_config=_MANIFEST_WITH_TYPE_CONDITIONS, + config=_CONFIG, + catalog=None, + state=None, + component_factory=ModelToComponentFactory( + disable_cache=True + ), # Avoid caching on the HttpClient which could result in caching the requests/responses of other tests ) with HttpMocker() as http_mocker: http_mocker.get( diff --git a/unit_tests/sources/declarative/test_manifest_declarative_source.py b/unit_tests/sources/declarative/test_manifest_declarative_source.py index 
38d6874c0..519bd0955 100644 --- a/unit_tests/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/sources/declarative/test_manifest_declarative_source.py @@ -627,7 +627,6 @@ def test_source_missing_checker_fails_validation(self): }, } ], - "check": {"type": "CheckStream"}, } with pytest.raises(ValidationError): ManifestDeclarativeSource(source_config=manifest)
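
Taken together, the SimpleRetriever tests above assert that partial records fetched with different property chunks are stitched back into whole records when the merge strategy yields a matching key, and are emitted as-is when it does not. A minimal sketch of that grouping idea follows, using only GroupByKey.get_group_key from this changeset; the dict-merge is an illustrative stand-in for the retriever's internal merging, not the CDK's actual code.

from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
from airbyte_cdk.sources.types import Record

group_by_key = GroupByKey(key="id", config={}, parameters={})

# Two partial reads of the same logical record, as produced by two property chunks.
partial_records = [
    Record(stream_name="stream_name", data={"id": "a", "first_name": "gentarou"}),
    Record(stream_name="stream_name", data={"id": "a", "nonary": "second", "bracelet": "1"}),
]

merged = {}
for record in partial_records:
    merge_key = group_by_key.get_group_key(record=record)
    # A record whose key field is absent (get_group_key returns None) would be
    # emitted unmerged, matching test_simple_retriever_still_emit_records_if_no_merge_key.
    merged.setdefault(merge_key, {}).update(record.data)

assert merged["a"] == {"id": "a", "first_name": "gentarou", "nonary": "second", "bracelet": "1"}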