diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index ca595d92e..29b1eaba7 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -3994,6 +3994,9 @@ definitions: title: Value Type description: The expected data type of the value. If omitted, the type will be inferred from the value provided. "$ref": "#/definitions/ValueType" + create_or_update: + type: boolean + default: false $parameters: type: object additionalProperties: true @@ -4045,6 +4048,12 @@ definitions: - ["data"] - ["data", "streams"] - ["data", "{{ parameters.name }}"] + default_values: + title: Default Values + description: placeholder + type: array + items: + type: object $parameters: type: object additionalProperties: true @@ -4056,7 +4065,11 @@ definitions: type: string enum: [ConfigComponentsResolver] stream_config: - "$ref": "#/definitions/StreamConfig" + anyOf: + - type: array + items: + "$ref": "#/definitions/StreamConfig" + - "$ref": "#/definitions/StreamConfig" components_mapping: type: array items: diff --git a/airbyte_cdk/sources/declarative/manifest_declarative_source.py b/airbyte_cdk/sources/declarative/manifest_declarative_source.py index c98372be7..2d5ee40c6 100644 --- a/airbyte_cdk/sources/declarative/manifest_declarative_source.py +++ b/airbyte_cdk/sources/declarative/manifest_declarative_source.py @@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: } ) - stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs( - self._source_config, config - ) + stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams api_budget_model = self._source_config.get("api_budget") if api_budget_model: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py 
b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 0aa9fa569..96eb1ba3d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1,5 +1,3 @@ -# Copyright (c) 2025 Airbyte, Inc., all rights reserved. - # generated by datamodel-codegen: # filename: declarative_component_schema.yaml @@ -1480,6 +1478,7 @@ class ComponentMappingDefinition(BaseModel): description="The expected data type of the value. If omitted, the type will be inferred from the value provided.", title="Value Type", ) + create_or_update: Optional[bool] = False parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -1491,12 +1490,15 @@ class StreamConfig(BaseModel): examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]], title="Configs Pointer", ) + default_values: Optional[List[Dict[str, Any]]] = Field( + None, description="placeholder", title="Default Values" + ) parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") class ConfigComponentsResolver(BaseModel): type: Literal["ConfigComponentsResolver"] - stream_config: StreamConfig + stream_config: Union[List[StreamConfig], StreamConfig] components_mapping: List[ComponentMappingDefinition] parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") @@ -2164,7 +2166,7 @@ class Config: ] ] = Field( None, - description="Component used to retrieve the schema for the current stream.", + description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. 
Schema loaders defined first taking precedence in the event of a conflict.", title="Schema Loader", ) transformations: Optional[ diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index e7cdb6683..e7f82e372 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -3597,6 +3597,7 @@ def create_components_mapping_definition( field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString value=interpolated_value, value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type), + create_or_update=model.create_or_update, parameters=model.parameters or {}, ) @@ -3643,16 +3644,24 @@ def create_stream_config( return StreamConfig( configs_pointer=model_configs_pointer, + default_values=model.default_values, parameters=model.parameters or {}, ) def create_config_components_resolver( self, model: ConfigComponentsResolverModel, config: Config ) -> Any: - stream_config = self._create_component_from_model( - model.stream_config, config=config, parameters=model.parameters or {} + model_stream_configs = ( + model.stream_config if isinstance(model.stream_config, list) else [model.stream_config] ) + stream_configs = [ + self._create_component_from_model( + stream_config, config=config, parameters=model.parameters or {} + ) + for stream_config in model_stream_configs + ] + components_mapping = [ self._create_component_from_model( model=components_mapping_definition_model, @@ -3665,7 +3674,7 @@ def create_config_components_resolver( ] return ConfigComponentsResolver( - stream_config=stream_config, + stream_configs=stream_configs, config=config, components_mapping=components_mapping, parameters=model.parameters or {}, diff --git a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py 
b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py index 5975b3082..977d3e1c7 100644 --- a/airbyte_cdk/sources/declarative/resolvers/components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/components_resolver.py @@ -22,6 +22,7 @@ class ComponentMappingDefinition: value: Union["InterpolatedString", str] value_type: Optional[Type[Any]] parameters: InitVar[Mapping[str, Any]] + create_or_update: Optional[bool] = False @dataclass(frozen=True) @@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition: value: "InterpolatedString" value_type: Optional[Type[Any]] parameters: InitVar[Mapping[str, Any]] + create_or_update: Optional[bool] = False @deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning) diff --git a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py index 0308ea5da..3b337e56f 100644 --- a/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py +++ b/airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py @@ -4,7 +4,8 @@ from copy import deepcopy from dataclasses import InitVar, dataclass, field -from typing import Any, Dict, Iterable, List, Mapping, Union +from itertools import product +from typing import Any, Dict, Iterable, List, Mapping, Optional, Union import dpath from typing_extensions import deprecated @@ -28,6 +29,7 @@ class StreamConfig: configs_pointer: List[Union[InterpolatedString, str]] parameters: InitVar[Mapping[str, Any]] + default_values: Optional[List[Any]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self.configs_pointer = [ @@ -48,7 +50,7 @@ class ConfigComponentsResolver(ComponentsResolver): parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation. 
""" - stream_config: StreamConfig + stream_configs: List[StreamConfig] config: Config components_mapping: List[ComponentMappingDefinition] parameters: InitVar[Mapping[str, Any]] @@ -82,6 +84,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: field_path=field_path, value=interpolated_value, value_type=component_mapping.value_type, + create_or_update=component_mapping.create_or_update, parameters=parameters, ) ) @@ -91,17 +94,35 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: ) @property - def _stream_config(self) -> Iterable[Mapping[str, Any]]: - path = [ - node.eval(self.config) if not isinstance(node, str) else node - for node in self.stream_config.configs_pointer - ] - stream_config = dpath.get(dict(self.config), path, default=[]) - - if not isinstance(stream_config, list): - stream_config = [stream_config] - - return stream_config + def _stream_config(self): + def resolve_path(pointer): + return [ + node.eval(self.config) if not isinstance(node, str) else node for node in pointer + ] + + def normalize_configs(configs): + return configs if isinstance(configs, list) else [configs] + + def prepare_streams(): + for stream_config in self.stream_configs: + path = resolve_path(stream_config.configs_pointer) + stream_configs = dpath.get(dict(self.config), path, default=[]) + stream_configs = normalize_configs(stream_configs) + if stream_config.default_values: + stream_configs.extend(stream_config.default_values) + yield [(i, item) for i, item in enumerate(stream_configs)] + + def merge_combination(combo): + result = {} + for config_index, (elem_index, elem) in enumerate(combo): + if isinstance(elem, dict): + result.update(elem) + else: + result.setdefault(f"source_config_{config_index}", (elem_index, elem)) + return result + + all_indexed_streams = list(prepare_streams()) + return [merge_combination(combo) for combo in product(*all_indexed_streams)] def resolve_components( self, stream_template_config: Dict[str, Any] @@ -130,7 
@staticmethod
def _parse_yaml_if_possible(value: Any) -> Any:
    """Return ``value`` parsed as YAML when it is a string that parses cleanly.

    Non-string values are returned untouched.  Strings are handed to
    ``yaml.safe_load``; when parsing fails the original string is returned
    so the caller can still use the raw value.

    Args:
        value: The value to interpret.

    Returns:
        The object produced by ``yaml.safe_load`` for parseable strings,
        otherwise ``value`` itself.
    """
    if not isinstance(value, str):
        return value

    # Imported outside the ``try`` so that a missing or broken PyYAML install
    # surfaces as an ImportError instead of silently disabling parsing.
    import yaml

    try:
        return yaml.safe_load(value)
    except (yaml.YAMLError, ValueError):
        # YAMLError: the text is not valid YAML.
        # ValueError: a scalar was shaped like a YAML timestamp but does not
        # form a real date/time.  Either way, fall back to the raw string.
        return value