feat(cdk): add KeyValueExtractor #552

Closed
wants to merge 10 commits into from
Changes from 5 commits
24 changes: 24 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
@@ -1741,6 +1741,29 @@ definitions:
      $parameters:
        type: object
        additionalProperties: true
  KeyValueExtractor:
    title: Key Value Extractor
    description: Record extractor that combines keys and values from two separate extractors.
    type: object
    required:
      - type
      - keys_extractor
      - values_extractor
    properties:
      type:
        type: string
        enum: [ KeyValueExtractor ]
      keys_extractor:
        anyOf:
          - "$ref": "#/definitions/DpathExtractor"
          - "$ref": "#/definitions/CustomRecordExtractor"
      values_extractor:
        anyOf:
          - "$ref": "#/definitions/DpathExtractor"
          - "$ref": "#/definitions/CustomRecordExtractor"
      $parameters:
        type: object
        additionalProperties: true
  DpathExtractor:
    title: Dpath Extractor
    description: Record extractor that searches a decoded response over a path defined as an array of fields.
@@ -3315,6 +3338,7 @@ definitions:
      extractor:
        anyOf:
          - "$ref": "#/definitions/DpathExtractor"
          - "$ref": "#/definitions/KeyValueExtractor"
          - "$ref": "#/definitions/CustomRecordExtractor"
      record_filter:
        title: Record Filter
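
For orientation, a component matching the schema added above might look like the following once a manifest has been parsed into a Python dictionary. This is only a sketch and is not taken from the PR: the `field_path` values and the `missing_required_fields` helper are hypothetical, and the check mirrors nothing more than the `required` list of the YAML definition.

```python
from typing import Any, Dict, List

# Required keys, copied from the `required` list of the KeyValueExtractor definition.
REQUIRED_FIELDS = ("type", "keys_extractor", "values_extractor")

# Hypothetical component definition; the field_path values are made up for illustration.
key_value_extractor: Dict[str, Any] = {
    "type": "KeyValueExtractor",
    "keys_extractor": {"type": "DpathExtractor", "field_path": ["dimensions", "names"]},
    "values_extractor": {"type": "DpathExtractor", "field_path": ["rows"]},
}


def missing_required_fields(component: Dict[str, Any]) -> List[str]:
    """Return the required keys that are absent from the component definition."""
    return [name for name in REQUIRED_FIELDS if name not in component]
```

Because `RecordSelector.extractor` now also accepts `KeyValueExtractor`, a dictionary of this shape could be placed under a selector's `extractor` key.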
2 changes: 2 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
@@ -4,6 +4,7 @@

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
@@ -18,4 +19,5 @@
    "RecordFilter",
    "RecordSelector",
    "ResponseToFileExtractor",
    "KeyValueExtractor",
Contributor review comment:

Nit: can we alphabetize this list?

]
42 changes: 42 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
@@ -0,0 +1,42 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from itertools import islice
from typing import Any, Iterable, MutableMapping

import requests

from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor


@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that combines keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract records independently
    from the response. Their outputs are zipped together to form key-value mappings.

    Each key from `keys_extractor` should correspond to a key in the resulting dictionary,
    and each value from `values_extractor` is the value for that key.

    Example:
        keys_extractor -> yields: ["name", "age"]
        values_extractor -> yields: ["Alice", 30]
        result: { "name": "Alice", "age": 30 }
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor
Contributor review comment:

For consistency, we should probably also take in the config and parameter objects, since the factory in model_to_component_factory.py (create_key_value_extractor()) already passes them in.
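
One possible reading of this suggestion, sketched with plain dataclasses instead of the CDK classes. Everything here is an assumption for illustration: `StaticExtractor` and `KeyValueExtractorSketch` are hypothetical names, and declaring `parameters` as an `InitVar` is just one way a dataclass can accept a constructor argument without storing it as a field.

```python
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping


@dataclass
class StaticExtractor:
    """Hypothetical stand-in for a RecordExtractor; it ignores the response."""

    items: List[Any]

    def extract_records(self, response: Any) -> Iterable[Any]:
        return iter(self.items)


@dataclass
class KeyValueExtractorSketch:
    keys_extractor: StaticExtractor
    values_extractor: StaticExtractor
    config: Mapping[str, Any]
    # InitVar: accepted by __init__ and handed to __post_init__, but not stored as a field.
    parameters: InitVar[Mapping[str, Any]]
    _parameters: Dict[str, Any] = field(init=False, repr=False, default_factory=dict)

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Keep a private copy so later code can read the parameters if needed.
        self._parameters = dict(parameters)


extractor = KeyValueExtractorSketch(
    keys_extractor=StaticExtractor(["name", "age"]),
    values_extractor=StaticExtractor(["Alice", 30]),
    config={"start_date": "2025-01-01"},
    parameters={"name": "users"},
)
```

With this shape the factory could pass `config` and `parameters` through unchanged, even if the extractor does not use them yet.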


    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        keys = list(self.keys_extractor.extract_records(response))
        values = self.values_extractor.extract_records(response)
Contributor review comment:

What is still a little unclear to me is the expected behavior beyond the simplest case of two keys and two values.

What happens in the following cases?

  • Values has a length of 3, like ["Alice", 30, "what about this"]. Is this an error state? Right now this would yield a second record of {"name": "what about this"} without the second property.
  • Do we always expect values to be a single continuous list, ["Alice", 30, "Thomas", 40, "Alex", 35]? Or do we expect it to be grouped into a list of lists: [["Alice", 30], ["Thomas", 40], ["Alex", 35]]?
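
The cases raised above can be tried out with a standalone sketch of the chunk-and-zip loop from the diff. `combine` is a hypothetical helper, not part of the PR; it takes plain lists instead of extractors and calls `iter()` on the values so that `islice` consumes them progressively.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def combine(keys: List[Any], values: Iterable[Any]) -> Iterator[Dict[Any, Any]]:
    """Chunk `values` by len(keys) and zip each chunk with `keys`, as in the diff."""
    values_iter = iter(values)  # ensure islice advances instead of restarting
    while True:
        chunk = list(islice(values_iter, len(keys)))
        if not chunk:
            break
        yield dict(zip(keys, chunk))


keys = ["name", "age"]

# Three values for two keys: the trailing value becomes a partial record.
uneven = list(combine(keys, ["Alice", 30, "what about this"]))

# One continuous list: split into one record per pair of values.
flat = list(combine(keys, ["Alice", 30, "Thomas", 40, "Alex", 35]))

# List of lists: each inner list is treated as a single value.
nested = list(combine(keys, [["Alice", 30], ["Thomas", 40], ["Alex", 35]]))
```

Under these assumptions, `uneven` ends with the partial record `{"name": "what about this"}`, and `nested` pairs whole inner lists with keys (`{"name": ["Alice", 30], "age": ["Thomas", 40]}`), which suggests the loop as written only fits the flat-list shape.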


        while True:
            chunk = list(islice(values, len(keys)))
            if not chunk:
                break
Contributor review comment:

It feels a little more idiomatic to avoid a permanent `while True` plus a `break` statement. Can we instead use a has_more_chunks variable to manage when to stop iterating?

has_more_chunks = True
while has_more_chunks:
    ...
    if len(chunk) == 0:  # As far as I understand the code, we stop when the current chunk has no values
        has_more_chunks = False
    else:
        yield dict(zip(keys, chunk))
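
Another way to drop the `while True` and `break` pair, not proposed in the thread and shown only as a sketch, is to bind and test the chunk in the loop condition. This assumes Python 3.8 or newer for the `:=` operator; `combine_without_break` is a hypothetical name, and it takes plain iterables rather than extractors.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def combine_without_break(keys: List[Any], values: Iterable[Any]) -> Iterator[Dict[Any, Any]]:
    """Yield one mapping per chunk of len(keys) values, without `while True`."""
    values_iter = iter(values)
    # The chunk is assigned and tested in one step; an empty list ends the loop.
    while chunk := list(islice(values_iter, len(keys))):
        yield dict(zip(keys, chunk))


records = list(combine_without_break(["name", "age"], ["Alice", 30, "Thomas", 40]))
no_keys = list(combine_without_break([], [1, 2]))
```

With an empty `keys` list the first chunk is already empty, so the loop yields nothing, the same as the `if not chunk: break` version.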

            yield dict(zip(keys, chunk))
111 changes: 68 additions & 43 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

@@ -620,7 +618,9 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
@@ -1126,24 +1126,28 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
@@ -1161,7 +1165,9 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1844,6 +1850,13 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class KeyValueExtractor(BaseModel):
    type: Literal["KeyValueExtractor"]
    keys_extractor: Union[DpathExtractor, CustomRecordExtractor]
    values_extractor: Union[DpathExtractor, CustomRecordExtractor]
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
@@ -1881,13 +1894,15 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, KeyValueExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
title="Record Filter",
)
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
schema_normalization: Optional[
Union[SchemaNormalization, CustomSchemaNormalization]
] = Field(
None,
description="Responsible for normalization according to the schema.",
title="Schema Normalization",
@@ -2133,7 +2148,9 @@ class Config:
extra = Extra.allow

type: Literal["DeclarativeStream"]
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages.",
@@ -2164,7 +2181,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
@@ -2311,18 +2328,20 @@ class HttpRequester(BaseModelWithDeprecations):
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
title="Fetch Properties from Endpoint",
)
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
None,
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
examples=[
{"unit": "day"},
{
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
},
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
{"sort_by[asc]": "updated_at"},
],
title="Query Parameters",
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
Field(
None,
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
examples=[
{"unit": "day"},
{
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
},
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
{"sort_by[asc]": "updated_at"},
],
title="Query Parameters",
)
)
request_headers: Optional[Union[Dict[str, str], str]] = Field(
None,
@@ -2512,7 +2531,9 @@ class QueryProperties(BaseModel):

class StateDelegatingStream(BaseModel):
type: Literal["StateDelegatingStream"]
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
name: str = Field(
..., description="The stream name.", example=["Users"], title="Name"
)
full_refresh_stream: DeclarativeStream = Field(
...,
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2601,7 +2622,9 @@ class AsyncRetriever(BaseModel):
)
download_extractor: Optional[
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[HttpRequester, CustomRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2741,10 +2764,12 @@ class DynamicDeclarativeStream(BaseModel):
stream_template: DeclarativeStream = Field(
..., description="Reference to the stream template.", title="Stream Template"
)
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
Field(
...,
description="Component resolve and populates stream templates with components values.",
title="Components Resolver",
)
)

