Skip to content

feat(cdk): add KeyValueExtractor #552

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,29 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeyValueExtractor:
title: Key Value Extractor
description: Record extractor that combines keys and values from two separate extractors.
type: object
required:
- type
- keys_extractor
- values_extractor
properties:
type:
type: string
enum: [ KeyValueExtractor ]
keys_extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
values_extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
$parameters:
type: object
additionalProperties: true
DpathExtractor:
title: Dpath Extractor
description: Record extractor that searches a decoded response over a path defined as an array of fields.
Expand Down Expand Up @@ -3315,6 +3338,7 @@ definitions:
extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
record_filter:
title: Record Filter
Expand Down
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
Expand All @@ -18,4 +19,5 @@
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"KeyValueExtractor",
]
42 changes: 42 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class KeyValueExtractor(RecordExtractor):
"""
Extractor that combines keys and values from two separate extractors.

The `keys_extractor` and `values_extractor` extract records independently
from the response. Their outputs are zipped together to form key-value mappings.

Each key from `keys_extractor` should correspond to a key in the resulting dictionary,
and each value from `values_extractor` is the value for that key.

Example:
keys_extractor -> yields: ["name", "age"]
values_extractor -> yields: ["Alice", 30]
result: { "name": "Alice", "age": 30 }
"""

keys_extractor: RecordExtractor
values_extractor: RecordExtractor

def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
keys = list(self.keys_extractor.extract_records(response))
values = self.values_extractor.extract_records(response)

while True:
chunk = list(islice(values, len(keys)))
if not chunk:
break
yield dict(zip(keys, chunk))
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1844,6 +1842,13 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyValueExtractor(BaseModel):
type: Literal["KeyValueExtractor"]
keys_extractor: Union[DpathExtractor, CustomRecordExtractor]
values_extractor: Union[DpathExtractor, CustomRecordExtractor]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
Expand Down Expand Up @@ -1881,7 +1886,7 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, KeyValueExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
Expand Down Expand Up @@ -2164,7 +2169,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
)
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
KeyValueExtractor,
RecordFilter,
RecordSelector,
ResponseToFileExtractor,
Expand Down Expand Up @@ -304,6 +305,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeysToSnakeCase as KeysToSnakeCaseModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
KeyValueExtractor as KeyValueExtractorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
LegacySessionTokenAuthenticator as LegacySessionTokenAuthenticatorModel,
)
Expand Down Expand Up @@ -641,6 +645,7 @@ def _init_mappings(self) -> None:
DefaultErrorHandlerModel: self.create_default_error_handler,
DefaultPaginatorModel: self.create_default_paginator,
DpathExtractorModel: self.create_dpath_extractor,
KeyValueExtractorModel: self.create_key_value_extractor,
ResponseToFileExtractorModel: self.create_response_to_file_extractor,
ExponentialBackoffStrategyModel: self.create_exponential_backoff_strategy,
SessionTokenAuthenticatorModel: self.create_session_token_authenticator,
Expand Down Expand Up @@ -2218,6 +2223,22 @@ def create_dpath_extractor(
parameters=model.parameters or {},
)

def create_key_value_extractor(
self,
model: KeyValueExtractorModel,
config: Config,
decoder: Optional[Decoder] = JsonDecoder(parameters={}),
**kwargs: Any,
) -> KeyValueExtractor:
keys_extractor = self._create_component_from_model(
model=model.keys_extractor, decoder=decoder, config=config
)
values_extractor = self._create_component_from_model(
model=model.values_extractor, decoder=decoder, config=config
)

return KeyValueExtractor(keys_extractor=keys_extractor, values_extractor=values_extractor)

@staticmethod
def create_response_to_file_extractor(
model: ResponseToFileExtractorModel,
Expand Down
Loading