Skip to content

🚨🚨[DON'T MERGE] all CDK changes for google analytics migration🚨🚨 #554

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1741,6 +1741,54 @@ definitions:
$parameters:
type: object
additionalProperties: true
KeyValueExtractor:
title: Key Value Extractor
description: Record extractor that builds records by pairing the keys returned by keys_extractor with the values returned by values_extractor.
type: object
required:
- type
- keys_extractor
- values_extractor
properties:
type:
type: string
enum: [ KeyValueExtractor ]
keys_extractor:
description: placeholder
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
values_extractor:
description: placeholder
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
$parameters:
type: object
additionalProperties: true
CombinedExtractor:
title: Combined Extractor
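description: Record extractor that runs several extractors against the same response and merges their records, position by position, into single records.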
type: object
required:
- type
- extractors
properties:
type:
type: string
enum: [ CombinedExtractor ]
extractors:
description: placeholder
type: array
items:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CombinedExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
$parameters:
type: object
additionalProperties: true
DpathExtractor:
title: Dpath Extractor
description: Record extractor that searches a decoded response over a path defined as an array of fields.
Expand Down Expand Up @@ -2318,6 +2366,12 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_filter:
title: Schema Filter
description: placeholder
anyOf:
- "$ref": "#/definitions/RecordFilter"
- "$ref": "#/definitions/CustomRecordFilter"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
Expand Down Expand Up @@ -3315,6 +3369,8 @@ definitions:
extractor:
anyOf:
- "$ref": "#/definitions/DpathExtractor"
- "$ref": "#/definitions/CombinedExtractor"
- "$ref": "#/definitions/KeyValueExtractor"
- "$ref": "#/definitions/CustomRecordExtractor"
record_filter:
title: Record Filter
Expand Down Expand Up @@ -3994,6 +4050,9 @@ definitions:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
create_or_update:
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -4045,6 +4104,10 @@ definitions:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
default_values:
title: Default Values
description: placeholder
type: array
$parameters:
type: object
additionalProperties: true
Expand All @@ -4056,7 +4119,11 @@ definitions:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
anyOf:
- type: array
items:
"$ref": "#/definitions/StreamConfig"
- "$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.combined_extractor import CombinedExtractor
from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.key_value_extractor import KeyValueExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
Expand All @@ -18,4 +20,6 @@
"RecordFilter",
"RecordSelector",
"ResponseToFileExtractor",
"KeyValueExtractor",
"CombinedExtractor",
]
44 changes: 44 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/combined_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from itertools import islice
from typing import Any, Iterable, List, Mapping, MutableMapping, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class CombinedExtractor(RecordExtractor):
    """
    Extractor that merges the records produced by several extractors.

    Every extractor in `extractors` is run against the same response. Their outputs
    are consumed in lockstep: the i-th record of each extractor is merged into a
    single mapping, which is yielded as the i-th combined record.

    When the same field appears in more than one record, the value from the
    extractor listed later in `extractors` wins. Extraction stops as soon as the
    shortest extractor is exhausted, so surplus records from longer extractors are
    dropped. An empty `extractors` list yields no records.

    Example:
      extractors[0] -> yields: {"id": 1}, {"id": 2}
      extractors[1] -> yields: {"name": "Alice"}, {"name": "Bob"}
      result: {"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}
    """

    extractors: List[RecordExtractor]

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one merged record per position across all configured extractors.

        :param response: the HTTP response handed to every extractor
        :return: an iterable of merged records
        """
        extractors_records = [extractor.extract_records(response) for extractor in self.extractors]

        # zip() walks all extractors in lockstep and stops at the shortest one.
        for records in zip(*extractors_records):
            merged: MutableMapping[Any, Any] = {}
            for record in records:
                merged.update(record)  # later extractors override earlier ones on key clashes
            yield merged
46 changes: 46 additions & 0 deletions airbyte_cdk/sources/declarative/extractors/key_value_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass, field
from itertools import islice
from typing import Any, Iterable, List, Mapping, MutableMapping, Union

import dpath
import requests

from airbyte_cdk.sources.declarative.decoders import Decoder, JsonDecoder
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.types import Config


@dataclass
class KeyValueExtractor(RecordExtractor):
    """
    Extractor that builds records by pairing keys and values from two separate extractors.

    The `keys_extractor` and `values_extractor` extract independently from the same
    response. All keys are collected first; the values are then consumed in
    consecutive chunks of `len(keys)` items, and each chunk is zipped with the keys
    to form one record.

    If the number of values is not a multiple of the number of keys, the final
    record only contains the leading keys for which a value was available. If no
    keys are extracted, no records are produced.

    Example:
      keys_extractor -> yields: ["name", "age"]
      values_extractor -> yields: ["Alice", 30, "Bob", 25]
      result: { "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }
    """

    keys_extractor: RecordExtractor
    values_extractor: RecordExtractor

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        """
        Yield one record per chunk of `len(keys)` extracted values.

        :param response: the HTTP response handed to both extractors
        :return: an iterable of key/value records
        """
        keys = list(self.keys_extractor.extract_records(response))
        # `extract_records` only promises an Iterable. Turn it into a single iterator so
        # that every islice() call below continues where the previous one stopped; on a
        # re-iterable such as a list, islice() would restart from the first element each
        # time and this loop would never terminate.
        values = iter(self.values_extractor.extract_records(response))

        while True:
            chunk = list(islice(values, len(keys)))
            if not chunk:
                break
            yield dict(zip(keys, chunk))
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1480,6 +1478,7 @@ class ComponentMappingDefinition(BaseModel):
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
create_or_update: Optional[bool] = False
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -1491,12 +1490,13 @@ class StreamConfig(BaseModel):
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
default_values: Optional[List] = Field(None, description="placeholder", title="Default Values")
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
type: Literal["ConfigComponentsResolver"]
stream_config: StreamConfig
stream_config: Union[List[StreamConfig], StreamConfig]
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down Expand Up @@ -1844,6 +1844,25 @@ class DefaultPaginator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Manifest model for the `KeyValueExtractor` component.
# NOTE: this module is generated by datamodel-codegen from
# declarative_component_schema.yaml; lasting changes belong in the YAML definition.
class KeyValueExtractor(BaseModel):
    # Discriminator: only the literal component name is accepted.
    type: Literal["KeyValueExtractor"]
    # Required (`...`). Presumably the extractor that supplies the record keys - confirm
    # against the runtime KeyValueExtractor component.
    keys_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
        ..., description="placeholder"
    )
    # Required (`...`). Presumably the extractor that supplies the record values - confirm
    # against the runtime KeyValueExtractor component.
    values_extractor: Union[DpathExtractor, CustomRecordExtractor] = Field(
        ..., description="placeholder"
    )
    # Optional free-form mapping, exposed in manifests under the `$parameters` alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


# Manifest model for the `CombinedExtractor` component.
# NOTE: this module is generated by datamodel-codegen from
# declarative_component_schema.yaml; lasting changes belong in the YAML definition.
class CombinedExtractor(BaseModel):
    # Discriminator: only the literal component name is accepted.
    type: Literal["CombinedExtractor"]
    # Required (`...`) list of nested extractors. The Union refers to CombinedExtractor
    # itself, so the model relies on `CombinedExtractor.update_forward_refs()` being
    # called once the module's models are defined.
    extractors: List[
        Union[DpathExtractor, CombinedExtractor, KeyValueExtractor, CustomRecordExtractor]
    ] = Field(..., description="placeholder")
    # Optional free-form mapping, exposed in manifests under the `$parameters` alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class SessionTokenRequestApiKeyAuthenticator(BaseModel):
type: Literal["ApiKey"]
inject_into: RequestOption = Field(
Expand Down Expand Up @@ -1881,7 +1900,7 @@ class ListPartitionRouter(BaseModel):

class RecordSelector(BaseModel):
type: Literal["RecordSelector"]
extractor: Union[DpathExtractor, CustomRecordExtractor]
extractor: Union[DpathExtractor, CombinedExtractor, KeyValueExtractor, CustomRecordExtractor]
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None,
description="Responsible for filtering records to be emitted by the Source.",
Expand Down Expand Up @@ -2164,7 +2183,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down Expand Up @@ -2414,6 +2433,9 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None, description="placeholder", title="Schema Filter"
)
schema_transformations: Optional[
List[
Union[
Expand Down Expand Up @@ -2749,6 +2771,7 @@ class DynamicDeclarativeStream(BaseModel):


ComplexFieldType.update_forward_refs()
CombinedExtractor.update_forward_refs()
GzipDecoder.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()
Expand Down
Loading
Loading