diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index a6ae76f2..de991721 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -933,3 +933,488 @@ def check_connector( "response": json_result, }, ) + + +def validate_yaml_manifest( + manifest: Any, # noqa: ANN401 + *, + raise_on_error: bool = True, +) -> tuple[bool, str | None]: + """Validate a YAML connector manifest structure. + + Performs basic client-side validation before sending to API. + + Args: + manifest: The manifest to validate (should be a dictionary). + raise_on_error: Whether to raise an exception on validation failure. + + Returns: + Tuple of (is_valid, error_message) + """ + if not isinstance(manifest, dict): + error = "Manifest must be a dictionary" + if raise_on_error: + raise PyAirbyteInputError(message=error, context={"manifest": manifest}) + return False, error + + required_fields = ["version", "type"] + missing = [f for f in required_fields if f not in manifest] + if missing: + error = f"Manifest missing required fields: {', '.join(missing)}" + if raise_on_error: + raise PyAirbyteInputError(message=error, context={"manifest": manifest}) + return False, error + + if manifest.get("type") != "DeclarativeSource": + error = f"Manifest type must be 'DeclarativeSource', got '{manifest.get('type')}'" + if raise_on_error: + raise PyAirbyteInputError(message=error, context={"manifest": manifest}) + return False, error + + return True, None + + +def create_custom_yaml_source_definition( + name: str, + *, + workspace_id: str, + manifest: dict[str, Any], + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DeclarativeSourceDefinitionResponse: + """Create a custom YAML source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.CreateDeclarativeSourceDefinitionRequest( + name=name, + manifest=manifest, + ) + request = api.CreateDeclarativeSourceDefinitionRequest( + workspace_id=workspace_id, + create_declarative_source_definition_request=request_body, + ) + response = airbyte_instance.declarative_source_definitions.create_declarative_source_definition( + request + ) + if response.declarative_source_definition_response is None: + raise AirbyteError( + message="Failed to create custom YAML source definition", + context={"name": name, "workspace_id": workspace_id}, + ) + return response.declarative_source_definition_response + + +def list_custom_yaml_source_definitions( + workspace_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> list[models.DeclarativeSourceDefinitionResponse]: + """List all custom YAML source definitions in a workspace.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.ListDeclarativeSourceDefinitionsRequest( + workspace_id=workspace_id, + ) + response = airbyte_instance.declarative_source_definitions.list_declarative_source_definitions( + request + ) + if response.declarative_source_definitions_response is None: + raise AirbyteError( + message="Failed to list custom YAML source definitions", + context={"workspace_id": workspace_id}, + ) + return response.declarative_source_definitions_response.data + + +def get_custom_yaml_source_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DeclarativeSourceDefinitionResponse: + """Get a specific custom YAML source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.GetDeclarativeSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + response = airbyte_instance.declarative_source_definitions.get_declarative_source_definition( + request + ) + if response.declarative_source_definition_response is None: + raise AirbyteError( + message="Failed to get custom YAML source definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.declarative_source_definition_response + + +def update_custom_yaml_source_definition( + workspace_id: str, + definition_id: str, + *, + manifest: dict[str, Any], + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DeclarativeSourceDefinitionResponse: + """Update a custom YAML source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.UpdateDeclarativeSourceDefinitionRequest( + manifest=manifest, + ) + request = api.UpdateDeclarativeSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + update_declarative_source_definition_request=request_body, + ) + response = airbyte_instance.declarative_source_definitions.update_declarative_source_definition( + request + ) + if response.declarative_source_definition_response is None: + raise AirbyteError( + message="Failed to update custom YAML source definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.declarative_source_definition_response + + +def delete_custom_yaml_source_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> None: + """Delete a custom YAML source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.DeleteDeclarativeSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + airbyte_instance.declarative_source_definitions.delete_declarative_source_definition(request) + + +def create_custom_docker_source_definition( + name: str, + docker_repository: str, + docker_image_tag: str, + *, + workspace_id: str, + documentation_url: str | None = None, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Create a custom Docker source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.CreateDefinitionRequest( + name=name, + docker_repository=docker_repository, + docker_image_tag=docker_image_tag, + documentation_url=documentation_url, + ) + request = api.CreateSourceDefinitionRequest( + workspace_id=workspace_id, + create_definition_request=request_body, + ) + response = airbyte_instance.source_definitions.create_source_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to create custom Docker source definition", + context={"name": name, "workspace_id": workspace_id}, + ) + return response.definition_response + + +def list_custom_docker_source_definitions( + workspace_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> list[models.DefinitionResponse]: + """List all custom Docker source definitions in a workspace.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.ListSourceDefinitionsRequest( + workspace_id=workspace_id, + ) + response = airbyte_instance.source_definitions.list_source_definitions(request) + if response.definitions_response is None: + raise AirbyteError( + message="Failed to list custom Docker source definitions", + context={"workspace_id": workspace_id}, + ) + return response.definitions_response.data + + +def get_custom_docker_source_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Get a specific custom Docker source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.GetSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + response = airbyte_instance.source_definitions.get_source_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to get custom Docker source definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.definition_response + + +def update_custom_docker_source_definition( + workspace_id: str, + definition_id: str, + *, + name: str, + docker_image_tag: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Update a custom Docker source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.UpdateDefinitionRequest( + name=name, + docker_image_tag=docker_image_tag, + ) + request = api.UpdateSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + update_definition_request=request_body, + ) + response = airbyte_instance.source_definitions.update_source_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to update custom Docker source definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.definition_response + + +def delete_custom_docker_source_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> None: + """Delete a custom Docker source definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.DeleteSourceDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + airbyte_instance.source_definitions.delete_source_definition(request) + + +def create_custom_docker_destination_definition( + name: str, + docker_repository: str, + docker_image_tag: str, + *, + workspace_id: str, + documentation_url: str | None = None, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Create a custom Docker destination definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.CreateDefinitionRequest( + name=name, + docker_repository=docker_repository, + docker_image_tag=docker_image_tag, + documentation_url=documentation_url, + ) + request = api.CreateDestinationDefinitionRequest( + workspace_id=workspace_id, + create_definition_request=request_body, + ) + response = airbyte_instance.destination_definitions.create_destination_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to create custom Docker destination definition", + context={"name": name, "workspace_id": workspace_id}, + ) + return response.definition_response + + +def list_custom_docker_destination_definitions( + workspace_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> list[models.DefinitionResponse]: + """List all custom Docker destination definitions in a workspace.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.ListDestinationDefinitionsRequest( + workspace_id=workspace_id, + ) + response = airbyte_instance.destination_definitions.list_destination_definitions(request) + if response.definitions_response is None: + raise AirbyteError( + message="Failed to list custom Docker destination definitions", + context={"workspace_id": workspace_id}, + ) + return response.definitions_response.data + + +def get_custom_docker_destination_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Get a specific custom Docker destination definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.GetDestinationDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + response = airbyte_instance.destination_definitions.get_destination_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to get custom Docker destination definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.definition_response + + +def update_custom_docker_destination_definition( + workspace_id: str, + definition_id: str, + *, + name: str, + docker_image_tag: str, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> models.DefinitionResponse: + """Update a custom Docker destination definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request_body = models.UpdateDefinitionRequest( + name=name, + docker_image_tag=docker_image_tag, + ) + request = api.UpdateDestinationDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + update_definition_request=request_body, + ) + response = airbyte_instance.destination_definitions.update_destination_definition(request) + if response.definition_response is None: + raise AirbyteError( + message="Failed to update custom Docker destination definition", + context={"workspace_id": workspace_id, "definition_id": definition_id}, + ) + return response.definition_response + + +def delete_custom_docker_destination_definition( + workspace_id: str, + definition_id: str, + *, + api_root: str, + client_id: SecretString, + client_secret: SecretString, +) -> None: + """Delete a custom Docker destination definition.""" + airbyte_instance = get_airbyte_server_instance( + api_root=api_root, + client_id=client_id, + client_secret=client_secret, + ) + + request = api.DeleteDestinationDefinitionRequest( + workspace_id=workspace_id, + definition_id=definition_id, + ) + airbyte_instance.destination_definitions.delete_destination_definition(request) diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 658ce7a6..6e574aa5 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -41,10 +41,13 @@ import abc from dataclasses import dataclass -from typing import TYPE_CHECKING, ClassVar, Literal +from pathlib import Path +from typing import TYPE_CHECKING, Any, ClassVar, Literal +import yaml from airbyte_api import models as api_models # noqa: TC002 +from airbyte import exceptions as exc from airbyte._util import api_util @@ -253,3 +256,378 @@ def _from_destination_response( ) result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API return result + + +class CloudCustomSourceDefinition: + """A custom source connector definition in Airbyte Cloud. + + This represents either a YAML (declarative) or Docker-based custom source definition. + """ + + def __init__( + self, + workspace: CloudWorkspace, + definition_id: str, + connector_type: Literal["yaml", "docker"], + ) -> None: + """Initialize a custom source definition object.""" + self.workspace = workspace + self.definition_id = definition_id + self.connector_type = connector_type + self._definition_info: ( + api_models.DeclarativeSourceDefinitionResponse | api_models.DefinitionResponse | None + ) = None + + def _fetch_definition_info( + self, + ) -> api_models.DeclarativeSourceDefinitionResponse | api_models.DefinitionResponse: + """Fetch definition info from the API.""" + if self.connector_type == "yaml": + return api_util.get_custom_yaml_source_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return api_util.get_custom_docker_source_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @property + def name(self) -> str: + """Get the display name of the custom connector definition.""" + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.name + + @property + def manifest(self) -> dict[str, Any] | None: + """Get the Low-code CDK manifest. Only present for YAML connectors.""" + if self.connector_type != "yaml": + return None + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.manifest + + @property + def version(self) -> str | None: + """Get the manifest version. Only present for YAML connectors.""" + if self.connector_type != "yaml": + return None + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.version + + @property + def docker_repository(self) -> str | None: + """Get the Docker repository. Only present for Docker connectors.""" + if self.connector_type != "docker": + return None + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.docker_repository + + @property + def docker_image_tag(self) -> str | None: + """Get the Docker image tag. Only present for Docker connectors.""" + if self.connector_type != "docker": + return None + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.docker_image_tag + + @property + def documentation_url(self) -> str | None: + """Get the documentation URL. Only present for Docker connectors.""" + if self.connector_type != "docker": + return None + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.documentation_url + + @property + def definition_url(self) -> str: + """Get the web URL of the custom source definition.""" + return ( + f"{self.workspace.workspace_url}/settings/custom-connectors/" + f"sources/{self.definition_id}" + ) + + def permanently_delete(self) -> None: + """Permanently delete this custom source definition.""" + self.workspace.permanently_delete_custom_source_definition( + self.definition_id, + custom_connector_type=self.connector_type, + ) + + def update_definition( + self, + *, + manifest_yaml: dict[str, Any] | Path | str | None = None, + docker_tag: str | None = None, + pre_validate: bool = True, + ) -> CloudCustomSourceDefinition: + """Update this custom source definition. + + You must specify EXACTLY ONE of manifest_yaml (for YAML connectors) OR + docker_tag (for Docker connectors), but not both. + + For YAML connectors: updates the manifest + For Docker connectors: updates the image tag (name remains unchanged - use + rename() to change the name) + + Args: + manifest_yaml: New manifest (YAML connectors only) + docker_tag: New Docker tag (Docker connectors only) + pre_validate: Whether to validate manifest (YAML only) + + Returns: + Updated CloudCustomSourceDefinition object + + Raises: + PyAirbyteInputError: If both or neither parameters are provided + """ + is_yaml = manifest_yaml is not None + is_docker = docker_tag is not None + + if is_yaml == is_docker: + raise exc.PyAirbyteInputError( + message=( + "Must specify EXACTLY ONE of manifest_yaml (for YAML) OR " + "docker_tag (for Docker), but not both" + ), + context={ + "manifest_yaml_provided": is_yaml, + "docker_tag_provided": is_docker, + }, + ) + + if is_yaml: + manifest_dict: dict[str, Any] + if isinstance(manifest_yaml, Path): + manifest_dict = yaml.safe_load(manifest_yaml.read_text()) + elif isinstance(manifest_yaml, str): + manifest_dict = yaml.safe_load(manifest_yaml) + else: + manifest_dict = manifest_yaml # type: ignore[assignment] + + if pre_validate: + api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) + + result = api_util.update_custom_yaml_source_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + manifest=manifest_dict, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudCustomSourceDefinition._from_yaml_response(self.workspace, result) + + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + + result = api_util.update_custom_docker_source_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + name=self._definition_info.name, + docker_image_tag=docker_tag, # type: ignore[arg-type] + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudCustomSourceDefinition._from_docker_response(self.workspace, result) + + def rename( + self, + new_name: str, + ) -> CloudCustomSourceDefinition: + """Rename this custom source definition. + + Note: Only Docker custom sources can be renamed. YAML custom sources + cannot be renamed as their names are derived from the manifest. + + Args: + new_name: New display name for the connector + + Returns: + Updated CloudCustomSourceDefinition object + + Raises: + PyAirbyteInputError: If attempting to rename a YAML connector + """ + if self.connector_type == "yaml": + raise exc.PyAirbyteInputError( + message="Cannot rename YAML custom source definitions", + context={"definition_id": self.definition_id}, + ) + + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + + result = api_util.update_custom_docker_source_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + name=new_name, + docker_image_tag=self._definition_info.docker_image_tag, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudCustomSourceDefinition._from_docker_response(self.workspace, result) + + def __repr__(self) -> str: + """String representation.""" + return ( + f"CloudCustomSourceDefinition(definition_id={self.definition_id}, " + f"name={self.name}, connector_type={self.connector_type})" + ) + + @classmethod + def _from_yaml_response( + cls, + workspace: CloudWorkspace, + response: api_models.DeclarativeSourceDefinitionResponse, + ) -> CloudCustomSourceDefinition: + """Internal factory method for YAML connectors.""" + result = cls( + workspace=workspace, + definition_id=response.id, + connector_type="yaml", + ) + result._definition_info = response # noqa: SLF001 + return result + + @classmethod + def _from_docker_response( + cls, + workspace: CloudWorkspace, + response: api_models.DefinitionResponse, + ) -> CloudCustomSourceDefinition: + """Internal factory method for Docker connectors.""" + result = cls( + workspace=workspace, + definition_id=response.id, + connector_type="docker", + ) + result._definition_info = response # noqa: SLF001 + return result + + +class CloudCustomDestinationDefinition: + """A custom destination connector definition in Airbyte Cloud. + + Currently only supports Docker-based custom destinations. + """ + + def __init__( + self, + workspace: CloudWorkspace, + definition_id: str, + ) -> None: + """Initialize a custom destination definition object.""" + self.workspace = workspace + self.definition_id = definition_id + self._definition_info: api_models.DefinitionResponse | None = None + + def _fetch_definition_info(self) -> api_models.DefinitionResponse: + """Fetch definition info from the API.""" + return api_util.get_custom_docker_destination_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @property + def name(self) -> str: + """Get the display name of the custom connector definition.""" + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.name + + @property + def docker_repository(self) -> str: + """Get the Docker repository.""" + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.docker_repository + + @property + def docker_image_tag(self) -> str: + """Get the Docker image tag.""" + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.docker_image_tag + + @property + def documentation_url(self) -> str | None: + """Get the documentation URL.""" + if not self._definition_info: + self._definition_info = self._fetch_definition_info() + return self._definition_info.documentation_url + + @property + def definition_url(self) -> str: + """Get the web URL of the custom destination definition.""" + return ( + f"{self.workspace.workspace_url}/settings/custom-connectors/" + f"destinations/{self.definition_id}" + ) + + def permanently_delete(self) -> None: + """Permanently delete this custom destination definition.""" + self.workspace.permanently_delete_custom_destination_definition(self.definition_id) + + def update_definition( + self, + *, + name: str, + docker_tag: str, + ) -> CloudCustomDestinationDefinition: + """Update this custom destination definition. + + Args: + name: New display name + docker_tag: New Docker tag + + Returns: + Updated CloudCustomDestinationDefinition object + """ + result = api_util.update_custom_docker_destination_definition( + workspace_id=self.workspace.workspace_id, + definition_id=self.definition_id, + name=name, + docker_image_tag=docker_tag, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + return CloudCustomDestinationDefinition._from_docker_response(self.workspace, result) + + def __repr__(self) -> str: + """String representation.""" + return ( + f"CloudCustomDestinationDefinition(definition_id={self.definition_id}, " + f"name={self.name}, docker_repository={self.docker_repository})" + ) + + @classmethod + def _from_docker_response( + cls, + workspace: CloudWorkspace, + response: api_models.DefinitionResponse, + ) -> CloudCustomDestinationDefinition: + """Internal factory method.""" + result = cls( + workspace=workspace, + definition_id=response.id, + ) + result._definition_info = response # noqa: SLF001 + return result diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 621b3d63..42ca8dff 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -36,13 +36,21 @@ from __future__ import annotations from dataclasses import dataclass -from typing import TYPE_CHECKING, Any +from pathlib import Path +from typing import TYPE_CHECKING, Any, Literal + +import yaml from airbyte import exceptions as exc from airbyte._util import api_util, text_util from airbyte._util.api_util import get_web_url_root from airbyte.cloud.connections import CloudConnection -from airbyte.cloud.connectors import CloudDestination, CloudSource +from airbyte.cloud.connectors import ( + CloudCustomDestinationDefinition, + CloudCustomSourceDefinition, + CloudDestination, + CloudSource, +) from airbyte.destinations.base import Destination from airbyte.secrets.base import SecretString @@ -53,7 +61,7 @@ from airbyte.sources.base import Source -@dataclass +@dataclass # noqa: PLR0904 class CloudWorkspace: """A remote workspace on the Airbyte Cloud. @@ -450,3 +458,318 @@ def list_destinations( for destination in destinations if name is None or destination.name == name ] + + def publish_custom_source_definition( + self, + name: str, + *, + manifest_yaml: dict[str, Any] | Path | str | None = None, + docker_image: str | None = None, + docker_tag: str | None = None, + documentation_url: str | None = None, + unique: bool = True, + pre_validate: bool = True, + ) -> CloudCustomSourceDefinition: + """Publish a custom source connector definition. + + You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image + and docker_tag (for Docker connectors), but not both. + + Args: + name: Display name for the connector definition + manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string) + docker_image: Docker repository (e.g., 'airbyte/source-custom') + docker_tag: Docker image tag (e.g., '1.0.0') + documentation_url: Optional URL to connector documentation (Docker only) + unique: Whether to enforce name uniqueness + pre_validate: Whether to validate manifest client-side (YAML only) + + Returns: + CloudCustomSourceDefinition object representing the created definition + + Raises: + PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided + AirbyteDuplicateResourcesError: If unique=True and name already exists + """ + is_yaml = manifest_yaml is not None + is_docker = docker_image is not None + + if is_yaml == is_docker: + raise exc.PyAirbyteInputError( + message=( + "Must specify EITHER manifest_yaml (for YAML connectors) OR " + "docker_image + docker_tag (for Docker connectors), but not both" + ), + context={ + "manifest_yaml_provided": is_yaml, + "docker_image_provided": is_docker, + }, + ) + + if is_docker and docker_tag is None: + raise exc.PyAirbyteInputError( + message="docker_tag is required when docker_image is specified", + context={"docker_image": docker_image}, + ) + + if unique: + existing = self.list_custom_source_definitions( + name=name, + custom_connector_type="yaml" if is_yaml else "docker", + ) + if existing: + raise exc.AirbyteDuplicateResourcesError( + resource_type="custom_source_definition", + resource_name=name, + ) + + if is_yaml: + manifest_dict: dict[str, Any] + if isinstance(manifest_yaml, Path): + manifest_dict = yaml.safe_load(manifest_yaml.read_text()) + elif isinstance(manifest_yaml, str): + manifest_dict = yaml.safe_load(manifest_yaml) + elif manifest_yaml is not None: + manifest_dict = manifest_yaml + else: + raise exc.PyAirbyteInputError( + message="manifest_yaml is required for YAML connectors", + context={"name": name}, + ) + + if pre_validate: + api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True) + + result = api_util.create_custom_yaml_source_definition( + name=name, + workspace_id=self.workspace_id, + manifest=manifest_dict, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 + result = api_util.create_custom_docker_source_definition( + name=name, + docker_repository=docker_image, # type: ignore[arg-type] + docker_image_tag=docker_tag, # type: ignore[arg-type] + workspace_id=self.workspace_id, + documentation_url=documentation_url, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomSourceDefinition._from_docker_response(self, result) # noqa: SLF001 + + def list_custom_source_definitions( + self, + *, + name: str | None = None, + custom_connector_type: Literal["yaml", "docker"], + ) -> list[CloudCustomSourceDefinition]: + """List custom source connector definitions. + + Args: + name: Filter by exact name match + custom_connector_type: Connector type to list ("yaml" or "docker"). Required. + + Returns: + List of CloudCustomSourceDefinition objects matching the specified type + """ + result: list[CloudCustomSourceDefinition] = [] + + if custom_connector_type == "yaml": + yaml_definitions = api_util.list_custom_yaml_source_definitions( + workspace_id=self.workspace_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + result.extend( + CloudCustomSourceDefinition._from_yaml_response(self, d) # noqa: SLF001 + for d in yaml_definitions + if name is None or d.name == name + ) + elif custom_connector_type == "docker": + docker_definitions = api_util.list_custom_docker_source_definitions( + workspace_id=self.workspace_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + result.extend( + CloudCustomSourceDefinition._from_docker_response(self, d) # noqa: SLF001 + for d in docker_definitions + if name is None or d.name == name + ) + + return result + + def get_custom_source_definition( + self, + definition_id: str, + *, + custom_connector_type: Literal["yaml", "docker"], + ) -> CloudCustomSourceDefinition: + """Get a specific custom source definition by ID. + + Args: + definition_id: The definition ID + custom_connector_type: Connector type ("yaml" or "docker"). Required. + + Returns: + CloudCustomSourceDefinition object + """ + if custom_connector_type == "yaml": + result = api_util.get_custom_yaml_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomSourceDefinition._from_yaml_response(self, result) # noqa: SLF001 + result = api_util.get_custom_docker_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomSourceDefinition._from_docker_response(self, result) # noqa: SLF001 + + def permanently_delete_custom_source_definition( + self, + definition_id: str, + *, + custom_connector_type: Literal["yaml", "docker"], + ) -> None: + """Permanently delete a custom source definition. + + Args: + definition_id: The definition ID to delete + custom_connector_type: Connector type ("yaml" or "docker"). Required. + """ + if custom_connector_type == "yaml": + api_util.delete_custom_yaml_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + else: + api_util.delete_custom_docker_source_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + + def publish_custom_destination_definition( + self, + name: str, + *, + docker_image: str, + docker_tag: str, + documentation_url: str | None = None, + unique: bool = True, + ) -> CloudCustomDestinationDefinition: + """Publish a custom destination connector definition. + + Currently only Docker-based destinations are supported. + + Args: + name: Display name for the connector definition + docker_image: Docker repository (e.g., 'airbyte/destination-custom') + docker_tag: Docker image tag (e.g., '1.0.0') + documentation_url: Optional URL to connector documentation + unique: Whether to enforce name uniqueness + + Returns: + CloudCustomDestinationDefinition object representing the created definition + """ + if unique: + existing = self.list_custom_destination_definitions(name=name) + if existing: + raise exc.AirbyteDuplicateResourcesError( + resource_type="custom_destination_definition", + resource_name=name, + ) + + result = api_util.create_custom_docker_destination_definition( + name=name, + docker_repository=docker_image, + docker_image_tag=docker_tag, + workspace_id=self.workspace_id, + documentation_url=documentation_url, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomDestinationDefinition._from_docker_response(self, result) # noqa: SLF001 + + def list_custom_destination_definitions( + self, + *, + name: str | None = None, + ) -> list[CloudCustomDestinationDefinition]: + """List custom destination connector definitions. + + Args: + name: Filter by exact name match + + Returns: + List of CloudCustomDestinationDefinition objects + """ + definitions = api_util.list_custom_docker_destination_definitions( + workspace_id=self.workspace_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + + return [ + CloudCustomDestinationDefinition._from_docker_response(self, d) # noqa: SLF001 + for d in definitions + if name is None or d.name == name + ] + + def get_custom_destination_definition( + self, + definition_id: str, + ) -> CloudCustomDestinationDefinition: + """Get a specific custom destination definition by ID. + + Args: + definition_id: The definition ID + + Returns: + CloudCustomDestinationDefinition object + """ + result = api_util.get_custom_docker_destination_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) + return CloudCustomDestinationDefinition._from_docker_response(self, result) # noqa: SLF001 + + def permanently_delete_custom_destination_definition( + self, + definition_id: str, + ) -> None: + """Permanently delete a custom destination definition. + + Args: + definition_id: The definition ID to delete + """ + api_util.delete_custom_docker_destination_definition( + workspace_id=self.workspace_id, + definition_id=definition_id, + api_root=self.api_root, + client_id=self.client_id, + client_secret=self.client_secret, + ) diff --git a/airbyte/mcp/cloud_ops.py b/airbyte/mcp/cloud_ops.py index ecb43d4b..f194ebb5 100644 --- a/airbyte/mcp/cloud_ops.py +++ b/airbyte/mcp/cloud_ops.py @@ -1,6 +1,7 @@ # Copyright (c) 2024 Airbyte, Inc., all rights reserved. """Airbyte Cloud MCP operations.""" +from pathlib import Path from typing import Annotated, Any from fastmcp import FastMCP @@ -501,6 +502,333 @@ def list_deployed_cloud_connections() -> list[CloudConnection]: return workspace.list_connections() +def publish_custom_source_definition( + name: Annotated[ + str, + Field(description="The name for the custom connector definition."), + ], + *, + manifest_yaml: Annotated[ + dict | str | None, + Field( + description=( + "The Low-code CDK manifest as a dict or YAML string. " + "Required for YAML connectors. Mutually exclusive with docker_image." + ), + default=None, + ), + ] = None, + docker_image: Annotated[ + str | None, + Field( + description=( + "Docker repository (e.g., 'airbyte/source-custom'). " + "Required for Docker connectors." + ), + default=None, + ), + ] = None, + docker_tag: Annotated[ + str | None, + Field( + description=( + "Docker image tag (e.g., '1.0.0'). " "Required when docker_image is specified." + ), + default=None, + ), + ] = None, + documentation_url: Annotated[ + str | None, + Field( + description="Optional URL to connector documentation (Docker only).", + default=None, + ), + ] = None, + unique: Annotated[ + bool, + Field( + description="Whether to require a unique name.", + default=True, + ), + ] = True, + pre_validate: Annotated[ + bool, + Field( + description=( + "Whether to validate the manifest client-side " "before publishing (YAML only)." + ), + default=True, + ), + ] = True, +) -> str: + """Publish a custom source connector definition to Airbyte Cloud. + + Supports both YAML (declarative) and Docker-based custom source definitions. + You must specify EITHER manifest_yaml OR docker_image + docker_tag, but not both. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + result = workspace.publish_custom_source_definition( + name=name, + manifest_yaml=manifest_yaml, + docker_image=docker_image, + docker_tag=docker_tag, + documentation_url=documentation_url, + unique=unique, + pre_validate=pre_validate, + ) + except Exception as ex: + return f"Failed to publish custom source definition '{name}': {ex}" + else: + if result.connector_type == "yaml": + return ( + f"Successfully published custom YAML source definition '{name}' " + f"with ID '{result.definition_id}' (version {result.version or 'N/A'})" + ) + return ( + f"Successfully published custom Docker source definition '{name}' " + f"with ID '{result.definition_id}' " + f"({result.docker_repository}:{result.docker_image_tag})" + ) + + +def list_custom_source_definitions( + custom_connector_type: Annotated[ + str, + Field( + description="Connector type to list: 'yaml' or 'docker'. Required.", + ), + ], +) -> list[dict[str, Any]]: + """List custom source definitions in the Airbyte Cloud workspace. + + You must specify the connector type to list - either 'yaml' or 'docker'. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + definitions = workspace.list_custom_source_definitions( + custom_connector_type=custom_connector_type, # type: ignore[arg-type] + ) + + return [ + { + "definition_id": d.definition_id, + "name": d.name, + "connector_type": d.connector_type, + "manifest": d.manifest, + "version": d.version, + "docker_repository": d.docker_repository, + "docker_image_tag": d.docker_image_tag, + "documentation_url": d.documentation_url, + } + for d in definitions + ] + + +def update_custom_source_definition( + definition_id: Annotated[ + str, + Field(description="The ID of the definition to update."), + ], + *, + manifest_yaml: Annotated[ + str | Path | None, + Field( + description="New manifest as YAML string or file path (YAML connectors only).", + default=None, + ), + ] = None, + docker_tag: Annotated[ + str | None, + Field( + description="New Docker image tag (Docker connectors only).", + default=None, + ), + ] = None, + custom_connector_type: Annotated[ + str, + Field( + description="Connector type: 'yaml' or 'docker'. Required.", + ), + ], + pre_validate: Annotated[ + bool, + Field( + description="Whether to validate the manifest client-side before updating (YAML only).", + default=True, + ), + ] = True, +) -> str: + """Update a custom source definition in Airbyte Cloud. + + You must specify EXACTLY ONE of manifest_yaml or docker_tag, but not both. + For YAML connectors: specify manifest_yaml + For Docker connectors: specify docker_tag (updates tag only, not name) + To rename a Docker connector, use rename_custom_source_definition instead. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + definition = workspace.get_custom_source_definition( + definition_id=definition_id, + custom_connector_type=custom_connector_type, # type: ignore[arg-type] + ) + result = definition.update_definition( + manifest_yaml=manifest_yaml, + docker_tag=docker_tag, + pre_validate=pre_validate, + ) + except Exception as ex: + return f"Failed to update custom source definition '{definition_id}': {ex}" + else: + if result.connector_type == "yaml": + return ( + f"Successfully updated custom YAML source definition. " + f"Name: {result.name}, version: {result.version or 'N/A'}" + ) + return ( + f"Successfully updated custom Docker source definition. " + f"Name: {result.name}, tag: {result.docker_image_tag}" + ) + + +def rename_custom_source_definition( + definition_id: Annotated[ + str, + Field(description="The ID of the definition to rename."), + ], + new_name: Annotated[ + str, + Field(description="New display name for the connector."), + ], + custom_connector_type: Annotated[ + str, + Field( + description=( + "Connector type: 'yaml' or 'docker'. " "Only Docker connectors can be renamed." + ), + ), + ], +) -> str: + """Rename a custom source definition in Airbyte Cloud. + + Note: Only Docker custom sources can be renamed. YAML custom sources + cannot be renamed as their names are derived from the manifest. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + definition = workspace.get_custom_source_definition( + definition_id=definition_id, + custom_connector_type=custom_connector_type, # type: ignore[arg-type] + ) + result = definition.rename(new_name=new_name) + except Exception as ex: + return f"Failed to rename custom source definition '{definition_id}': {ex}" + else: + return ( + f"Successfully renamed custom Docker source definition to '{result.name}' " + f"(ID: {result.definition_id})" + ) + + +def publish_custom_destination_definition( + name: Annotated[ + str, + Field(description="The name for the custom connector definition."), + ], + docker_image: Annotated[ + str, + Field(description="Docker repository (e.g., 'airbyte/destination-custom')."), + ], + docker_tag: Annotated[ + str, + Field(description="Docker image tag (e.g., '1.0.0')."), + ], + *, + documentation_url: Annotated[ + str | None, + Field( + description="Optional URL to connector documentation.", + default=None, + ), + ] = None, + unique: Annotated[ + bool, + Field( + description="Whether to require a unique name.", + default=True, + ), + ] = True, +) -> str: + """Publish a custom destination connector definition to Airbyte Cloud. + + Currently only Docker-based custom destinations are supported. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + result = workspace.publish_custom_destination_definition( + name=name, + docker_image=docker_image, + docker_tag=docker_tag, + documentation_url=documentation_url, + unique=unique, + ) + except Exception as ex: + return f"Failed to publish custom destination definition '{name}': {ex}" + else: + return ( + f"Successfully published custom Docker destination definition '{name}' " + f"with ID '{result.definition_id}' " + f"({result.docker_repository}:{result.docker_image_tag})" + ) + + +def list_custom_destination_definitions() -> list[dict[str, Any]]: + """List all custom destination definitions in the Airbyte Cloud workspace.""" + workspace: CloudWorkspace = _get_cloud_workspace() + definitions = workspace.list_custom_destination_definitions() + + return [ + { + "definition_id": d.definition_id, + "name": d.name, + "docker_repository": d.docker_repository, + "docker_image_tag": d.docker_image_tag, + "documentation_url": d.documentation_url, + } + for d in definitions + ] + + +def update_custom_destination_definition( + definition_id: Annotated[ + str, + Field(description="The ID of the definition to update."), + ], + name: Annotated[ + str, + Field(description="New name for the definition."), + ], + docker_tag: Annotated[ + str, + Field(description="New Docker image tag."), + ], +) -> str: + """Update a custom destination definition in Airbyte Cloud.""" + try: + workspace: CloudWorkspace = _get_cloud_workspace() + definition = workspace.get_custom_destination_definition(definition_id=definition_id) + result = definition.update_definition( + name=name, + docker_tag=docker_tag, + ) + except Exception as ex: + return f"Failed to update custom destination definition '{definition_id}': {ex}" + else: + return ( + f"Successfully updated custom Docker destination definition. " + f"Name: {result.name}, tag: {result.docker_image_tag}" + ) + + def register_cloud_ops_tools(app: FastMCP) -> None: """@private Register tools with the FastMCP app. @@ -517,3 +845,10 @@ def register_cloud_ops_tools(app: FastMCP) -> None: app.tool(list_deployed_cloud_source_connectors) app.tool(list_deployed_cloud_destination_connectors) app.tool(list_deployed_cloud_connections) + app.tool(publish_custom_source_definition) + app.tool(list_custom_source_definitions) + app.tool(update_custom_source_definition) + app.tool(rename_custom_source_definition) + app.tool(publish_custom_destination_definition) + app.tool(list_custom_destination_definitions) + app.tool(update_custom_destination_definition) diff --git a/tests/integration_tests/cloud/test_custom_definitions.py b/tests/integration_tests/cloud/test_custom_definitions.py new file mode 100644 index 00000000..d2f4092a --- /dev/null +++ b/tests/integration_tests/cloud/test_custom_definitions.py @@ -0,0 +1,221 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Tests for custom connector definition publishing.""" + +import pytest + +from airbyte.cloud.workspaces import CloudWorkspace + + +TEST_YAML_MANIFEST = { + "version": "0.1.0", + "type": "DeclarativeSource", + "check": { + "type": "CheckStream", + "stream_names": ["test_stream"], + }, + "definitions": { + "base_requester": { + "type": "HttpRequester", + "url_base": "https://httpbin.org", + }, + }, + "streams": [ + { + "type": "DeclarativeStream", + "name": "test_stream", + "primary_key": ["id"], + "retriever": { + "type": "SimpleRetriever", + "requester": {"$ref": "#/definitions/base_requester", "path": "/get"}, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + }, + } + ], + "spec": { + "type": "Spec", + "connection_specification": { + "type": "object", + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": {}, + }, + }, +} + + +@pytest.mark.requires_creds +def test_publish_custom_yaml_source( + cloud_workspace: CloudWorkspace, +) -> None: + """Test publishing a custom YAML source definition.""" + from airbyte._util import text_util + + name = f"test-yaml-source-{text_util.generate_random_suffix()}" + + result = cloud_workspace.publish_custom_source_definition( + name=name, + manifest_yaml=TEST_YAML_MANIFEST, + unique=True, + pre_validate=True, + ) + + assert result.definition_id + assert result.name == name + assert result.manifest is not None + assert result.version is not None + assert result.connector_type == "yaml" + + definition_id = result.definition_id + + try: + definitions = cloud_workspace.list_custom_source_definitions( + name=name, + custom_connector_type="yaml", + ) + assert len(definitions) == 1 + assert definitions[0].definition_id == definition_id + + fetched = cloud_workspace.get_custom_source_definition( + definition_id, + custom_connector_type="yaml", + ) + assert fetched.definition_id == definition_id + assert fetched.name == name + assert fetched.connector_type == "yaml" + + updated_manifest = TEST_YAML_MANIFEST.copy() + updated_manifest["version"] = "0.2.0" + updated = fetched.update_definition( + manifest_yaml=updated_manifest, + ) + assert updated.manifest["version"] == "0.2.0" + + finally: + cloud_workspace.permanently_delete_custom_source_definition( + definition_id, + custom_connector_type="yaml", + ) + + +@pytest.mark.skip( + reason="Docker custom definitions appear blocked in Airbyte Cloud - pending confirmation" +) +@pytest.mark.requires_creds +def test_publish_custom_docker_source( + cloud_workspace: CloudWorkspace, +) -> None: + """Test publishing a custom Docker source definition.""" + from airbyte._util import text_util + + name = f"test-docker-source-{text_util.generate_random_suffix()}" + + result = cloud_workspace.publish_custom_source_definition( + name=name, + docker_image="airbyte/test-source", + docker_tag="1.0.0", + documentation_url="https://example.com/docs", + unique=True, + ) + + assert result.definition_id + assert result.name == name + assert result.docker_repository == "airbyte/test-source" + assert result.docker_image_tag == "1.0.0" + assert result.connector_type == "docker" + + definition_id = result.definition_id + + try: + definitions = cloud_workspace.list_custom_source_definitions( + name=name, + custom_connector_type="docker", + ) + assert len(definitions) == 1 + assert definitions[0].definition_id == definition_id + + fetched = cloud_workspace.get_custom_source_definition( + definition_id, + custom_connector_type="docker", + ) + assert fetched.definition_id == definition_id + assert fetched.name == name + assert fetched.connector_type == "docker" + + updated = fetched.update_definition( + docker_tag="2.0.0", + ) + assert updated.docker_image_tag == "2.0.0" + + finally: + cloud_workspace.permanently_delete_custom_source_definition( + definition_id, + custom_connector_type="docker", + ) + + +@pytest.mark.skip( + reason="Docker custom definitions appear blocked in Airbyte Cloud - pending confirmation" +) +@pytest.mark.requires_creds +def test_publish_custom_docker_destination( + cloud_workspace: CloudWorkspace, +) -> None: + """Test publishing a custom Docker destination definition.""" + from airbyte._util import text_util + + name = f"test-docker-dest-{text_util.generate_random_suffix()}" + + result = cloud_workspace.publish_custom_destination_definition( + name=name, + docker_image="airbyte/test-destination", + docker_tag="1.0.0", + unique=True, + ) + + assert result.definition_id + assert result.name == name + assert result.docker_repository == "airbyte/test-destination" + assert result.docker_image_tag == "1.0.0" + + definition_id = result.definition_id + + try: + definitions = cloud_workspace.list_custom_destination_definitions(name=name) + assert len(definitions) == 1 + assert definitions[0].definition_id == definition_id + + fetched = cloud_workspace.get_custom_destination_definition(definition_id) + assert fetched.definition_id == definition_id + assert fetched.name == name + + updated = fetched.update_definition( + name=name, + docker_tag="2.0.0", + ) + assert updated.docker_image_tag == "2.0.0" + + finally: + cloud_workspace.permanently_delete_custom_destination_definition(definition_id) + + +@pytest.mark.requires_creds +def test_yaml_validation_error( + cloud_workspace: CloudWorkspace, +) -> None: + """Test that validation catches invalid manifests.""" + from airbyte._util import text_util + from airbyte.exceptions import PyAirbyteInputError + + name = f"test-invalid-{text_util.generate_random_suffix()}" + invalid_manifest = {"version": "0.1.0"} + + with pytest.raises(PyAirbyteInputError) as exc_info: + cloud_workspace.publish_custom_source_definition( + name=name, + manifest_yaml=invalid_manifest, + pre_validate=True, + ) + + assert "type" in str(exc_info.value).lower()