Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add dynamic stream config to check connection #450

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airbyte_cdk/sources/declarative/checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from pydantic.v1 import BaseModel

from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.check_stream import (
CheckStream,
DynamicStreamCheckConfig,
)
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.models import (
CheckDynamicStream as CheckDynamicStreamModel,
Expand All @@ -21,4 +24,4 @@
"CheckDynamicStream": CheckDynamicStreamModel,
}

__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
74 changes: 73 additions & 1 deletion airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


@dataclass(frozen=True)
class DynamicStreamCheckConfig:
"""Defines the configuration for dynamic stream during connection checking. This class specifies
what dynamic streams in the stream template should be updated with value, supporting dynamic interpolation
and type enforcement."""

dynamic_stream_name: str
stream_count: int = 0


@dataclass
class CheckStream(ConnectionChecker):
"""
Expand All @@ -22,6 +32,7 @@ class CheckStream(ConnectionChecker):
"""

stream_names: List[str]
dynamic_streams_check_configs: List[DynamicStreamCheckConfig]
parameters: InitVar[Mapping[str, Any]]

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
Expand All @@ -30,7 +41,13 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)
try:
streams = source.streams(config=config)
except Exception as error:
error_message = f"Encountered an error trying to connect to streams. Error: {error}"
logger.error(error_message, exc_info=True)
return False, error_message

stream_name_to_stream = {s.name: s for s in streams}
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
Expand All @@ -53,4 +70,59 @@ def check_connection(
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
)
return False, f"Unable to connect to stream {stream_name} - {error}"

if (
hasattr(source, "resolved_manifest")
and hasattr(source, "dynamic_streams")
and self.dynamic_streams_check_configs
):
dynamic_stream_name_to_dynamic_stream = {
dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream
for i, dynamic_stream in enumerate(
source.resolved_manifest.get("dynamic_streams", [])
)
}

dynamic_stream_name_to_generated_streams = {}
for stream in source.dynamic_streams:
dynamic_stream_name_to_generated_streams[stream["dynamic_stream_name"]] = (
dynamic_stream_name_to_generated_streams.setdefault(
stream["dynamic_stream_name"], []
)
+ [stream]
)

for dynamic_streams_check_config in self.dynamic_streams_check_configs:
dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(
dynamic_streams_check_config.dynamic_stream_name
)

is_config_depend = (
dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver"
)

if not is_config_depend and not bool(dynamic_streams_check_config.stream_count):
continue

generated_streams = dynamic_stream_name_to_generated_streams.get(
dynamic_streams_check_config.dynamic_stream_name
)
availability_strategy = HttpAvailabilityStrategy()

for declarative_stream in generated_streams[
: min(dynamic_streams_check_config.stream_count, len(generated_streams))
]:
stream = stream_name_to_stream.get(declarative_stream["name"])
try:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
if not stream_is_available:
logger.warning(f"Stream {stream.name} is not available: {reason}")
return False, reason
except Exception as error:
error_message = f"Encountered an error trying to connect to stream {stream.name}. Error: {error}"
logger.error(error_message, exc_info=True)
return False, error_message

return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ definitions:
type: object
required:
- type
- stream_names
properties:
type:
type: string
Expand All @@ -330,6 +329,28 @@ definitions:
examples:
- ["users"]
- ["users", "contacts"]
dynamic_streams_check_configs:
type: array
items:
"$ref": "#/definitions/DynamicStreamCheckConfig"
DynamicStreamCheckConfig:
type: object
required:
- type
- dynamic_stream_name
properties:
type:
type: string
enum: [ DynamicStreamCheckConfig ]
dynamic_stream_name:
title: Dynamic Stream Name
description: The dynamic stream name.
type: string
stream_count:
title: Stream Count
description: Numbers of the streams to try reading from when running a check operation.
type: integer
default: 0
CheckDynamicStream:
title: Dynamic Streams to Check
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ class BearerAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: List[str] = Field(
...,
description="Names of the streams to try reading from when running a check operation.",
examples=[["users"], ["users", "contacts"]],
title="Stream Names",
class DynamicStreamCheckConfig(BaseModel):
type: Literal["DynamicStreamCheckConfig"]
dynamic_stream_name: str = Field(
..., description="The dynamic stream name.", title="Dynamic Stream Name"
)
stream_count: Optional[int] = Field(
0,
description="Numbers of the streams to try reading from when running a check operation.",
title="Stream Count",
)


Expand Down Expand Up @@ -1523,6 +1525,17 @@ class AuthFlow(BaseModel):
oauth_config_specification: Optional[OAuthConfigSpecification] = None


class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: Optional[List[str]] = Field(
None,
description="Names of the streams to try reading from when running a check operation.",
examples=[["users"], ["users", "contacts"]],
title="Stream Names",
)
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None


class IncrementingCountCursor(BaseModel):
type: Literal["IncrementingCountCursor"]
cursor_field: str = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
SessionTokenProvider,
TokenProvider,
)
from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream
from airbyte_cdk.sources.declarative.checks import (
CheckDynamicStream,
CheckStream,
DynamicStreamCheckConfig,
)
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
Expand Down Expand Up @@ -218,6 +222,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicStreamCheckConfig as DynamicStreamCheckConfigModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
Expand Down Expand Up @@ -559,6 +566,7 @@ def _init_mappings(self) -> None:
BasicHttpAuthenticatorModel: self.create_basic_http_authenticator,
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
DynamicStreamCheckConfigModel: self.create_dynamic_stream_check_config,
CheckDynamicStreamModel: self.create_check_dynamic_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
ConcurrencyLevelModel: self.create_concurrency_level,
Expand Down Expand Up @@ -936,8 +944,36 @@ def create_bearer_authenticator(
)

@staticmethod
def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream:
return CheckStream(stream_names=model.stream_names, parameters={})
def create_dynamic_stream_check_config(
model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any
) -> DynamicStreamCheckConfig:
return DynamicStreamCheckConfig(
dynamic_stream_name=model.dynamic_stream_name,
stream_count=model.stream_count or 0,
)

def create_check_stream(
self, model: CheckStreamModel, config: Config, **kwargs: Any
) -> CheckStream:
if model.dynamic_streams_check_configs is None and model.stream_names is None:
raise ValueError(
"Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream"
)

dynamic_streams_check_configs = (
[
self._create_component_from_model(model=dynamic_stream_check_config, config=config)
for dynamic_stream_check_config in model.dynamic_streams_check_configs
]
if model.dynamic_streams_check_configs
else []
)

return CheckStream(
stream_names=model.stream_names or [],
dynamic_streams_check_configs=dynamic_streams_check_configs,
parameters={},
)

@staticmethod
def create_check_dynamic_stream(
Expand Down
Loading
Loading