From b2b7102df426a4370fe2b7b8ead83c3c8802303d Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 24 Sep 2025 16:44:49 +0000 Subject: [PATCH 1/7] feat(mcp): Add manifest_path parameter to local operations tools - Add manifest_path parameter to all local MCP operations tools - Enable testing of custom YAML connector manifests - Bridge PyAirbyte MCP with connector-builder-mcp functionality - Maintain backward compatibility with existing tools - Support declarative connectors with local manifest files Co-Authored-By: AJ Steers --- airbyte/mcp/_local_ops.py | 70 ++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index f9dbe40b..a451f0c1 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -42,6 +42,11 @@ If both `config` and `config_secret_name` are provided, the `config` will be loaded first and then the referenced secret config will be layered on top of the non-secret config. + +For declarative connectors, you can provide a `manifest_path` to +specify a local YAML manifest file instead of using the registry +version. This is useful for testing custom or locally-developed +connector manifests. """ @@ -50,6 +55,7 @@ def _get_mcp_source( override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto", *, install_if_missing: bool = True, + manifest_path: str | Path | None = None, ) -> Source: """Get the MCP source for a connector.""" if override_execution_mode == "auto" and is_docker_installed(): @@ -61,23 +67,26 @@ def _get_mcp_source( source = get_source( connector_name, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "python": source = get_source( connector_name, use_python=True, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "docker": source = get_source( connector_name, docker_image=True, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "yaml": source = get_source( connector_name, - source_manifest=True, + source_manifest=manifest_path or True, install_if_missing=False, ) else: @@ -112,21 +121,28 @@ def validate_connector_config( description="Path to a YAML or JSON file containing the connector configuration.", default=None, ), - ], + ] = None, config_secret_name: Annotated[ str | None, Field( description="The name of the secret containing the configuration.", default=None, ), - ], + ] = None, override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( description="Optionally override the execution method to use for the connector.", default="auto", ), - ], + ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ] = None, ) -> tuple[bool, str]: """Validate a connector configuration. @@ -136,6 +152,7 @@ def validate_connector_config( source: Source = _get_mcp_source( connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) except Exception as ex: return False, f"Failed to get connector '{connector_name}': {ex}" @@ -221,6 +238,10 @@ def list_source_streams( Literal["docker", "python", "yaml", "auto"], Field(description="Optionally override the execution method to use for the connector."), ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field(description="Path to a local YAML manifest file for declarative connectors."), + ] = None, ) -> list[str]: """List all streams available in a source connector. @@ -229,6 +250,7 @@ def list_source_streams( source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -277,12 +299,20 @@ def get_source_stream_json_schema( description="Optionally override the execution method to use for the connector.", default="auto", ), - ], + ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ] = None, ) -> dict[str, Any]: """List all properties for a specific stream in a source connector.""" source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -339,13 +369,21 @@ def read_source_stream_records( description="Optionally override the execution method to use for the connector.", default="auto", ), - ], + ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ] = None, ) -> list[dict[str, Any]] | str: """Get records from a source connector.""" try: source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -422,7 +460,14 @@ def get_stream_previews( description="Optionally override the execution method to use for the connector.", default="auto", ), - ], + ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ] = None, ) -> dict[str, list[dict[str, Any]] | str]: """Get sample records (previews) from streams in a source connector. @@ -433,6 +478,7 @@ def get_stream_previews( source: Source = _get_mcp_source( connector_name=source_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( @@ -510,12 +556,20 @@ def sync_source_to_cache( description="Optionally override the execution method to use for the connector.", default="auto", ), - ], + ] = "auto", + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ] = None, ) -> str: """Run a sync from a source connector to the default DuckDB cache.""" source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, From aef665a5acb3862e811967c7bc046053c014b5b5 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Wed, 24 Sep 2025 17:30:54 +0000 Subject: [PATCH 2/7] feat(mcp): Force yaml mode when manifest_path provided and remove Python defaults - Force execution mode to 'yaml' when manifest_path is provided - Remove Python-level default arguments from function signatures - Keep Field() defaults intact for MCP tool compatibility - Update parameter documentation to clarify override behavior Co-Authored-By: AJ Steers --- airbyte/mcp/_local_ops.py | 62 +++++++++++++++++++++++---------------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index a451f0c1..a07dac57 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -52,13 +52,15 @@ def _get_mcp_source( connector_name: str, - override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto", + override_execution_mode: Literal["auto", "docker", "python", "yaml"], *, install_if_missing: bool = True, - manifest_path: str | Path | None = None, + manifest_path: str | Path | None, ) -> Source: """Get the MCP source for a connector.""" - if override_execution_mode == "auto" and is_docker_installed(): + if manifest_path: + override_execution_mode = "yaml" + elif override_execution_mode == "auto" and is_docker_installed(): override_execution_mode = "docker" source: Source @@ -121,28 +123,29 @@ def validate_connector_config( description="Path to a YAML or JSON file containing the connector configuration.", default=None, ), - ] = None, + ], config_secret_name: Annotated[ str | None, Field( description="The name of the secret containing the configuration.", default=None, ), - ] = None, + ], override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), - ] = "auto", + ], manifest_path: Annotated[ str | Path | None, Field( description="Path to a local YAML manifest file for declarative connectors.", default=None, ), - ] = None, + ], ) -> tuple[bool, str]: """Validate a connector configuration. @@ -225,23 +228,26 @@ def list_source_streams( config: Annotated[ dict | str | None, Field(description="The configuration for the source connector as a dict or JSON string."), - ] = None, + ], config_file: Annotated[ str | Path | None, Field(description="Path to a YAML or JSON file containing the source connector config."), - ] = None, + ], config_secret_name: Annotated[ str | None, Field(description="The name of the secret containing the configuration."), - ] = None, + ], override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], - Field(description="Optionally override the execution method to use for the connector."), - ] = "auto", + Field( + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used)." + ), + ], manifest_path: Annotated[ str | Path | None, Field(description="Path to a local YAML manifest file for declarative connectors."), - ] = None, + ], ) -> list[str]: """List all streams available in a source connector. @@ -296,17 +302,18 @@ def get_source_stream_json_schema( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), - ] = "auto", + ], manifest_path: Annotated[ str | Path | None, Field( description="Path to a local YAML manifest file for declarative connectors.", default=None, ), - ] = None, + ], ) -> dict[str, Any]: """List all properties for a specific stream in a source connector.""" source: Source = _get_mcp_source( @@ -366,17 +373,18 @@ def read_source_stream_records( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), - ] = "auto", + ], manifest_path: Annotated[ str | Path | None, Field( description="Path to a local YAML manifest file for declarative connectors.", default=None, ), - ] = None, + ], ) -> list[dict[str, Any]] | str: """Get records from a source connector.""" try: @@ -457,17 +465,18 @@ def get_stream_previews( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), - ] = "auto", + ], manifest_path: Annotated[ str | Path | None, Field( description="Path to a local YAML manifest file for declarative connectors.", default=None, ), - ] = None, + ], ) -> dict[str, list[dict[str, Any]] | str]: """Get sample records (previews) from streams in a source connector. @@ -553,17 +562,18 @@ def sync_source_to_cache( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), - ] = "auto", + ], manifest_path: Annotated[ str | Path | None, Field( description="Path to a local YAML manifest file for declarative connectors.", default=None, ), - ] = None, + ], ) -> str: """Run a sync from a source connector to the default DuckDB cache.""" source: Source = _get_mcp_source( From 2bfbfd7578731ad18bfe28a090eb02c849b66361 Mon Sep 17 00:00:00 2001 From: Aaron Steers Date: Wed, 24 Sep 2025 13:19:29 -0700 Subject: [PATCH 3/7] convert path-like str to path --- airbyte/_executors/util.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index b278ce07..90e411de 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -311,6 +311,14 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # ) if source_manifest: + if ( + isinstance(source_manifest, str) + and len(source_manifest.splitlines()) == 1 + and not source_manifest.startswith(("http://", "https://")) + ): + # If source_manifest is a single line string and not a URL, assume it's a file path + source_manifest = Path(source_manifest) + if isinstance(source_manifest, dict | Path): components_py_path: Path | None = None if isinstance(source_manifest, Path): From 088fdbc821cda548f7da84e488b596064b7832d8 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 24 Sep 2025 13:32:33 -0700 Subject: [PATCH 4/7] Update airbyte/_executors/util.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> --- airbyte/_executors/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index 90e411de..9749acd7 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -317,7 +317,7 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # and not source_manifest.startswith(("http://", "https://")) ): # If source_manifest is a single line string and not a URL, assume it's a file path - source_manifest = Path(source_manifest) + source_manifest = Path(source_manifest).expanduser() if isinstance(source_manifest, dict | Path): components_py_path: Path | None = None From 3983bf0329c053373f646a417ca1a529daaeab43 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 24 Sep 2025 13:32:59 -0700 Subject: [PATCH 5/7] Update airbyte/mcp/_local_ops.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- airbyte/mcp/_local_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index a07dac57..c4e5d8d0 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -52,7 +52,7 @@ def _get_mcp_source( connector_name: str, - override_execution_mode: Literal["auto", "docker", "python", "yaml"], + override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto", *, install_if_missing: bool = True, manifest_path: str | Path | None, From fd310e70341359de642a4dde4c02bb26857ecf14 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 24 Sep 2025 14:44:08 -0700 Subject: [PATCH 6/7] fix tool func signatures --- airbyte/mcp/_local_ops.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index c4e5d8d0..89f8bcc2 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -227,26 +227,39 @@ def list_source_streams( ], config: Annotated[ dict | str | None, - Field(description="The configuration for the source connector as a dict or JSON string."), + Field( + description="The configuration for the source connector as a dict or JSON string.", + default=None, + ), ], config_file: Annotated[ str | Path | None, - Field(description="Path to a YAML or JSON file containing the source connector config."), + Field( + description="Path to a YAML or JSON file containing the source connector config.", + default=None, + ), ], config_secret_name: Annotated[ str | None, - Field(description="The name of the secret containing the configuration."), - ], + Field( + description="The name of the secret containing the configuration.", + default=None, + ), + ] = None, override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( description="Optionally override the execution method to use for the connector. " - "This parameter is ignored if manifest_path is provided (yaml mode will be used)." + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", + default="auto", ), ], manifest_path: Annotated[ str | Path | None, - Field(description="Path to a local YAML manifest file for declarative connectors."), + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), ], ) -> list[str]: """List all streams available in a source connector. From 9b03efe4a76f1036f461e0cdc0725c4f7db06fea Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Wed, 24 Sep 2025 14:46:50 -0700 Subject: [PATCH 7/7] fix sig --- airbyte/mcp/_local_ops.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index 89f8bcc2..0e4748fb 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -245,7 +245,7 @@ def list_source_streams( description="The name of the secret containing the configuration.", default=None, ), - ] = None, + ], override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field(