From 93da9b4acaa9a1cf65c900c50945496efa8a2b3a Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 26 Aug 2025 13:55:16 -0700 Subject: [PATCH 1/5] feat(mcp): add support for overriding execution mode (yaml, python, docker, auto) --- airbyte/_executors/docker.py | 38 ++++++++------ airbyte/_executors/util.py | 1 + airbyte/mcp/_local_ops.py | 96 +++++++++++++++++++++++++++++------- 3 files changed, 104 insertions(+), 31 deletions(-) diff --git a/airbyte/_executors/docker.py b/airbyte/_executors/docker.py index c395eaf3..a364e0de 100644 --- a/airbyte/_executors/docker.py +++ b/airbyte/_executors/docker.py @@ -3,8 +3,9 @@ import logging import shutil +import subprocess +from contextlib import suppress from pathlib import Path -from typing import NoReturn from airbyte import exceptions as exc from airbyte._executors.base import Executor @@ -20,7 +21,8 @@ class DockerExecutor(Executor): def __init__( self, - name: str | None = None, + name: str, + image_name_full: str, *, executable: list[str], target_version: str | None = None, @@ -28,7 +30,7 @@ def __init__( ) -> None: self.executable: list[str] = executable self.volumes: dict[Path, str] = volumes or {} - name = name or executable[0] + self.image_name_full: str = image_name_full super().__init__(name=name, target_version=target_version) def ensure_installation( @@ -51,17 +53,25 @@ def ensure_installation( connector_name=self.name, ) from e - def install(self) -> NoReturn: - raise exc.AirbyteConnectorInstallationError( - message="Connector cannot be installed because it is not managed by PyAirbyte.", - connector_name=self.name, - ) - - def uninstall(self) -> NoReturn: - raise exc.AirbyteConnectorInstallationError( - message="Connector cannot be uninstalled because it is not managed by PyAirbyte.", - connector_name=self.name, - ) + def install(self) -> None: + """Install the connector. + + For docker images, for now this is a no-op. In the future we might + pull the Docker image in this step. 
+ """ + pass + + def uninstall(self) -> None: + """Uninstall the connector. + + For docker images, this operation runs a `docker rmi` command to remove the image. + + We suppress any errors that occur during the removal process. + """ + with suppress(subprocess.CalledProcessError): + subprocess.check_output( + ["docker", "rmi", self.image_name_full], + ) @property def _cli(self) -> list[str]: diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index e33a9c05..5c4d2d67 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -305,6 +305,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # return DockerExecutor( name=name, + image_name_full=docker_image, executable=docker_cmd, volumes=volumes, ) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index 8208f3d4..5446ad05 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -16,6 +16,7 @@ from airbyte.mcp._util import resolve_config from airbyte.secrets.config import _get_secret_sources from airbyte.secrets.google_gsm import GoogleGSMSecretManager +from airbyte.sources.base import Source from airbyte.sources.registry import get_connector_metadata @@ -44,6 +45,43 @@ """ +def _get_mcp_source( + connector_name: str, + override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto", +) -> Source: + """Get the MCP source for a connector.""" + if override_execution_mode == "auto" and is_docker_installed(): + override_execution_mode = "docker" + + if override_execution_mode == "python": + source: Source = get_source( + connector_name, + use_python=True, + install_if_missing=False, + ) + elif override_execution_mode == "docker": + source: Source = get_source( + connector_name, + docker_image=True, + install_if_missing=False, + ) + elif override_execution_mode == "yaml": + source = get_source( + connector_name, + source_manifest=True, + install_if_missing=False, + ) + else: + raise ValueError( + f"Unknown 
execution method: {override_execution_mode}. " + "Expected one of: ['auto', 'docker', 'python', 'yaml']." + ) + + # Install if needed: + source.install() + return source + + # @app.tool() # << deferred def validate_connector_config( connector_name: Annotated[ @@ -58,15 +96,19 @@ def validate_connector_config( str | None, Field(description="The name of the secret containing the configuration."), ] = None, + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> tuple[bool, str]: """Validate a connector configuration. Returns a tuple of (is_valid: bool, message: str). """ try: - source = get_source( + source: Source = _get_mcp_source( connector_name, - docker_image=is_docker_installed() or False, + override_execution_mode=override_execution_mode, ) except Exception as ex: return False, f"Failed to get connector '{connector_name}': {ex}" @@ -129,14 +171,18 @@ def list_source_streams( str | None, Field(description="The name of the secret containing the configuration."), ] = None, + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> list[str]: """List all streams available in a source connector. This operation (generally) requires a valid configuration, including any required secrets. 
""" - source: Source = get_source( - source_connector_name, - docker_image=is_docker_installed() or False, + source: Source = _get_mcp_source( + connector_name=source_connector_name, + override_execution_mode=override_execution_mode, ) config_dict = resolve_config( config=config, @@ -165,11 +211,15 @@ def get_source_stream_json_schema( str | None, Field(description="The name of the secret containing the configuration."), ], + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> dict[str, Any]: """List all properties for a specific stream in a source connector.""" - source: Source = get_source( - source_connector_name, - docker_image=is_docker_installed() or False, + source: Source = _get_mcp_source( + connector_name=source_connector_name, + override_execution_mode=override_execution_mode, ) config_dict = resolve_config( config=config, @@ -203,12 +253,16 @@ def read_source_stream_records( int, Field(description="The maximum number of records to read."), ] = 1000, + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> list[dict[str, Any]] | str: """Get records from a source connector.""" try: - source = get_source( - source_connector_name, - docker_image=is_docker_installed() or False, + source: Source = _get_mcp_source( + connector_name=source_connector_name, + override_execution_mode=override_execution_mode, ) config_dict = resolve_config( config=config, @@ -259,6 +313,10 @@ def get_stream_previews( int, Field(description="The maximum number of sample records to return per stream."), ] = 10, + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> 
dict[str, list[dict[str, Any]] | str]: """Get sample records (previews) from streams in a source connector. @@ -266,9 +324,9 @@ def get_stream_previews( Returns a dictionary mapping stream names to lists of sample records, or an error message string if an error occurred for that stream. """ - source: Source = get_source( - source_name, - docker_image=is_docker_installed() or False, + source: Source = _get_mcp_source( + connector_name=source_name, + override_execution_mode=override_execution_mode, ) config_dict = resolve_config( config=config, @@ -328,11 +386,15 @@ def sync_source_to_cache( list[str] | str, Field(description="The streams to sync."), ] = "suggested", + override_execution_mode: Annotated[ + Literal["docker", "python", "yaml", "auto"], + Field(description="Optionally override the execution method to use for the connector."), + ] = "auto", ) -> str: """Run a sync from a source connector to the default DuckDB cache.""" - source = get_source( - source_connector_name, - docker_image=is_docker_installed() or False, + source: Source = _get_mcp_source( + connector_name=source_connector_name, + override_execution_mode=override_execution_mode, ) config_dict = resolve_config( config=config, From b5aa96965a18ff5eaa1880d19cb4ecb7f48eb818 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 26 Aug 2025 13:58:22 -0700 Subject: [PATCH 2/5] fix lint issue --- airbyte/mcp/_local_ops.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index 5446ad05..e04fca38 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -22,7 +22,6 @@ if TYPE_CHECKING: from airbyte.caches.duckdb import DuckDBCache - from airbyte.sources.base import Source CONFIG_HELP = """ From a42cebe555faa7f7be5b61ad5051ca2e3045b97c Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 26 Aug 2025 14:01:52 -0700 Subject: [PATCH 3/5] fix mypy --- airbyte/mcp/_local_ops.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff 
--git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index e04fca38..27417c84 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -52,14 +52,15 @@ def _get_mcp_source( if override_execution_mode == "auto" and is_docker_installed(): override_execution_mode = "docker" + source: Source if override_execution_mode == "python": - source: Source = get_source( + source = get_source( connector_name, use_python=True, install_if_missing=False, ) elif override_execution_mode == "docker": - source: Source = get_source( + source = get_source( connector_name, docker_image=True, install_if_missing=False, From fc0b8625597b166c133e05c500a1256272dbe103 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Tue, 26 Aug 2025 14:05:55 -0700 Subject: [PATCH 4/5] use ensure_installation() --- airbyte/mcp/_local_ops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index 27417c84..96815311 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -77,8 +77,8 @@ def _get_mcp_source( "Expected one of: ['auto', 'docker', 'python', 'yaml']." 
) - # Install if needed: - source.install() + # Ensure installed: + source.executor.ensure_installation() return source From 117ea1f15246307de1af000a2f5c6cbdb664e4ed Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 27 Aug 2025 19:04:46 -0700 Subject: [PATCH 5/5] fix unhandled auto scenario --- airbyte/mcp/_local_ops.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index 96815311..e980295b 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -53,7 +53,13 @@ def _get_mcp_source( override_execution_mode = "docker" source: Source - if override_execution_mode == "python": + if override_execution_mode == "auto": + # Use defaults with no overrides + source = get_source( + connector_name, + install_if_missing=False, + ) + elif override_execution_mode == "python": source = get_source( connector_name, use_python=True,