diff --git a/airbyte/_executors/util.py b/airbyte/_executors/util.py index b278ce07..9749acd7 100644 --- a/airbyte/_executors/util.py +++ b/airbyte/_executors/util.py @@ -311,6 +311,14 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 # ) if source_manifest: + if ( + isinstance(source_manifest, str) + and len(source_manifest.splitlines()) == 1 + and not source_manifest.startswith(("http://", "https://")) + ): + # If source_manifest is a single line string and not a URL, assume it's a file path + source_manifest = Path(source_manifest).expanduser() + if isinstance(source_manifest, dict | Path): components_py_path: Path | None = None if isinstance(source_manifest, Path): diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index f9dbe40b..0e4748fb 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -42,6 +42,11 @@ If both `config` and `config_secret_name` are provided, the `config` will be loaded first and then the referenced secret config will be layered on top of the non-secret config. + +For declarative connectors, you can provide a `manifest_path` to +specify a local YAML manifest file instead of using the registry +version. This is useful for testing custom or locally-developed +connector manifests. """ @@ -50,9 +55,12 @@ def _get_mcp_source( override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto", *, install_if_missing: bool = True, + manifest_path: str | Path | None, ) -> Source: """Get the MCP source for a connector.""" - if override_execution_mode == "auto" and is_docker_installed(): + if manifest_path: + override_execution_mode = "yaml" + elif override_execution_mode == "auto" and is_docker_installed(): override_execution_mode = "docker" source: Source @@ -61,23 +69,26 @@ def _get_mcp_source( source = get_source( connector_name, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "python": source = get_source( connector_name, use_python=True, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "docker": source = get_source( connector_name, docker_image=True, install_if_missing=False, + source_manifest=manifest_path or None, ) elif override_execution_mode == "yaml": source = get_source( connector_name, - source_manifest=True, + source_manifest=manifest_path or True, install_if_missing=False, ) else: @@ -123,10 +134,18 @@ def validate_connector_config( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> tuple[bool, str]: """Validate a connector configuration. @@ -136,6 +155,7 @@ def validate_connector_config( source: Source = _get_mcp_source( connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) except Exception as ex: return False, f"Failed to get connector '{connector_name}': {ex}" @@ -207,20 +227,40 @@ def list_source_streams( ], config: Annotated[ dict | str | None, - Field(description="The configuration for the source connector as a dict or JSON string."), - ] = None, + Field( + description="The configuration for the source connector as a dict or JSON string.", + default=None, + ), + ], config_file: Annotated[ str | Path | None, - Field(description="Path to a YAML or JSON file containing the source connector config."), - ] = None, + Field( + description="Path to a YAML or JSON file containing the source connector config.", + default=None, + ), + ], config_secret_name: Annotated[ str | None, - Field(description="The name of the secret containing the configuration."), - ] = None, + Field( + description="The name of the secret containing the configuration.", + default=None, + ), + ], override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], - Field(description="Optionally override the execution method to use for the connector."), - ] = "auto", + Field( + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", + default="auto", + ), + ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> list[str]: """List all streams available in a source connector. @@ -229,6 +269,7 @@ def list_source_streams( source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -274,15 +315,24 @@ def get_source_stream_json_schema( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> dict[str, Any]: """List all properties for a specific stream in a source connector.""" source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -336,16 +386,25 @@ def read_source_stream_records( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> list[dict[str, Any]] | str: """Get records from a source connector.""" try: source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config, @@ -419,10 +478,18 @@ def get_stream_previews( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> dict[str, list[dict[str, Any]] | str]: """Get sample records (previews) from streams in a source connector. @@ -433,6 +500,7 @@ def get_stream_previews( source: Source = _get_mcp_source( connector_name=source_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( @@ -507,15 +575,24 @@ def sync_source_to_cache( override_execution_mode: Annotated[ Literal["docker", "python", "yaml", "auto"], Field( - description="Optionally override the execution method to use for the connector.", + description="Optionally override the execution method to use for the connector. " + "This parameter is ignored if manifest_path is provided (yaml mode will be used).", default="auto", ), ], + manifest_path: Annotated[ + str | Path | None, + Field( + description="Path to a local YAML manifest file for declarative connectors.", + default=None, + ), + ], ) -> str: """Run a sync from a source connector to the default DuckDB cache.""" source: Source = _get_mcp_source( connector_name=source_connector_name, override_execution_mode=override_execution_mode, + manifest_path=manifest_path, ) config_dict = resolve_config( config=config,