
feat(cdk): add KeyValueExtractor #552


Open · wants to merge 10 commits into main

Changes from 2 commits
@@ -1741,6 +1741,54 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeyValueExtractor:
title: Key Value Extractor
description: Record extractor that builds records by zipping keys from one extractor with values from another.
type: object
required:
- type
- keys_extractor
- values_extractor
properties:
type:
type: string
enum: [ KeyValueExtractor ]
keys_extractor:
description: Extractor that yields the keys of the resulting records.
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
values_extractor:
description: Extractor that yields the values matched, by position, to the extracted keys.
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
$parameters:
type: object
additionalProperties: true
CombinedExtractor:
title: Combined Extractor
description: Record extractor that merges the records yielded by several extractors into single records.
type: object
required:
- type
- extractors
properties:
type:
type: string
enum: [ CombinedExtractor ]
extractors:
description: List of extractors whose extracted records are merged together, field by field.
type: array
items:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CombinedExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
$parameters:
type: object
additionalProperties: true
DpathExtractor:
title: Dpath Extractor
description: Record extractor that searches a decoded response over a path defined as an array of fields.
@@ -2318,6 +2366,12 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_filter:
title: Schema Filter
description: Responsible for filtering the records retrieved by the schema retriever before they are used to build the schema.
anyOf:
- "$ref": "#/definitions/RecordFilter"
- "$ref": "#/definitions/CustomRecordFilter"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
@@ -3315,6 +3369,8 @@ definitions:
extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CombinedExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
record_filter:
title: Record Filter
@@ -3994,6 +4050,9 @@ definitions:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
create_or_update:
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
@@ -4045,6 +4104,10 @@ definitions:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
default_values:
title: Default Values
description: Default values used for the stream configs when the path referenced by configs_pointer is not present in the config.
type: array
$parameters:
type: object
additionalProperties: true
@@ -4056,7 +4119,11 @@
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
anyOf:
- type: array
items:
"$ref": "#/definitions/StreamConfig"
- "$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
@@ -2,8 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
@@ -18,4 +20,6 @@
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"KeyValueExtractor",
Contributor: Nit, can we alphabetize this

"CombinedExtractor",
]
44 changes: 44 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/combined_extractor.py
@@ -0,0 +1,44 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Iterable, List, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class CombinedExtractor(RecordExtractor):
    """
    Extractor that merges the records produced by several other extractors.

    Each wrapped extractor extracts records from the response independently.
    The record streams are zipped together positionally, and the fields of each
    group of records are merged into a single record.

    Example:
        extractor A -> yields: [{"name": "Alice"}, {"name": "Bob"}]
        extractor B -> yields: [{"age": 30}, {"age": 41}]
        result: [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 41}]
    """

    extractors: List[RecordExtractor]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        extractors_records = [extractor.extract_records(response) for extractor in self.extractors]

        for records in zip(*extractors_records):
            merged = {}
            for record in records:
                merged.update(record)  # merge all fields from each extractor's record
            yield merged
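To make the merge semantics above concrete, the following is a minimal sketch (not part of this PR) of how `CombinedExtractor` could be exercised. The `StaticExtractor` stub is hypothetical and stands in for any `RecordExtractor`, such as a `DpathExtractor`.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors import CombinedExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class StaticExtractor(RecordExtractor):
    """Hypothetical stand-in that ignores the response and yields fixed records."""

    records: List[MutableMapping[str, Any]]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[str, Any]]:
        yield from self.records


names = StaticExtractor(records=[{"name": "Alice"}, {"name": "Bob"}])
ages = StaticExtractor(records=[{"age": 30}, {"age": 41}])

combined = CombinedExtractor(extractors=[names, ages])
# Record streams are zipped positionally and each group's fields are merged:
# [{'name': 'Alice', 'age': 30}, {'name': 'Bob', 'age': 41}]
print(list(combined.extract_records(requests.Response())))
```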
42 changes: 42 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
@@ -0,0 +1,42 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the response. Their outputs are zipped together to form key-value mappings.

    Each key from `keys_extractor` should correspond to a key in the resulting dictionary,
    and each value from `values_extractor` is the value for that key.

    Example:
        keys_extractor -> yields: ["name", "age"]
        values_extractor -> yields: ["Alice", 30]
        result: { "name": "Alice", "age": 30 }
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor
Contributor: for consistency, we should probably also take in the config and parameter objects since right now we are also passing them in via the factory in model_to_component_factory.py.create_key_value_extractor()


    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        keys = list(self.keys_extractor.extract_records(response))
        values = self.values_extractor.extract_records(response)
Contributor: What is still a little unclear to me is what the expected behavior for this should be beyond the simplest case where there are 2 field keys and 2 values in the list.

What happens in the following:

  • Values has a length of 3 like ["Alice", 30, "what about this"]? Is this an error state, because right now, this would yield a second record of {"name": "what about this"} without the second property
  • Are we always expecting values to always be a single continuous list ["Alice", 30, "Thomas", 40, "Alex", 35]? Or do we expect it to be grouped into multiple lists of lists: [["Alice", 30], ["Thomas", 40], ["Alex", 35]]?


        while True:
            chunk = list(islice(values, len(keys)))
            if not chunk:
                break
Contributor: It feels a little more idiomatic to avoid using a permanent True + break statement. Can we instead use a has_more_chunks variable to manage when to stop iterating.

has_more_chunks = True
while has_more_chunks:
  ...
  if len(chunk) == 0:  # As far as I understand reading the code we stop if there are no values in the current chunk
    has_more_chunks = False
  else:
    yield dict(zip(keys, chunk))

            yield dict(zip(keys, chunk))
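To illustrate the chunking behavior discussed in the review comments, here is a minimal sketch (not part of this PR). The `StaticExtractor` stub is hypothetical and stands in for any `RecordExtractor` that yields plain keys or values.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List

import requests

from airbyte_cdk.sources.declarative.extractors import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class StaticExtractor(RecordExtractor):
    """Hypothetical stand-in that ignores the response and yields fixed items."""

    items: List[Any]

    def extract_records(self, response: requests.Response) -> Iterable[Any]:
        yield from self.items


keys = StaticExtractor(items=["name", "age"])
values = StaticExtractor(items=["Alice", 30, "Thomas", 40])

extractor = KeyValueExtractor(keys_extractor=keys, values_extractor=values)
# Values are consumed in chunks of len(keys) and zipped against the keys:
# [{'name': 'Alice', 'age': 30}, {'name': 'Thomas', 'age': 40}]
# Note: an odd number of values would leave a trailing partial record, as raised above.
print(list(extractor.extract_records(requests.Response())))
```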
@@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

@@ -1480,6 +1478,7 @@ class ComponentMappingDefinition(BaseModel):
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
create_or_update: Optional[bool] = False
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -1491,12 +1490,13 @@ class StreamConfig(BaseModel):
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
default_values: Optional[List] = Field(None, description="Default values used for the stream configs when the path referenced by configs_pointer is not present in the config.", title="Default Values")
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
type: Literal["ConfigComponentsResolver"]
stream_config: StreamConfig
stream_config: Union[List[StreamConfig], StreamConfig]
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

@@ -1844,6 +1844,25 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyValueExtractor(BaseModel):
type: Literal["KeyValueExtractor"]
keys_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
..., description="Extractor that yields the keys of the resulting records."
)
values_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
..., description="Extractor that yields the values matched, by position, to the extracted keys."
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CombinedExtractor(BaseModel):
type: Literal["CombinedExtractor"]
extractors: List[
Union[DpathExtractor, CombinedExtractor, KeyValueExtractor, CustomRecordExtractor]
] = Field(..., description="List of extractors whose extracted records are merged together, field by field.")
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
@@ -1881,7 +1900,7 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, CombinedExtractor, KeyValueExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
@@ -2164,7 +2183,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
@@ -2414,6 +2433,9 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None, description="Responsible for filtering the records retrieved by the schema retriever before they are used to build the schema.", title="Schema Filter"
)
schema_transformations: Optional[
List[
Union[
@@ -2749,6 +2771,7 @@ class DynamicDeclarativeStream(BaseModel):


ComplexFieldType.update_forward_refs()
CombinedExtractor.update_forward_refs()
GzipDecoder.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()