Skip to content

feat(cdk): update ConfigComponentsResolver to support list of stream_config #553

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3994,6 +3994,9 @@ definitions:
title: Value Type
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
"$ref": "#/definitions/ValueType"
create_or_update:
type: boolean
default: false
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -4045,6 +4048,12 @@ definitions:
- ["data"]
- ["data", "streams"]
- ["data", "{{ parameters.name }}"]
default_values:
title: Default Values
description: placeholder
type: array
items:
type: object
$parameters:
type: object
additionalProperties: true
Expand All @@ -4056,7 +4065,11 @@ definitions:
type: string
enum: [ConfigComponentsResolver]
stream_config:
"$ref": "#/definitions/StreamConfig"
anyOf:
- type: array
items:
"$ref": "#/definitions/StreamConfig"
- "$ref": "#/definitions/StreamConfig"
components_mapping:
type: array
items:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
}
)

stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
self._source_config, config
)
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams

api_budget_model = self._source_config.get("api_budget")
if api_budget_model:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -1480,6 +1478,7 @@ class ComponentMappingDefinition(BaseModel):
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
title="Value Type",
)
create_or_update: Optional[bool] = False
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand All @@ -1491,12 +1490,15 @@ class StreamConfig(BaseModel):
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
title="Configs Pointer",
)
default_values: Optional[List[Dict[str, Any]]] = Field(
None, description="placeholder", title="Default Values"
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ConfigComponentsResolver(BaseModel):
type: Literal["ConfigComponentsResolver"]
stream_config: StreamConfig
stream_config: Union[List[StreamConfig], StreamConfig]
components_mapping: List[ComponentMappingDefinition]
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")

Expand Down Expand Up @@ -2164,7 +2166,7 @@ class Config:
]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
title="Schema Loader",
)
transformations: Optional[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3597,6 +3597,7 @@ def create_components_mapping_definition(
field_path=field_path, # type: ignore[arg-type] # field_path can be str and InterpolatedString
value=interpolated_value,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
create_or_update=model.create_or_update,
parameters=model.parameters or {},
)

Expand Down Expand Up @@ -3643,16 +3644,24 @@ def create_stream_config(

return StreamConfig(
configs_pointer=model_configs_pointer,
default_values=model.default_values,
parameters=model.parameters or {},
)

def create_config_components_resolver(
self, model: ConfigComponentsResolverModel, config: Config
) -> Any:
stream_config = self._create_component_from_model(
model.stream_config, config=config, parameters=model.parameters or {}
model_stream_configs = (
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
)

stream_configs = [
self._create_component_from_model(
stream_config, config=config, parameters=model.parameters or {}
)
for stream_config in model_stream_configs
]

components_mapping = [
self._create_component_from_model(
model=components_mapping_definition_model,
Expand All @@ -3665,7 +3674,7 @@ def create_config_components_resolver(
]

return ConfigComponentsResolver(
stream_config=stream_config,
stream_configs=stream_configs,
config=config,
components_mapping=components_mapping,
parameters=model.parameters or {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class ComponentMappingDefinition:
value: Union["InterpolatedString", str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@dataclass(frozen=True)
Expand All @@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
value: "InterpolatedString"
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]
create_or_update: Optional[bool] = False


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, Dict, Iterable, List, Mapping, Union
from itertools import product
from typing import Any, Dict, Iterable, List, Mapping, Optional, Union

import dpath
from typing_extensions import deprecated
Expand All @@ -28,6 +29,7 @@ class StreamConfig:

configs_pointer: List[Union[InterpolatedString, str]]
parameters: InitVar[Mapping[str, Any]]
default_values: Optional[List[Any]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self.configs_pointer = [
Expand All @@ -48,7 +50,7 @@ class ConfigComponentsResolver(ComponentsResolver):
parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
"""

stream_config: StreamConfig
stream_configs: List[StreamConfig]
config: Config
components_mapping: List[ComponentMappingDefinition]
parameters: InitVar[Mapping[str, Any]]
Expand Down Expand Up @@ -82,6 +84,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
field_path=field_path,
value=interpolated_value,
value_type=component_mapping.value_type,
create_or_update=component_mapping.create_or_update,
parameters=parameters,
)
)
Expand All @@ -91,17 +94,35 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

@property
def _stream_config(self) -> Iterable[Mapping[str, Any]]:
path = [
node.eval(self.config) if not isinstance(node, str) else node
for node in self.stream_config.configs_pointer
]
stream_config = dpath.get(dict(self.config), path, default=[])

if not isinstance(stream_config, list):
stream_config = [stream_config]

return stream_config
def _stream_config(self):
def resolve_path(pointer):
return [
node.eval(self.config) if not isinstance(node, str) else node for node in pointer
]

def normalize_configs(configs):
return configs if isinstance(configs, list) else [configs]

def prepare_streams():
for stream_config in self.stream_configs:
path = resolve_path(stream_config.configs_pointer)
stream_configs = dpath.get(dict(self.config), path, default=[])
stream_configs = normalize_configs(stream_configs)
if stream_config.default_values:
stream_configs.extend(stream_config.default_values)
yield [(i, item) for i, item in enumerate(stream_configs)]

def merge_combination(combo):
result = {}
for config_index, (elem_index, elem) in enumerate(combo):
if isinstance(elem, dict):
result.update(elem)
else:
result.setdefault(f"source_config_{config_index}", (elem_index, elem))
return result

all_indexed_streams = list(prepare_streams())
return [merge_combination(combo) for combo in product(*all_indexed_streams)]

def resolve_components(
self, stream_template_config: Dict[str, Any]
Expand Down Expand Up @@ -130,7 +151,21 @@ def resolve_components(
)

path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
parsed_value = self._parse_yaml_if_possible(value)
updated = dpath.set(updated_config, path, parsed_value)

dpath.set(updated_config, path, value)
if parsed_value and not updated and resolved_component.create_or_update:
dpath.new(updated_config, path, parsed_value)

yield updated_config

@staticmethod
def _parse_yaml_if_possible(value: Any) -> Any:
if isinstance(value, str):
try:
import yaml

return yaml.safe_load(value)
except Exception:
return value
return value
Loading