Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions airbyte/_executors/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,14 @@ def get_connector_executor( # noqa: PLR0912, PLR0913, PLR0914, PLR0915, C901 #
)

if source_manifest:
if (
isinstance(source_manifest, str)
and len(source_manifest.splitlines()) == 1
and not source_manifest.startswith(("http://", "https://"))
):
# If source_manifest is a single line string and not a URL, assume it's a file path
source_manifest = Path(source_manifest)

if isinstance(source_manifest, dict | Path):
components_py_path: Path | None = None
if isinstance(source_manifest, Path):
Expand Down
90 changes: 77 additions & 13 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,25 @@
If both `config` and `config_secret_name` are provided, the
`config` will be loaded first and then the referenced secret config
will be layered on top of the non-secret config.

For declarative connectors, you can provide a `manifest_path` to
specify a local YAML manifest file instead of using the registry
version. This is useful for testing custom or locally-developed
connector manifests.
"""


def _get_mcp_source(
connector_name: str,
override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
override_execution_mode: Literal["auto", "docker", "python", "yaml"],
*,
install_if_missing: bool = True,
manifest_path: str | Path | None,
) -> Source:
"""Get the MCP source for a connector."""
if override_execution_mode == "auto" and is_docker_installed():
if manifest_path:
override_execution_mode = "yaml"
elif override_execution_mode == "auto" and is_docker_installed():
override_execution_mode = "docker"

source: Source
Expand All @@ -61,23 +69,26 @@ def _get_mcp_source(
source = get_source(
connector_name,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "python":
source = get_source(
connector_name,
use_python=True,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "docker":
source = get_source(
connector_name,
docker_image=True,
install_if_missing=False,
source_manifest=manifest_path or None,
)
elif override_execution_mode == "yaml":
source = get_source(
connector_name,
source_manifest=True,
source_manifest=manifest_path or True,
install_if_missing=False,
)
else:
Expand Down Expand Up @@ -123,10 +134,18 @@ def validate_connector_config(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> tuple[bool, str]:
"""Validate a connector configuration.

Expand All @@ -136,6 +155,7 @@ def validate_connector_config(
source: Source = _get_mcp_source(
connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
except Exception as ex:
return False, f"Failed to get connector '{connector_name}': {ex}"
Expand Down Expand Up @@ -208,19 +228,26 @@ def list_source_streams(
config: Annotated[
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
],
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
],
config_secret_name: Annotated[
str | None,
Field(description="The name of the secret containing the configuration."),
] = None,
],
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(description="Optionally override the execution method to use for the connector."),
] = "auto",
Field(
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used)."
),
],
manifest_path: Annotated[
str | Path | None,
Field(description="Path to a local YAML manifest file for declarative connectors."),
],
) -> list[str]:
"""List all streams available in a source connector.

Expand All @@ -229,6 +256,7 @@ def list_source_streams(
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -274,15 +302,24 @@ def get_source_stream_json_schema(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> dict[str, Any]:
"""List all properties for a specific stream in a source connector."""
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -336,16 +373,25 @@ def read_source_stream_records(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> list[dict[str, Any]] | str:
"""Get records from a source connector."""
try:
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down Expand Up @@ -419,10 +465,18 @@ def get_stream_previews(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> dict[str, list[dict[str, Any]] | str]:
"""Get sample records (previews) from streams in a source connector.

Expand All @@ -433,6 +487,7 @@ def get_stream_previews(
source: Source = _get_mcp_source(
connector_name=source_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)

config_dict = resolve_config(
Expand Down Expand Up @@ -507,15 +562,24 @@ def sync_source_to_cache(
override_execution_mode: Annotated[
Literal["docker", "python", "yaml", "auto"],
Field(
description="Optionally override the execution method to use for the connector.",
description="Optionally override the execution method to use for the connector. "
"This parameter is ignored if manifest_path is provided (yaml mode will be used).",
default="auto",
),
],
manifest_path: Annotated[
str | Path | None,
Field(
description="Path to a local YAML manifest file for declarative connectors.",
default=None,
),
],
) -> str:
"""Run a sync from a source connector to the default DuckDB cache."""
source: Source = _get_mcp_source(
connector_name=source_connector_name,
override_execution_mode=override_execution_mode,
manifest_path=manifest_path,
)
config_dict = resolve_config(
config=config,
Expand Down
Loading