Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions airbyte_cdk/test/models/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,17 @@ class AcceptanceTestFileTypes(BaseModel):
skip_test: bool
bypass_reason: str

class AcceptanceTestEmptyStream(BaseModel):
name: str
bypass_reason: str | None = None

config_path: Path | None = None
config_dict: dict[str, Any] | None = None

_id: str | None = None # Used to override the default ID generation

configured_catalog_path: Path | None = None
empty_streams: list[AcceptanceTestEmptyStream] | None = None
timeout_seconds: int | None = None
expect_records: AcceptanceTestExpectRecords | None = None
file_types: AcceptanceTestFileTypes | None = None
Expand Down
8 changes: 0 additions & 8 deletions airbyte_cdk/test/standard_tests/connector_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,12 @@

from boltons.typeutils import classproperty

from airbyte_cdk.models import (
AirbyteMessage,
Type,
)
from airbyte_cdk.test import entrypoint_wrapper
from airbyte_cdk.test.models import (
ConnectorTestScenario,
)
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
from airbyte_cdk.test.standard_tests.docker_base import DockerConnectorTestSuite
from airbyte_cdk.utils.connector_paths import (
ACCEPTANCE_TEST_CONFIG,
find_connector_root,
)

if TYPE_CHECKING:
from collections.abc import Callable
Expand Down
72 changes: 47 additions & 25 deletions airbyte_cdk/test/standard_tests/docker_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import warnings
from dataclasses import asdict
from pathlib import Path
from subprocess import CompletedProcess, SubprocessError
from typing import Literal, cast

import orjson
Expand All @@ -35,7 +34,6 @@
from airbyte_cdk.utils.docker import (
build_connector_image,
run_docker_airbyte_command,
run_docker_command,
)


Expand Down Expand Up @@ -66,13 +64,25 @@ def is_destination_connector(cls) -> bool:
return cast(str, cls.connector_name).startswith("destination-")

@classproperty
def acceptance_test_config_path(cls) -> Path:
"""Get the path to the acceptance test config file."""
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
if result.exists():
return result
def acceptance_test_config(cls) -> dict[str, object]:
"""Get the contents of acceptance test config file.

raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")
Also perform some basic validation that the file has the expected structure.
"""
acceptance_test_config_path = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
if not acceptance_test_config_path.exists():
raise FileNotFoundError(
f"Acceptance test config file not found at: {str(acceptance_test_config_path)}"
)

tests_config = yaml.safe_load(acceptance_test_config_path.read_text())

if "acceptance_tests" not in tests_config:
raise ValueError(
f"Acceptance tests config not found in {acceptance_test_config_path}."
f" Found only: {str(tests_config)}."
)
return tests_config

@classmethod
def get_scenarios(
Expand All @@ -83,9 +93,8 @@ def get_scenarios(
This has to be a separate function because pytest does not allow
parametrization of fixtures with arguments from the test class itself.
"""
categories = ["connection", "spec"]
try:
acceptance_test_config_path = cls.acceptance_test_config_path
all_tests_config = cls.acceptance_test_config
except FileNotFoundError as e:
# Destinations sometimes do not have an acceptance tests file.
warnings.warn(
Expand All @@ -95,15 +104,8 @@ def get_scenarios(
)
return []

all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
if "acceptance_tests" not in all_tests_config:
raise ValueError(
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
f" Found only: {str(all_tests_config)}."
)

test_scenarios: list[ConnectorTestScenario] = []
for category in categories:
for category in ["spec", "connection", "basic_read"]:
if (
category not in all_tests_config["acceptance_tests"]
or "tests" not in all_tests_config["acceptance_tests"][category]
Expand All @@ -121,15 +123,29 @@ def get_scenarios(

scenario = ConnectorTestScenario.model_validate(test)

if scenario.config_path and scenario.config_path in [
s.config_path for s in test_scenarios
]:
# Skip duplicate scenarios based on config_path
continue

test_scenarios.append(scenario)

return test_scenarios
# Remove duplicate scenarios based on config_path.
deduped_test_scenarios: list[ConnectorTestScenario] = []
for scenario in test_scenarios:
for existing_scenario in deduped_test_scenarios:
if scenario.config_path == existing_scenario.config_path:
# If a scenario with the same config_path already exists, we merge the empty streams.
# scenarios are immutable, so we create a new one.
all_empty_streams = (existing_scenario.empty_streams or []) + (
scenario.empty_streams or []
)
new_scenario = existing_scenario.model_copy(
update={"empty_streams": all_empty_streams}
)
deduped_test_scenarios.remove(existing_scenario)
deduped_test_scenarios.append(new_scenario)
break
else:
# If a scenario does not exist with the config, add the new scenario to the list.
deduped_test_scenarios.append(scenario)

return deduped_test_scenarios

@pytest.mark.skipif(
shutil.which("docker") is None,
Expand Down Expand Up @@ -332,6 +348,12 @@ def test_docker_image_build_and_read(
# If `read_from_streams` is a list, we filter the discovered streams.
streams_list = list(set(streams_list) & set(read_from_streams))

if scenario.empty_streams:
# If there are empty streams, we remove them from the list of streams to read.
streams_list = list(
set(streams_list) - set(stream.name for stream in scenario.empty_streams)
)

configured_catalog: ConfiguredAirbyteCatalog = ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
Expand Down
7 changes: 6 additions & 1 deletion airbyte_cdk/test/standard_tests/source_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ def test_basic_read(
if scenario.expected_outcome.expect_exception() and discover_result.errors:
# Failed as expected; we're done.
return
streams = discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]

if scenario.empty_streams:
# Don't read from any streams marked as empty in the scenario.
streams = list(set(streams) - set(stream.name for stream in scenario.empty_streams))

configured_catalog = ConfiguredAirbyteCatalog(
streams=[
Expand All @@ -128,7 +133,7 @@ def test_basic_read(
sync_mode=SyncMode.full_refresh,
destination_sync_mode=DestinationSyncMode.append_dedup,
)
for stream in discover_result.catalog.catalog.streams # type: ignore [reportOptionalMemberAccess, union-attr]
for stream in streams
]
)
result = run_test_job(
Expand Down
3 changes: 1 addition & 2 deletions airbyte_cdk/utils/connector_paths.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ def _find_in_adjacent_dirs(current_dir: Path) -> Path | None:

def resolve_connector_name_and_directory(
connector_ref: str | Path | None = None,
*,
connector_directory: Path | None = None,
) -> tuple[str, Path]:
"""Resolve the connector name and directory.

Expand All @@ -104,6 +102,7 @@ def resolve_connector_name_and_directory(
FileNotFoundError: If the connector directory does not exist or cannot be found.
"""
connector_name: str | None = None
connector_directory: Path | None = None

# Resolve connector_ref to connector_name or connector_directory (if provided)
if connector_ref:
Expand Down
Loading