Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion airbyte_cdk/test/entrypoint_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
from airbyte_cdk.test.models.scenario import ExpectedOutcome


@dataclass
class AirbyteEntrypointException(Exception):
"""Exception raised for errors in the AirbyteEntrypoint execution.
Expand Down
9 changes: 9 additions & 0 deletions airbyte_cdk/test/models/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,21 @@ class AcceptanceTestFileTypes(BaseModel):
skip_test: bool
bypass_reason: str

class AcceptanceTestEmptyStream(BaseModel):
name: str
bypass_reason: str | None = None

# bypass reason does not affect equality
def __hash__(self) -> int:
return hash(self.name)

config_path: Path | None = None
config_dict: dict[str, Any] | None = None

_id: str | None = None # Used to override the default ID generation

configured_catalog_path: Path | None = None
empty_streams: list[AcceptanceTestEmptyStream] | None = None
timeout_seconds: int | None = None
expect_records: AcceptanceTestExpectRecords | None = None
file_types: AcceptanceTestFileTypes | None = None
Expand Down
8 changes: 0 additions & 8 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@

from boltons.typeutils import classproperty

from airbyte_cdk.models import (
AirbyteMessage,
Type,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.models import (
ConnectorTestScenario,
)
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite
from airbyte_cdk.utils.connector_paths import (
ACCEPTANCE_TEST_CONFIG,
find_connector_root,
)

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down
86 changes: 61 additions & 25 deletions airbyte_cdk/test/standard_tests/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import warnings
from dataclasses import asdict
from pathlib import Path
from subprocess import CompletedProcess, SubprocessError
from typing import Literal, cast

import orjson
Expand All @@ -35,7 +34,6 @@
from airbyte_cdk.utils.docker import (
build_connector_image,
run_docker_airbyte_command,
run_docker_command,
)


Expand Down Expand Up @@ -66,13 +64,57 @@ def is_destination_connector(cls) -> bool:
return cast(str, cls.connector_name).startswith("destination-")

@classproperty
def acceptance_test_config_path(cls) -> Path:
"""Get the path to the acceptance test config file."""
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
if result.exists():
return result
def acceptance_test_config(cls) -> dict[str, object]:
"""Get the contents of acceptance test config file.

raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")
Also perform some basic validation that the file has the expected structure.
"""
acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
if not acceptance_test_config_path.exists():
raise FileNotFoundError(
f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
)

tests_config = yaml.safe_load(acceptance_test_config_path.read_text())

if "acceptance_tests" not in tests_config:
raise ValueError(
f"Acceptance tests config not found in {acceptance_test_config_path}."
f" Found only: {str(tests_config)}."
)
return tests_config

@staticmethod
def _dedup_scenarios(scenarios: list[ConnectorTestScenario]) -> list[ConnectorTestScenario]:
"""
For FAST tests, we treat each config as a separate test scenario to run against, whereas CATs defined
a series of more granular scenarios specifying a config_path and empty_streams among other things.

This method deduplicates the CATs scenarios based on their config_path. In doing so, we choose to
take the union of any defined empty_streams, to have high confidence that runnning a read with the
config will not error on the lack of data in the empty streams or lack of permissions to read them.

"""
deduped_scenarios: list[ConnectorTestScenario] = []

for scenario in scenarios:
for existing_scenario in deduped_scenarios:
if scenario.config_path == existing_scenario.config_path:
# If a scenario with the same config_path already exists, we merge the empty streams.
# scenarios are immutable, so we create a new one.
all_empty_streams = (existing_scenario.empty_streams or []) + (
scenario.empty_streams or []
)
merged_scenario = existing_scenario.model_copy(
update={"empty_streams": list(set(all_empty_streams))}
)
deduped_scenarios.remove(existing_scenario)
deduped_scenarios.append(merged_scenario)
break
else:
# If a scenario does not exist with the config, add the new scenario to the list.
deduped_scenarios.append(scenario)
return deduped_scenarios

@classmethod
def get_scenarios(
Expand All @@ -83,9 +125,8 @@ def get_scenarios(
This has to be a separate function because pytest does not allow
parametrization of fixtures with arguments from the test class itself.
"""
categories = ["connection", "spec"]
try:
acceptance_test_config_path = cls.acceptance_test_config_path
all_tests_config = cls.acceptance_test_config
except FileNotFoundError as e:
# Destinations sometimes do not have an acceptance tests file.
warnings.warn(
Expand All @@ -95,15 +136,9 @@ def get_scenarios(
)
return []

all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
if "acceptance_tests" not in all_tests_config:
raise ValueError(
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
f" Found only: {str(all_tests_config)}."
)

test_scenarios: list[ConnectorTestScenario] = []
for category in categories:
# we look in the basic_read section to find any empty streams
for category in ["spec", "connection", "basic_read"]:
if (
category not in all_tests_config["acceptance_tests"]
or "tests" not in all_tests_config["acceptance_tests"][category]
Expand All @@ -121,15 +156,11 @@ def get_scenarios(

scenario = ConnectorTestScenario.model_validate(test)

if scenario.config_path and scenario.config_path in [
s.config_path for s in test_scenarios
]:
# Skip duplicate scenarios based on config_path
continue

test_scenarios.append(scenario)

return test_scenarios
deduped_test_scenarios = cls._dedup_scenarios(test_scenarios)

return deduped_test_scenarios

@pytest.mark.skipif(
shutil.which("docker") is None,
Expand Down Expand Up @@ -332,6 +363,11 @@ def test_docker_image_build_and_read(
# If `read_from_streams` is a list, we filter the discovered streams.
streams_list = list(set(streams_list) & set(read_from_streams))

if scenario.empty_streams:
# Filter out streams marked as empty in the scenario.
empty_stream_names = [stream.name for stream in scenario.empty_streams]
streams_list = [s for s in streams_list if s.name not in empty_stream_names]

configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
Expand Down
8 changes: 7 additions & 1 deletion airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ def test_basic_read(
if scenario.expected_outcome.expect_exception() and discover_result.errors:
# Failed as expected; we're done.
return
streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]

if scenario.empty_streams:
# Filter out streams marked as empty in the scenario.
empty_stream_names = [stream.name for stream in scenario.empty_streams]
streams = [s for s in streams if s.name not in empty_stream_names]

configured_catalog = ConfiguredAirbyteCatalog(
streams=[
Expand All @@ -128,7 +134,7 @@ def test_basic_read(
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.append_dedup,
)
for stream in discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]
for stream in streams
]
)
result = run_test_job(
Expand Down
3 changes: 1 addition & 2 deletions airbyte_cdk/utils/connector_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ def _find_in_adjacent_dirs(current_dir: Path) -> Path | None:

def resolve_connector_name_and_directory(
connector_ref: str | Path | None = None,
*,
connector_directory: Path | None = None,
) -> tuple[str, Path]:
"""Resolve the connector name and directory.
Expand All @@ -104,6 +102,7 @@ def resolve_connector_name_and_directory(
FileNotFoundError: If the connector directory does not exist or cannot be found.
"""
connector_name: str | None = None
connector_directory: Path | None = None

# Resolve connector_ref to connector_name or connector_directory (if provided)
if connector_ref:
Expand Down
Loading