Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(low-code cdk): add dynamic stream config to check connection #450

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions airbyte_cdk/sources/declarative/checks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@
from pydantic.v1 import BaseModel

from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.check_stream import (
CheckStream,
DynamicStreamCheckConfig,
)
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.declarative.models import (
CheckDynamicStream as CheckDynamicStreamModel,
Expand All @@ -21,4 +24,4 @@
"CheckDynamicStream": CheckDynamicStreamModel,
}

__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"]
__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"]
124 changes: 113 additions & 11 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,23 @@
import logging
import traceback
from dataclasses import InitVar, dataclass
from typing import Any, List, Mapping, Tuple
from typing import Any, Dict, List, Mapping, Optional, Tuple

from airbyte_cdk import AbstractSource
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy


@dataclass(frozen=True)
class DynamicStreamCheckConfig:
"""Defines the configuration for dynamic stream during connection checking. This class specifies
what dynamic streams in the stream template should be updated with value, supporting dynamic interpolation
and type enforcement."""

dynamic_stream_name: str
stream_count: int = 0


@dataclass
class CheckStream(ConnectionChecker):
"""
Expand All @@ -23,34 +33,126 @@ class CheckStream(ConnectionChecker):

stream_names: List[str]
parameters: InitVar[Mapping[str, Any]]
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
if self.dynamic_streams_check_configs is None:
self.dynamic_streams_check_configs = []

def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]:
"""Logs an error and returns a formatted error message."""
error_message = f"Encountered an error while {action}. Error: {error}"
logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
return False, error_message

def check_connection(
self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any]
) -> Tuple[bool, Any]:
streams = source.streams(config=config)
"""Checks the connection to the source and its streams."""
try:
streams = source.streams(config=config)
if not streams:
return False, f"No streams to connect to from source {source}"
except Exception as error:
return self._log_error(logger, "discovering streams", error)

stream_name_to_stream = {s.name: s for s in streams}
if len(streams) == 0:
return False, f"No streams to connect to from source {source}"
for stream_name in self.stream_names:
if stream_name not in stream_name_to_stream.keys():
if stream_name not in stream_name_to_stream:
raise ValueError(
f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}."
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
)

stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
if not stream_availability:
return stream_availability, message

should_check_dynamic_streams = (
hasattr(source, "resolved_manifest")
and hasattr(source, "dynamic_streams")
and self.dynamic_streams_check_configs
)

if should_check_dynamic_streams:
return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger)

return True, None

def _check_stream_availability(
self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger
) -> Tuple[bool, Any]:
"""Checks if streams are available."""
availability_strategy = HttpAvailabilityStrategy()
try:
stream = stream_name_to_stream[stream_name]
availability_strategy = HttpAvailabilityStrategy()
stream_is_available, reason = availability_strategy.check_availability(stream, logger)
if not stream_is_available:
message = f"Stream {stream_name} is not available: {reason}"
logger.warning(message)
return stream_is_available, message
except Exception as error:
return self._log_error(logger, f"checking availability of stream {stream_name}", error)
return True, None

def _check_dynamic_streams_availability(
self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger
) -> Tuple[bool, Any]:
"""Checks the availability of dynamic streams."""
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
dynamic_stream_name_to_dynamic_stream = {
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
}
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method

for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
return (
False,
f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.",
)

generated = generated_streams.get(check_config.dynamic_stream_name, [])
stream_availability, message = self._check_generated_streams_availability(
generated, stream_name_to_stream, logger, check_config.stream_count
)
if not stream_availability:
return stream_availability, message

return True, None

def _map_generated_streams(
self, dynamic_streams: List[Dict[str, Any]]
) -> Dict[str, List[Dict[str, Any]]]:
"""Maps dynamic stream names to their corresponding generated streams."""
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
for stream in dynamic_streams:
mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream)
return mapped_streams

def _check_generated_streams_availability(
self,
generated_streams: List[Dict[str, Any]],
stream_name_to_stream: Dict[str, Any],
logger: logging.Logger,
max_count: int,
) -> Tuple[bool, Any]:
"""Checks availability of generated dynamic streams."""
availability_strategy = HttpAvailabilityStrategy()
for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]:
stream = stream_name_to_stream[declarative_stream["name"]]
try:
stream_is_available, reason = availability_strategy.check_availability(
stream, logger
)
if not stream_is_available:
return False, reason
message = f"Dynamic Stream {stream.name} is not available: {reason}"
logger.warning(message)
return False, message
except Exception as error:
logger.error(
f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}"
return self._log_error(
logger, f"checking availability of dynamic stream {stream.name}", error
)
return False, f"Unable to connect to stream {stream_name} - {error}"
return True, None
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ definitions:
type: object
required:
- type
- stream_names
properties:
type:
type: string
Expand All @@ -330,6 +329,28 @@ definitions:
examples:
- ["users"]
- ["users", "contacts"]
dynamic_streams_check_configs:
type: array
items:
"$ref": "#/definitions/DynamicStreamCheckConfig"
DynamicStreamCheckConfig:
type: object
required:
- type
- dynamic_stream_name
properties:
type:
type: string
enum: [ DynamicStreamCheckConfig ]
dynamic_stream_name:
title: Dynamic Stream Name
description: The dynamic stream name.
type: string
stream_count:
title: Stream Count
description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used.
type: integer
default: 0
CheckDynamicStream:
title: Dynamic Streams to Check
description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ class BearerAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: List[str] = Field(
...,
description="Names of the streams to try reading from when running a check operation.",
examples=[["users"], ["users", "contacts"]],
title="Stream Names",
class DynamicStreamCheckConfig(BaseModel):
type: Literal["DynamicStreamCheckConfig"]
dynamic_stream_name: str = Field(
..., description="The dynamic stream name.", title="Dynamic Stream Name"
)
stream_count: Optional[int] = Field(
0,
description="Numbers of the streams to try reading from when running a check operation.",
title="Stream Count",
)


Expand Down Expand Up @@ -1523,6 +1525,17 @@ class AuthFlow(BaseModel):
oauth_config_specification: Optional[OAuthConfigSpecification] = None


class CheckStream(BaseModel):
type: Literal["CheckStream"]
stream_names: Optional[List[str]] = Field(
None,
description="Names of the streams to try reading from when running a check operation.",
examples=[["users"], ["users", "contacts"]],
title="Stream Names",
)
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None


class IncrementingCountCursor(BaseModel):
type: Literal["IncrementingCountCursor"]
cursor_field: str = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
SessionTokenProvider,
TokenProvider,
)
from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream
from airbyte_cdk.sources.declarative.checks import (
CheckDynamicStream,
CheckStream,
DynamicStreamCheckConfig,
)
from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
Expand Down Expand Up @@ -218,6 +222,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicSchemaLoader as DynamicSchemaLoaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
DynamicStreamCheckConfig as DynamicStreamCheckConfigModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
Expand Down Expand Up @@ -559,6 +566,7 @@ def _init_mappings(self) -> None:
BasicHttpAuthenticatorModel: self.create_basic_http_authenticator,
BearerAuthenticatorModel: self.create_bearer_authenticator,
CheckStreamModel: self.create_check_stream,
DynamicStreamCheckConfigModel: self.create_dynamic_stream_check_config,
CheckDynamicStreamModel: self.create_check_dynamic_stream,
CompositeErrorHandlerModel: self.create_composite_error_handler,
ConcurrencyLevelModel: self.create_concurrency_level,
Expand Down Expand Up @@ -936,8 +944,36 @@ def create_bearer_authenticator(
)

@staticmethod
def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream:
return CheckStream(stream_names=model.stream_names, parameters={})
def create_dynamic_stream_check_config(
model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any
) -> DynamicStreamCheckConfig:
return DynamicStreamCheckConfig(
dynamic_stream_name=model.dynamic_stream_name,
stream_count=model.stream_count or 0,
)

def create_check_stream(
self, model: CheckStreamModel, config: Config, **kwargs: Any
) -> CheckStream:
if model.dynamic_streams_check_configs is None and model.stream_names is None:
raise ValueError(
"Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream"
)

dynamic_streams_check_configs = (
[
self._create_component_from_model(model=dynamic_stream_check_config, config=config)
for dynamic_stream_check_config in model.dynamic_streams_check_configs
]
if model.dynamic_streams_check_configs
else []
)

return CheckStream(
stream_names=model.stream_names or [],
dynamic_streams_check_configs=dynamic_streams_check_configs,
parameters={},
)

@staticmethod
def create_check_dynamic_stream(
Expand Down
Loading
Loading