Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 #
)

if source_manifest:
if (
isinstance(source_manifest, str)
and len(source_manifest.splitlines()) == 1
and not source_manifest.startswith(("http://", "https://"))
):
# If source_manifest is a single line string and not a URL, assume it's a file path
source_manifest = Path(source_manifest).expanduser()

if isinstance(source_manifest, dict | Path):
components_py_path: Path | None = None
if isinstance(source_manifest, Path):
Expand Down
107 changes: 92 additions & 15 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


Expand All @@ -50,9 +55,12 @@ def _get_mcp_source(
override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
*,
install_if_missing: bool = True,
manifest_path: str | Path | None,
) -> Source:
"""Get the MCP source for a connector."""
if override_execution_mode == "auto" and is_docker_installed():
if manifest_path:
override_execution_mode = "yaml"
elif override_execution_mode == "auto" and is_docker_installed():
override_execution_mode = "docker"

source: Source
Expand All @@ -61,23 +69,26 @@ def _get_mcp_source(
source = get_source(
connector_name,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "python":
source = get_source(
connector_name,
use_python=True,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "docker":
source = get_source(
connector_name,
docker_image=True,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "yaml":
source = get_source(
connector_name,
source_manifest=True,
source_manifest=manifest_path or True,
install_if_missing=False,
)
else:
Expand Down Expand Up @@ -123,10 +134,18 @@ def validate_connector_config(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> tuple[bool, str]:
"""Validate a connector configuration.

Expand All @@ -136,6 +155,7 @@ def validate_connector_config(
source: Source = _get_mcp_source(
connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
except Exception as ex:
return False, f"Failed to get connector '{connector_name}': {ex}"
Expand Down Expand Up @@ -207,20 +227,40 @@ def list_source_streams(
],
config: Annotated[
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
Field(
description="The configuration for the source connector as a dict or JSON string.",
default=None,
),
],
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
Field(
description="Path to a YAML or JSON file containing the source connector config.",
default=None,
),
],
config_secret_name: Annotated[
str | None,
Field(description="The name of the secret containing the configuration."),
] = None,
Field(
description="The name of the secret containing the configuration.",
default=None,
),
],
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
Field(
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> list[str]:
"""List all streams available in a source connector.

Expand All @@ -229,6 +269,7 @@ def list_source_streams(
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -274,15 +315,24 @@ def get_source_stream_json_schema(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> dict[str, Any]:
"""List all properties for a specific stream in a source connector."""
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -336,16 +386,25 @@ def read_source_stream_records(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> list[dict[str, Any]] | str:
"""Get records from a source connector."""
try:
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -419,10 +478,18 @@ def get_stream_previews(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> dict[str, list[dict[str, Any]] | str]:
"""Get sample records (previews) from streams in a source connector.

Expand All @@ -433,6 +500,7 @@ def get_stream_previews(
source: Source = _get_mcp_source(
connector_name=source_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)

config_dict = resolve_config(
Expand Down Expand Up @@ -507,15 +575,24 @@ def sync_source_to_cache(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> str:
"""Run a sync from a source connector to the default DuckDB cache."""
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down
Loading