Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 24 additions & 14 deletions airbyte/_executors/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@

import logging
import shutil
import subprocess
from contextlib import suppress
from pathlib import Path
from typing import NoReturn

from airbyte import exceptions as exc
from airbyte._executors.base import Executor
Expand All @@ -20,15 +21,16 @@
class DockerExecutor(Executor):
def __init__(
self,
name: str | None = None,
name: str,
image_name_full: str,
*,
executable: list[str],
target_version: str | None = None,
volumes: dict[Path, str] | None = None,
) -> None:
self.executable: list[str] = executable
self.volumes: dict[Path, str] = volumes or {}
name = name or executable[0]
self.image_name_full: str = image_name_full
super().__init__(name=name, target_version=target_version)

def ensure_installation(
Expand All @@ -51,17 +53,25 @@ def ensure_installation(
connector_name=self.name,
) from e

def install(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be installed because it is not managed by PyAirbyte.",
connector_name=self.name,
)

def uninstall(self) -> NoReturn:
raise exc.AirbyteConnectorInstallationError(
message="Connector cannot be uninstalled because it is not managed by PyAirbyte.",
connector_name=self.name,
)
def install(self) -> None:
"""Install the connector.

For docker images, for now this is a no-op. In the future we might
pull the Docker image in this step.
"""
pass

def uninstall(self) -> None:
"""Uninstall the connector.

For docker images, this operation runs an `docker rmi` command to remove the image.

We suppress any errors that occur during the removal process.
"""
with suppress(subprocess.CalledProcessError):
subprocess.check_output(
["docker", "rmi", self.image_name_full],
)

@property
def _cli(self) -> list[str]:
Expand Down
1 change: 1 addition & 0 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 #

return DockerExecutor(
name=name,
image_name_full=docker_image,
executable=docker_cmd,
volumes=volumes,
)
Expand Down
98 changes: 80 additions & 18 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
from airbyte.mcp._util import resolve_config
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source
from airbyte.sources.registry import get_connector_metadata


if TYPE_CHECKING:
from airbyte.caches.duckdb import DuckDBCache
from airbyte.sources.base import Source


CONFIG_HELP = """
Expand All @@ -44,6 +44,44 @@
"""


def _get_mcp_source(
connector_name: str,
override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
) -> Source:
"""Get the MCP source for a connector."""
if override_execution_mode == "auto" and is_docker_installed():
override_execution_mode = "docker"

source: Source
if override_execution_mode == "python":
source = get_source(
connector_name,
use_python=True,
install_if_missing=False,
)
elif override_execution_mode == "docker":
source = get_source(
connector_name,
docker_image=True,
install_if_missing=False,
)
elif override_execution_mode == "yaml":
source = get_source(
connector_name,
source_manifest=True,
install_if_missing=False,
)
else:
raise ValueError(
f"Unknown execution method: {override_execution_mode}. "
"Expected one of: ['auto', 'docker', 'python', 'yaml']."
)

# Ensure installed:
source.executor.ensure_installation()
return source


# @app.tool() # << deferred
def validate_connector_config(
connector_name: Annotated[
Expand All @@ -58,15 +96,19 @@ def validate_connector_config(
str | None,
Field(description="The name of the secret containing the configuration."),
] = None,
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> tuple[bool, str]:
"""Validate a connector configuration.

Returns a tuple of (is_valid: bool, message: str).
"""
try:
source = get_source(
source: Source = _get_mcp_source(
connector_name,
docker_image=is_docker_installed() or False,
override_execution_mode=override_execution_mode,
)
except Exception as ex:
return False, f"Failed to get connector '{connector_name}': {ex}"
Expand Down Expand Up @@ -129,14 +171,18 @@ def list_source_streams(
str | None,
Field(description="The name of the secret containing the configuration."),
] = None,
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> list[str]:
"""List all streams available in a source connector.

This operation (generally) requires a valid configuration, including any required secrets.
"""
source: Source = get_source(
source_connector_name,
docker_image=is_docker_installed() or False,
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -165,11 +211,15 @@ def get_source_stream_json_schema(
str | None,
Field(description="The name of the secret containing the configuration."),
],
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> dict[str, Any]:
"""List all properties for a specific stream in a source connector."""
source: Source = get_source(
source_connector_name,
docker_image=is_docker_installed() or False,
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -203,12 +253,16 @@ def read_source_stream_records(
int,
Field(description="The maximum number of records to read."),
] = 1000,
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> list[dict[str, Any]] | str:
"""Get records from a source connector."""
try:
source = get_source(
source_connector_name,
docker_image=is_docker_installed() or False,
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -259,16 +313,20 @@ def get_stream_previews(
int,
Field(description="The maximum number of sample records to return per stream."),
] = 10,
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> dict[str, list[dict[str, Any]] | str]:
"""Get sample records (previews) from streams in a source connector.

This operation requires a valid configuration, including any required secrets.
Returns a dictionary mapping stream names to lists of sample records, or an error
message string if an error occurred for that stream.
"""
source: Source = get_source(
source_name,
docker_image=is_docker_installed() or False,
source: Source = _get_mcp_source(
connector_name=source_name,
override_execution_mode=override_execution_mode,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -328,11 +386,15 @@ def sync_source_to_cache(
list[str] | str,
Field(description="The streams to sync."),
] = "suggested",
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
) -> str:
"""Run a sync from a source connector to the default DuckDB cache."""
source = get_source(
source_connector_name,
docker_image=is_docker_installed() or False,
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
)
config_dict = resolve_config(
config=config,
Expand Down
Loading