Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,28 @@ definitions:
$parameters:
type: object
additionalProperties: true
CombinedExtractor:
  title: Combined Extractor
  description: Record extractor that merges the output of multiple sub-extractors into a single record.
  type: object
  required:
    - type
    - extractors
  properties:
    type:
      type: string
      enum: [ CombinedExtractor ]
    extractors:
      # Fixed: this description was left as "placeholder" in the original change.
      description: The list of extractors to apply to the response. Records produced by each extractor are zipped together in order and merged into a single record, with later extractors taking precedence on key conflicts.
      type: array
      items:
        anyOf:
          - "$ref": "#/definitions/DpathExtractor"
          - "$ref": "#/definitions/CombinedExtractor"
          - "$ref": "#/definitions/CustomRecordExtractor"
    $parameters:
      type: object
      additionalProperties: true
DpathExtractor:
title: Dpath Extractor
description: Record extractor that searches a decoded response over a path defined as an array of fields.
Expand Down Expand Up @@ -3315,6 +3337,7 @@ definitions:
extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CombinedExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
record_filter:
title: Record Filter
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
Expand All @@ -18,4 +19,5 @@
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"CombinedExtractor",
]
45 changes: 45 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/combined_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, List, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class CombinedExtractor(RecordExtractor):
    """
    Record extractor that fuses the records produced by several sub-extractors.

    Every sub-extractor in ``extractors`` is run against the same response. Their
    outputs are consumed in lockstep — the first record of each extractor forms one
    group, the second of each forms the next, and so on — and each group is
    flattened into a single dictionary. When a key appears in more than one
    sub-record, the value from the later extractor wins. Iteration stops as soon
    as the shortest sub-extractor output is exhausted (standard ``zip`` semantics).

    Example:
        extractors[0] -> [{"name": "Alice", "age": 30}]
        extractors[1] -> [{"country": "US"}]
        combined      -> [{"name": "Alice", "age": 30, "country": "US"}]
    """

    extractors: List[RecordExtractor]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        # One lazy record stream per sub-extractor, all reading the same response.
        streams = [sub.extract_records(response) for sub in self.extractors]
        for grouped in zip(*streams):
            # Flatten the group; later extractors overwrite earlier ones on collisions.
            yield {key: value for part in grouped for key, value in part.items()}
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1844,6 +1842,14 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CombinedExtractor(BaseModel):
    # NOTE(review): this file is produced by datamodel-codegen from
    # declarative_component_schema.yaml — the "placeholder" description below
    # should be fixed in the YAML schema and regenerated; the text here mirrors
    # the intended schema description.
    type: Literal["CombinedExtractor"]
    extractors: List[Union[DpathExtractor, CombinedExtractor, CustomRecordExtractor]] = Field(
        ...,
        description="The list of extractors to apply to the response. Records produced by each extractor are zipped together in order and merged into a single record, with later extractors taking precedence on key conflicts.",
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
Expand Down Expand Up @@ -1881,7 +1887,7 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, CombinedExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
Expand Down Expand Up @@ -2164,7 +2170,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down Expand Up @@ -2749,6 +2755,7 @@ class DynamicDeclarativeStream(BaseModel):


ComplexFieldType.update_forward_refs()
CombinedExtractor.update_forward_refs()
GzipDecoder.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
Parser,
)
from airbyte_cdk.sources.declarative.extractors import (
CombinedExtractor,
DpathExtractor,
RecordFilter,
RecordSelector,
Expand Down Expand Up @@ -142,6 +143,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CheckStream as CheckStreamModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
CombinedExtractor as CombinedExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ComplexFieldType as ComplexFieldTypeModel,
)
Expand Down Expand Up @@ -641,6 +645,7 @@ def _init_mappings(self) -> None:
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
DpathExtractorModel: self.create_dpath_extractor,
CombinedExtractorModel: self.create_combined_extractor,
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
Expand Down Expand Up @@ -2218,6 +2223,20 @@ def create_dpath_extractor(
parameters=model.parameters or {},
)

def create_combined_extractor(
    self,
    model: CombinedExtractorModel,
    config: Config,
    decoder: Optional[Decoder] = JsonDecoder(parameters={}),
    **kwargs: Any,
) -> CombinedExtractor:
    """
    Build a CombinedExtractor by materializing each declared sub-extractor model.

    The same decoder and config are threaded into every sub-extractor so nested
    CombinedExtractor / DpathExtractor / CustomRecordExtractor components are
    constructed consistently.

    NOTE(review): the ``decoder`` default is evaluated once at function-definition
    time and shared across calls; this mirrors the other extractor factories
    (e.g. create_dpath_extractor), so it is kept for consistency — confirm
    JsonDecoder is stateless before changing.
    """
    sub_extractors = []
    for sub_model in model.extractors:
        sub_extractors.append(
            self._create_component_from_model(model=sub_model, decoder=decoder, config=config)
        )
    return CombinedExtractor(extractors=sub_extractors)

@staticmethod
def create_response_to_file_extractor(
model: ResponseToFileExtractorModel,
Expand Down
130 changes: 130 additions & 0 deletions unit_tests/sources/declarative/extractors/test_combined_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
from unittest.mock import MagicMock

from airbyte_cdk.models import (
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
DestinationSyncMode,
Type,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse


def to_configured_stream(
    stream,
    sync_mode=None,
    destination_sync_mode=DestinationSyncMode.append,
    cursor_field=None,
    primary_key=None,
) -> ConfiguredAirbyteStream:
    """Wrap a discovered stream in a ConfiguredAirbyteStream with the given sync settings."""
    settings = {
        "stream": stream,
        "sync_mode": sync_mode,
        "destination_sync_mode": destination_sync_mode,
        "cursor_field": cursor_field,
        "primary_key": primary_key,
    }
    return ConfiguredAirbyteStream(**settings)


def to_configured_catalog(
    configured_streams,
) -> ConfiguredAirbyteCatalog:
    """Bundle the configured streams into a ConfiguredAirbyteCatalog."""
    catalog = ConfiguredAirbyteCatalog(streams=configured_streams)
    return catalog


# Minimal connector config; the manifest below reads config['api_key'] for auth.
_CONFIG = {
    "start_date": "2024-07-01T00:00:00.000Z",
    "api_key": "dummy_api_key",
}

# Declarative manifest with a single stream whose record selector uses a
# CombinedExtractor: records extracted from the "dimensions" and "metrics"
# paths are zipped together and merged into one record per response entry.
_MANIFEST = {
    "version": "6.7.0",
    "type": "DeclarativeSource",
    "check": {"type": "CheckStream", "stream_names": ["Rates"]},
    "streams": [
        {
            "type": "DeclarativeStream",
            "name": "test_stream",
            "primary_key": [],
            "schema_loader": {
                "type": "InlineSchemaLoader",
                "schema": {
                    "$schema": "http://json-schema.org/schema#",
                    "properties": {
                        "ABC": {"type": "number"},
                        "AED": {"type": "number"},
                    },
                    "type": "object",
                },
            },
            "retriever": {
                "type": "SimpleRetriever",
                "requester": {
                    "type": "HttpRequester",
                    "url_base": "https://api.test.com",
                    "path": "/items",
                    "http_method": "GET",
                    "authenticator": {
                        "type": "ApiKeyAuthenticator",
                        "header": "apikey",
                        "api_token": "{{ config['api_key'] }}",
                    },
                },
                # The component under test: two DpathExtractors whose outputs
                # are merged into a single record by CombinedExtractor.
                "record_selector": {
                    "type": "RecordSelector",
                    "extractor": {
                        "type": "CombinedExtractor",
                        "extractors": [
                            {"type": "DpathExtractor", "field_path": ["dimensions"]},
                            {"type": "DpathExtractor", "field_path": ["metrics"]},
                        ],
                    },
                },
                "paginator": {"type": "NoPagination"},
            },
        }
    ],
}


def test_combined_extractor():
    """End-to-end check: CombinedExtractor merges records from multiple dpath extractors."""
    source = ConcurrentDeclarativeSource(
        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
    )

    discovered = source.discover(logger=source.logger, config=_CONFIG)
    configured_catalog = to_configured_catalog(
        [
            to_configured_stream(stream, primary_key=stream.source_defined_primary_key)
            for stream in discovered.streams
        ]
    )

    response_body = json.dumps(
        {
            "dimensions": {"customer_segment": "enterprise"},
            "metrics": {"revenue": 295000},
        }
    )

    with HttpMocker() as http_mocker:
        http_mocker.get(
            HttpRequest(url="https://api.test.com/items"),
            HttpResponse(body=response_body),
        )

        records = [
            message.record
            for message in source.read(MagicMock(), _CONFIG, configured_catalog)
            if message.type == Type.RECORD
        ]

    # The "dimensions" and "metrics" sub-records must be merged into one record.
    assert len(records) == 1
    assert records[0].data == {"customer_segment": "enterprise", "revenue": 295000}
Loading