diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ca595d92e..3c877c446 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1741,6 +1741,28 @@ definitions: $parameters: type: object additionalProperties: true + CombinedExtractor: + title: Combined Extractor + description: Record extractor that merges the output of multiple sub-extractors into a single record. + type: object + required: + - type + - extractors + properties: + type: + type: string + enum: [ CombinedExtractor ] + extractors: + description: placeholder + type: array + items: + anyOf: + - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CombinedExtractor" + - "$ref": "#/definitions/CustomRecordExtractor" + $parameters: + type: object + additionalProperties: true DpathExtractor: title: Dpath Extractor description: Record extractor that searches a decoded response over a path defined as an array of fields. @@ -3315,6 +3337,7 @@ definitions: extractor: anyOf: - "$ref": "#/definitions/DpathExtractor" + - "$ref": "#/definitions/CombinedExtractor" - "$ref": "#/definitions/CustomRecordExtractor" record_filter: title: Record Filter diff --git a/airbyte_cdk/sources/declarative/extractors/__init__.py b/airbyte_cdk/sources/declarative/extractors/__init__.py index 8f1d18d12..95c757191 100644 --- a/airbyte_cdk/sources/declarative/extractors/__init__.py +++ b/airbyte_cdk/sources/declarative/extractors/__init__.py @@ -2,6 +2,7 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter @@ -18,4 +19,5 @@ "RecordFilter", "RecordSelector", "ResponseToFileExtractor", + "CombinedExtractor", ] diff --git a/airbyte_cdk/sources/declarative/extractors/combined_extractor.py b/airbyte_cdk/sources/declarative/extractors/combined_extractor.py new file mode 100644 index 000000000..899fef773 --- /dev/null +++ b/airbyte_cdk/sources/declarative/extractors/combined_extractor.py @@ -0,0 +1,45 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass +from typing import Any, Iterable, List, MutableMapping + +import requests + +from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor + + +@dataclass +class CombinedExtractor(RecordExtractor): + """ + Extractor that merges the output of multiple sub-extractors into a single record. + + This extractor takes a list of `RecordExtractor` instances (`extractors`), each of which + independently extracts records from the response. For each response, the extractor: + + 1. Invokes each sub-extractor to generate iterables of records. + 2. Zips the results together, so that the first record from each extractor is combined, + the second from each, and so on. + 3. Merges each group of records into a single dictionary using `dict.update()`. + + The result is a sequence of dictionaries where each dictionary contains the merged keys + and values from the corresponding records across all extractors. + + Example: + keys_extractor -> yields: [{"name": "Alice", "age": 30}] + extra_data_extractor -> yields: [{"country": "US"}] + CombinedExtractor(extractors=[keys_extractor, extra_data_extractor]) -> + yields: [{"name": "Alice", "age": 30, "country": "US"}] + """ + + extractors: List[RecordExtractor] + + def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]: + extractors_records = [extractor.extract_records(response) for extractor in self.extractors] + + for records in zip(*extractors_records): + merged = {} + for record in records: + merged.update(record) # merge all fields + yield merged diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0aa9fa569..962549b0f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -1844,6 +1842,14 @@ class DefaultPaginator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class CombinedExtractor(BaseModel): + type: Literal["CombinedExtractor"] + extractors: List[Union[DpathExtractor, CombinedExtractor, CustomRecordExtractor]] = Field( + ..., description="placeholder" + ) + parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") + + class SessionTokenRequestApiKeyAuthenticator(BaseModel): type: Literal["ApiKey"] inject_into: RequestOption = Field( @@ -1881,7 +1887,7 @@ class ListPartitionRouter(BaseModel): class RecordSelector(BaseModel): type: Literal["RecordSelector"] - extractor: Union[DpathExtractor, CustomRecordExtractor] + extractor: Union[DpathExtractor, CombinedExtractor, CustomRecordExtractor] record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field( None, description="Responsible for filtering records to be emitted by the Source.", @@ -2164,7 +2170,7 @@ class Config: ] ] = Field( None, - description="Component used to retrieve the schema for the current stream.", + description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.", title="Schema Loader", ) transformations: Optional[ @@ -2749,6 +2755,7 @@ class DynamicDeclarativeStream(BaseModel): ComplexFieldType.update_forward_refs() +CombinedExtractor.update_forward_refs() GzipDecoder.update_forward_refs() CompositeErrorHandler.update_forward_refs() DeclarativeSource1.update_forward_refs() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e7cdb6683..ce09d39b3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -83,6 +83,7 @@ Parser, ) from airbyte_cdk.sources.declarative.extractors import ( + CombinedExtractor, DpathExtractor, RecordFilter, RecordSelector, @@ -142,6 +143,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + CombinedExtractor as CombinedExtractorModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ComplexFieldType as ComplexFieldTypeModel, ) @@ -641,6 +645,7 @@ def _init_mappings(self) -> None: DefaultErrorHandlerModel: self.create_default_error_handler, DefaultPaginatorModel: self.create_default_paginator, DpathExtractorModel: self.create_dpath_extractor, + CombinedExtractorModel: self.create_combined_extractor, ResponseToFileExtractorModel: self.create_response_to_file_extractor, ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy, SessionTokenAuthenticatorModel: self.create_session_token_authenticator, @@ -2218,6 +2223,20 @@ def create_dpath_extractor( parameters=model.parameters or {}, ) + def create_combined_extractor( + self, + model: CombinedExtractorModel, + config: Config, + decoder: Optional[Decoder] = JsonDecoder(parameters={}), + **kwargs: Any, + ) -> CombinedExtractor: + extractors = [ + self._create_component_from_model(model=extractor, decoder=decoder, config=config) + for extractor in model.extractors + ] + + return CombinedExtractor(extractors=extractors) + @staticmethod def create_response_to_file_extractor( model: ResponseToFileExtractorModel, diff --git a/unit_tests/sources/declarative/extractors/test_combined_extractor.py b/unit_tests/sources/declarative/extractors/test_combined_extractor.py new file mode 100644 index 000000000..d245a6918 --- /dev/null +++ b/unit_tests/sources/declarative/extractors/test_combined_extractor.py @@ -0,0 +1,130 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +import json +from unittest.mock import MagicMock + +from airbyte_cdk.models import ( + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + DestinationSyncMode, + Type, +) +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse + + +def to_configured_stream( + stream, + sync_mode=None, + destination_sync_mode=DestinationSyncMode.append, + cursor_field=None, + primary_key=None, +) -> ConfiguredAirbyteStream: + return ConfiguredAirbyteStream( + stream=stream, + sync_mode=sync_mode, + destination_sync_mode=destination_sync_mode, + cursor_field=cursor_field, + primary_key=primary_key, + ) + + +def to_configured_catalog( + configured_streams, +) -> ConfiguredAirbyteCatalog: + return ConfiguredAirbyteCatalog(streams=configured_streams) + + +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "api_key": "dummy_api_key", +} + +_MANIFEST = { + "version": "6.7.0", + "type": "DeclarativeSource", + "check": {"type": "CheckStream", "stream_names": ["Rates"]}, + "streams": [ + { + "type": "DeclarativeStream", + "name": "test_stream", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "/items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": { + "type": "CombinedExtractor", + "extractors": [ + {"type": "DpathExtractor", "field_path": ["dimensions"]}, + {"type": "DpathExtractor", "field_path": ["metrics"]}, + ], + }, + }, + "paginator": {"type": "NoPagination"}, + }, + } + ], +} + + +def test_combined_extractor(): + source = ConcurrentDeclarativeSource( + source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None + ) + + actual_catalog = source.discover(logger=source.logger, config=_CONFIG) + + configured_streams = [ + to_configured_stream(stream, primary_key=stream.source_defined_primary_key) + for stream in actual_catalog.streams + ] + configured_catalog = to_configured_catalog(configured_streams) + + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url="https://api.test.com/items"), + HttpResponse( + body=json.dumps( + { + "dimensions": {"customer_segment": "enterprise"}, + "metrics": {"revenue": 295000}, + } + ) + ), + ) + + records = [ + message.record + for message in source.read(MagicMock(), _CONFIG, configured_catalog) + if message.type == Type.RECORD + ] + + assert len(records) == 1 + assert records[0].data == {"customer_segment": "enterprise", "revenue": 295000}