Skip to content

feat(cdk): update ConfigComponentsResolver to support list of stream_config #553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -2318,6 +2318,12 @@ definitions:
- "$ref": "#/definitions/AsyncRetriever"
- "$ref": "#/definitions/CustomRetriever"
- "$ref": "#/definitions/SimpleRetriever"
schema_filter:
title: Schema Filter
description: An optional record filter applied to the retrieved schema fields; only properties the filter keeps are included in the stream's schema.
anyOf:
- "$ref": "#/definitions/RecordFilter"
- "$ref": "#/definitions/CustomRecordFilter"
schema_transformations:
title: Schema Transformations
description: A list of transformations to be applied to the schema.
Expand Down Expand Up @@ -3994,6 +4000,9 @@ definitions:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
create_or_update:
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -4045,6 +4054,10 @@ definitions:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
default_values:
title: Default Values
description: A list of default values appended to the stream configurations resolved from the config at the configs pointer.
type: array
$parameters:
type: object
additionalProperties: true
Expand All @@ -4056,7 +4069,11 @@ definitions:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
anyOf:
- type: array
items:
"$ref": "#/definitions/StreamConfig"
- "$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1480,6 +1478,7 @@ class ComponentMappingDefinition(BaseModel):
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
create_or_update: Optional[bool] = False
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -1491,12 +1490,13 @@ class StreamConfig(BaseModel):
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
default_values: Optional[List] = Field(None, description="placeholder", title="Default Values")
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
    # NOTE: generated by datamodel-codegen from declarative_component_schema.yaml;
    # comments here will be lost on regeneration — keep durable docs in the YAML.
    type: Literal["ConfigComponentsResolver"]
    # Accepts either a single StreamConfig or a list of them; the component
    # factory normalizes both forms to a list before building the runtime
    # ConfigComponentsResolver.
    stream_config: Union[List[StreamConfig], StreamConfig]
    components_mapping: List[ComponentMappingDefinition]
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down Expand Up @@ -2164,7 +2164,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down Expand Up @@ -2414,6 +2414,9 @@ class DynamicSchemaLoader(BaseModel):
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
None, description="placeholder", title="Schema Filter"
)
schema_transformations: Optional[
List[
Union[
Expand Down Expand Up @@ -2749,6 +2752,7 @@ class DynamicDeclarativeStream(BaseModel):


ComplexFieldType.update_forward_refs()
CombinedExtractor.update_forward_refs()
GzipDecoder.update_forward_refs()
CompositeErrorHandler.update_forward_refs()
DeclarativeSource1.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2432,10 +2432,14 @@ def create_dynamic_schema_loader(
schema_type_identifier = self._create_component_from_model(
model.schema_type_identifier, config=config, parameters=model.parameters or {}
)
schema_filter = self._create_component_from_model(
model.schema_filter, config=config, parameters=model.parameters or {}
)
return DynamicSchemaLoader(
retriever=retriever,
config=config,
schema_transformations=schema_transformations,
schema_filter=schema_filter,
schema_type_identifier=schema_type_identifier,
parameters=model.parameters or {},
)
Expand Down Expand Up @@ -3597,6 +3601,7 @@ def create_components_mapping_definition(
field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString
value=interpolated_value,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
create_or_update=model.create_or_update,
parameters=model.parameters or {},
)

Expand Down Expand Up @@ -3643,16 +3648,24 @@ def create_stream_config(

return StreamConfig(
configs_pointer=model_configs_pointer,
default_values=model.default_values,
parameters=model.parameters or {},
)

def create_config_components_resolver(
self, model: ConfigComponentsResolverModel, config: Config
) -> Any:
stream_config = self._create_component_from_model(
model.stream_config, config=config, parameters=model.parameters or {}
model_stream_configs = (
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
)

stream_configs = [
self._create_component_from_model(
stream_config, config=config, parameters=model.parameters or {}
)
for stream_config in model_stream_configs
]

components_mapping = [
self._create_component_from_model(
model=components_mapping_definition_model,
Expand All @@ -3665,7 +3678,7 @@ def create_config_components_resolver(
]

return ConfigComponentsResolver(
stream_config=stream_config,
stream_configs=stream_configs,
config=config,
components_mapping=components_mapping,
parameters=model.parameters or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ComponentMappingDefinition:
value: Union["InterpolatedString", str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@dataclass(frozen=True)
Expand All @@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
value: "InterpolatedString"
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Union
from itertools import product
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union

import dpath
from typing_extensions import deprecated
Expand All @@ -28,6 +29,7 @@ class StreamConfig:

configs_pointer: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
default_values: Optional[List[Any]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.configs_pointer = [
Expand All @@ -48,7 +50,7 @@ class ConfigComponentsResolver(ComponentsResolver):
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
"""

stream_config: StreamConfig
stream_configs: List[StreamConfig]
config: Config
components_mapping: List[ComponentMappingDefinition]
parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -82,6 +84,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
field_path=field_path,
value=interpolated_value,
value_type=component_mapping.value_type,
create_or_update=component_mapping.create_or_update,
parameters=parameters,
)
)
Expand All @@ -91,17 +94,35 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

@property
def _stream_config(self) -> Iterable[Mapping[str, Any]]:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self.stream_config.configs_pointer
]
stream_config = dpath.get(dict(self.config), path, default=[])

if not isinstance(stream_config, list):
stream_config = [stream_config]

return stream_config
def _stream_config(self) -> List[Mapping[str, Any]]:
    """Resolve every stream-config value referenced by ``self.stream_configs``.

    Each ``StreamConfig.configs_pointer`` is evaluated against ``self.config``
    and dereferenced with dpath; any ``default_values`` are appended. The
    per-pointer value lists are then combined with a cartesian product, and
    each combination is merged into a single mapping (one mapping per
    generated stream).

    Returns:
        A list of merged stream-config mappings, one per combination.
    """

    def resolve_path(pointer: List[Any]) -> List[str]:
        # InterpolatedString nodes are evaluated against the config;
        # plain strings are used verbatim.
        return [node if isinstance(node, str) else node.eval(self.config) for node in pointer]

    def indexed_values(stream_config: StreamConfig) -> List[Any]:
        raw = dpath.get(dict(self.config), resolve_path(stream_config.configs_pointer), default=[])
        # A scalar/mapping value participates in the product as a one-element list.
        values = raw if isinstance(raw, list) else [raw]
        if stream_config.default_values:
            # Build a new list instead of `+=` so the list pulled out of
            # self.config is never mutated in place.
            values = values + list(stream_config.default_values)
        return list(enumerate(values))

    def merge_combination(combo: Any) -> Dict[str, Any]:
        merged: Dict[str, Any] = {}
        for config_index, (elem_index, elem) in enumerate(combo):
            if isinstance(elem, dict):
                merged.update(elem)
            else:
                # Non-dict values are stored under a positional key; setdefault
                # keeps the first value seen for that position.
                merged.setdefault(f"source_config_{config_index}", (elem_index, elem))
        return merged

    # product() consumes its arguments eagerly, so no intermediate list()
    # materialization is needed around the per-config value lists.
    return [
        merge_combination(combo)
        for combo in product(*(indexed_values(sc) for sc in self.stream_configs))
    ]

def resolve_components(
self, stream_template_config: Dict[str, Any]
Expand Down Expand Up @@ -130,7 +151,21 @@ def resolve_components(
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
parsed_value = self._parse_yaml_if_possible(value)
updated = dpath.set(updated_config, path, parsed_value)

dpath.set(updated_config, path, value)
if parsed_value and not updated and resolved_component.create_or_update:
dpath.new(updated_config, path, parsed_value)

yield updated_config

@staticmethod
def _parse_yaml_if_possible(value: Any) -> Any:
    """Best-effort YAML parse of string component-mapping values.

    Non-string values are returned untouched. Strings that are not valid
    YAML are returned as-is rather than raising, since mapping values may
    be plain text that only looks like YAML.
    """
    if not isinstance(value, str):
        return value
    try:
        # Imported lazily so environments without PyYAML can still load this
        # module; TODO(review): move to a top-level import once PyYAML is a
        # hard dependency.
        import yaml
    except ImportError:
        return value
    try:
        return yaml.safe_load(value)
    except yaml.YAMLError:
        # Only genuine parse failures fall back to the raw string; any other
        # exception now propagates instead of being silently swallowed.
        return value
20 changes: 16 additions & 4 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dpath
from typing_extensions import deprecated

from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
Expand Down Expand Up @@ -126,6 +127,7 @@ class DynamicSchemaLoader(SchemaLoader):
parameters: InitVar[Mapping[str, Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
schema_filter: Optional[RecordFilter] = None

def get_json_schema(self) -> Mapping[str, Any]:
"""
Expand All @@ -151,20 +153,18 @@ def get_json_schema(self) -> Mapping[str, Any]:
)
properties[key] = value

transformed_properties = self._transform(properties, {})
filtred_transformed_properties = self._transform(self._filter(properties))

return {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
"properties": transformed_properties,
"properties": filtred_transformed_properties,
}

def _transform(
self,
properties: Mapping[str, Any],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> Mapping[str, Any]:
for transformation in self.schema_transformations:
transformation.transform(
Expand All @@ -173,6 +173,18 @@ def _transform(
)
return properties

def _filter(
    self,
    properties: Mapping[str, Any],
) -> Mapping[str, Any]:
    """Apply the optional ``schema_filter`` to the discovered properties.

    Each ``(name, definition)`` pair is fed to the filter as a record and
    only the pairs the filter keeps are returned. When no filter is
    configured, the properties pass through unchanged.
    """
    if not self.schema_filter:
        return properties
    # `name`/`definition` avoid shadowing the builtin `property`;
    # a dict comprehension replaces the manual accumulation loop.
    return {
        name: definition
        for name, definition in self.schema_filter.filter_records(properties.items(), {})
    }

def _get_key(
self,
raw_schema: MutableMapping[str, Any],
Expand Down
Loading