Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions airbyte/mcp/_cloud_ops.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Airbyte Cloud MCP operations."""

from typing import Annotated

from fastmcp import FastMCP
Expand All @@ -18,7 +17,7 @@
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.workspaces import CloudWorkspace
from airbyte.destinations.util import get_noop_destination
from airbyte.mcp._util import resolve_config
from airbyte.mcp._util import resolve_config, resolve_list_of_strings


def _get_cloud_workspace() -> CloudWorkspace:
Expand Down Expand Up @@ -62,7 +61,10 @@ def deploy_source_to_cloud(
Airbyte Cloud API.
"""
try:
source = get_source(source_connector_name)
source = get_source(
source_connector_name,
install_if_missing=False,
)
config_dict = resolve_config(
config=config,
config_secret_name=config_secret_name,
Expand Down Expand Up @@ -117,7 +119,10 @@ def deploy_destination_to_cloud(
Airbyte Cloud API.
"""
try:
destination = get_destination(destination_connector_name)
destination = get_destination(
destination_connector_name,
install_if_missing=False,
)
config_dict = resolve_config(
config=config,
config_secret_name=config_secret_name,
Expand Down Expand Up @@ -156,8 +161,14 @@ def create_connection_on_cloud(
Field(description="The ID of the deployed destination."),
],
selected_streams: Annotated[
list[str],
Field(description="The selected stream names to sync within the connection."),
str | list[str],
Field(
description=(
"The selected stream names to sync within the connection. "
"Must be an explicit stream name or list of streams. "
"Cannot be empty or '*'."
)
),
],
table_prefix: Annotated[
str | None,
Expand All @@ -170,13 +181,14 @@ def create_connection_on_cloud(
and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
Airbyte Cloud API.
"""
resolved_streams_list: list[str] = resolve_list_of_strings(selected_streams)
try:
workspace: CloudWorkspace = _get_cloud_workspace()
deployed_connection = workspace.deploy_connection(
connection_name=connection_name,
source=source_id,
destination=destination_id,
selected_streams=selected_streams,
selected_streams=resolved_streams_list,
table_prefix=table_prefix,
)

Expand Down
4 changes: 3 additions & 1 deletion airbyte/mcp/_connector_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ def list_connectors(
Field(description="Filter connectors by type ('source' or 'destination')."),
] = None,
install_types: Annotated[
set[Literal["java", "python", "yaml", "docker"]] | None,
Literal["java", "python", "yaml", "docker"]
| set[Literal["java", "python", "yaml", "docker"]]
| None,
Field(
description="""
Filter connectors by install type.
Expand Down
35 changes: 18 additions & 17 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
Expand All @@ -13,7 +12,7 @@
from airbyte import get_source
from airbyte._util.meta import is_docker_installed
from airbyte.caches.util import get_default_cache
from airbyte.mcp._util import resolve_config
from airbyte.mcp._util import resolve_config, resolve_list_of_strings
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.base import Source
Expand Down Expand Up @@ -47,6 +46,8 @@
def _get_mcp_source(
connector_name: str,
override_execution_mode: Literal["auto", "docker", "python", "yaml"] = "auto",
*,
install_if_missing: bool = True,
) -> Source:
"""Get the MCP source for a connector."""
if override_execution_mode == "auto" and is_docker_installed():
Expand Down Expand Up @@ -84,7 +85,9 @@ def _get_mcp_source(
)

# Ensure installed:
source.executor.ensure_installation()
if install_if_missing:
source.executor.ensure_installation()

return source


Expand Down Expand Up @@ -158,12 +161,10 @@ def list_connector_config_secrets(
secrets_names: list[str] = []
for secrets_mgr in _get_secret_sources():
if isinstance(secrets_mgr, GoogleGSMSecretManager):
secrets_names.extend(
[
secret_handle.secret_name.split("/")[-1]
for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
]
)
secrets_names.extend([
secret_handle.secret_name.split("/")[-1]
for secret_handle in secrets_mgr.fetch_connector_secrets(connector_name)
])

return secrets_names

Expand Down Expand Up @@ -358,6 +359,7 @@ def get_stream_previews(
connector_name=source_name,
override_execution_mode=override_execution_mode,
)

config_dict = resolve_config(
config=config,
config_file=config_file,
Expand All @@ -366,15 +368,9 @@ def get_stream_previews(
)
source.set_config(config_dict)

streams_param: list[str] | Literal["*"] | None
if streams == "*":
streams_param: list[str] | Literal["*"] | None = resolve_list_of_strings(streams)
if streams_param and len(streams_param) == 1 and streams_param[0] == "*":
streams_param = "*"
elif isinstance(streams, str) and streams != "*":
streams_param = [streams]
elif isinstance(streams, list):
streams_param = streams
else:
streams_param = None

try:
samples_result = source.get_samples(
Expand Down Expand Up @@ -440,6 +436,11 @@ def sync_source_to_cache(
source.set_config(config_dict)
cache = get_default_cache()

streams = resolve_list_of_strings(streams)
if streams and len(streams) == 1 and streams[0] in {"*", "suggested"}:
# Float '*' and 'suggested' to the top-level for special processing:
streams = streams[0]

if isinstance(streams, str) and streams == "suggested":
streams = "*" # Default to all streams if 'suggested' is not otherwise specified.
try:
Expand Down
58 changes: 57 additions & 1 deletion airbyte/mcp/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import os
from pathlib import Path
from typing import Any
from typing import Any, overload

import dotenv
import yaml
Expand Down Expand Up @@ -70,6 +70,62 @@ def initialize_secrets() -> None:
disable_secret_source(SecretSourceEnum.PROMPT)


# Hint: Null result if input is Null
@overload
def resolve_list_of_strings(value: None) -> None: ...


# Hint: Non-null result if input is non-null
@overload
def resolve_list_of_strings(value: str | list[str] | set[str]) -> list[str]: ...


def resolve_list_of_strings(value: str | list[str] | set[str] | None) -> list[str] | None:
"""Resolve a string or list of strings to a list of strings.

This method will handle three types of input:

1. A list of strings (e.g., ["stream1", "stream2"]) will be returned as-is.
2. None or empty input will return None.
3. A single CSV string (e.g., "stream1,stream2") will be split into a list.
4. A JSON string (e.g., '["stream1", "stream2"]') will be parsed into a list.
5. If the input is empty or None, an empty list will be returned.

Args:
value: A string or list of strings.
"""
if value is None:
return None

if isinstance(value, list):
return value

if isinstance(value, set):
return list(value)

if not isinstance(value, str):
raise TypeError(
"Expected a string, list of strings, a set of strings, or None. "
f"Got '{type(value).__name__}': {value}"
)

value = value.strip()
if not value:
return []

if value.startswith("[") and value.endswith("]"):
# Try to parse as JSON array:
try:
parsed = json.loads(value)
if isinstance(parsed, list) and all(isinstance(item, str) for item in parsed):
return parsed
except json.JSONDecodeError as ex:
raise ValueError(f"Invalid JSON array: {value}") from ex

# Fallback to CSV split:
return [item.strip() for item in value.split(",") if item.strip()]


def resolve_config( # noqa: PLR0912
config: dict | str | None = None,
config_file: str | Path | None = None,
Expand Down
Loading