Skip to content
58 changes: 44 additions & 14 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ def validate_connector_config(
Field(description="The name of the connector to validate."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the connector."),
dict | str | None,
Field(description="The configuration for the connector as a dict object or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the connector configuration."),
] = None,
config_secret_name: Annotated[
str | None,
Expand All @@ -74,6 +78,7 @@ def validate_connector_config(
try:
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand Down Expand Up @@ -122,8 +127,12 @@ def list_source_streams(
Field(description="The name of the source connector."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the source connector."),
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
config_secret_name: Annotated[
str | None,
Expand All @@ -140,6 +149,7 @@ def list_source_streams(
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand All @@ -158,13 +168,17 @@ def get_source_stream_json_schema(
Field(description="The name of the stream."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the source connector."),
],
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
config_secret_name: Annotated[
str | None,
Field(description="The name of the secret containing the configuration."),
],
] = None,
) -> dict[str, Any]:
"""List all properties for a specific stream in a source connector."""
source: Source = get_source(
Expand All @@ -173,6 +187,7 @@ def get_source_stream_json_schema(
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand All @@ -187,8 +202,12 @@ def read_source_stream_records(
Field(description="The name of the source connector."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the source connector."),
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
config_secret_name: Annotated[
str | None,
Expand All @@ -212,6 +231,7 @@ def read_source_stream_records(
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand Down Expand Up @@ -241,8 +261,12 @@ def get_stream_previews(
Field(description="The name of the source connector."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the source connector."),
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
config_secret_name: Annotated[
str | None,
Expand Down Expand Up @@ -272,6 +296,7 @@ def get_stream_previews(
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand Down Expand Up @@ -317,8 +342,12 @@ def sync_source_to_cache(
Field(description="The name of the source connector."),
],
config: Annotated[
dict | Path | None,
Field(description="The configuration for the source connector."),
dict | str | None,
Field(description="The configuration for the source connector as a dict or JSON string."),
] = None,
config_file: Annotated[
str | Path | None,
Field(description="Path to a YAML or JSON file containing the source connector config."),
] = None,
config_secret_name: Annotated[
str | None,
Expand All @@ -336,6 +365,7 @@ def sync_source_to_cache(
)
config_dict = resolve_config(
config=config,
config_file=config_file,
config_secret_name=config_secret_name,
config_spec_jsonschema=source.config_spec,
)
Expand Down
104 changes: 71 additions & 33 deletions airbyte/mcp/_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Internal utility functions for MCP."""

import json
import os
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -43,50 +44,87 @@ def initialize_secrets() -> None:
)


def resolve_config(
config: dict | Path | None = None,
def resolve_config( # noqa: PLR0912
config: dict | str | None = None,
config_file: str | Path | None = None,
config_secret_name: str | None = None,
config_spec_jsonschema: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Resolve a configuration dictionary or file path to a dictionary.
"""Resolve a configuration dictionary, JSON string, or file path to a dictionary.

Returns:
Resolved configuration dictionary

Raises:
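ValueError: If the config file cannot be read or parsed, if `config` is an invalid JSON string or an unsupported type, or if hardcoded secrets are detected
FileNotFoundError: If `config_file` does not exist
TypeError: If a JSON string passed as `config` does not parse to an object/dict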

We reject hardcoded secrets in a config dict if we detect them.
"""
config_dict: dict[str, Any] = {}
if config is None and config_secret_name is None:
raise ValueError(
"No configuration provided. Either `config` or `config_secret_name` must be specified."
)

if isinstance(config, Path):
config_dict.update(yaml.safe_load(config.read_text()))
if config is None and config_file is None and config_secret_name is None:
return {}

if config_file is not None:
if isinstance(config_file, str):
config_file = Path(config_file)

if not isinstance(config_file, Path):
raise ValueError(
f"config_file must be a string or Path object, got: {type(config_file).__name__}"
)

if not config_file.exists():
raise FileNotFoundError(f"Configuration file not found: {config_file}")

elif isinstance(config, dict):
if config_spec_jsonschema is not None:
hardcoded_secrets: list[list[str]] = detect_hardcoded_secrets(
config=config,
spec_json_schema=config_spec_jsonschema,
def _raise_invalid_type(file_config: object) -> None:
raise TypeError(
f"Configuration file must contain a valid JSON/YAML object, "
f"got: {type(file_config).__name__}"
)
if hardcoded_secrets:
error_msg = "Configuration contains hardcoded secrets in fields: "
error_msg += ", ".join(
[".".join(hardcoded_secret) for hardcoded_secret in hardcoded_secrets]
)

error_msg += (
"Please use environment variables instead. For example:\n"
"To set a secret via reference, set its value to "
"`secret_reference::ENV_VAR_NAME`.\n"
)
raise ValueError(error_msg)

config_dict.update(config)
elif config is not None:
# We shouldn't reach here.
raise ValueError(
"Config must be a dict or a Path object pointing to a YAML or JSON file. "
f"Found type: {type(config).__name__}"

try:
file_config = yaml.safe_load(config_file.read_text())
if not isinstance(file_config, dict):
_raise_invalid_type(file_config)
config_dict.update(file_config)
except Exception as e:
raise ValueError(f"Error reading configuration file {config_file}: {e}") from e

if config is not None:
if isinstance(config, dict):
config_dict.update(config)
elif isinstance(config, str):
try:
parsed_config = json.loads(config)
if not isinstance(parsed_config, dict):
raise TypeError(
f"Parsed JSON config must be an object/dict, "
f"got: {type(parsed_config).__name__}"
)
config_dict.update(parsed_config)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON in config parameter: {e}") from e
else:
raise ValueError(f"Config must be a dict or JSON string, got: {type(config).__name__}")

if config_dict and config_spec_jsonschema is not None:
hardcoded_secrets: list[list[str]] = detect_hardcoded_secrets(
config=config_dict,
spec_json_schema=config_spec_jsonschema,
)
if hardcoded_secrets:
error_msg = "Configuration contains hardcoded secrets in fields: "
error_msg += ", ".join(
[".".join(hardcoded_secret) for hardcoded_secret in hardcoded_secrets]
)

error_msg += (
"Please use environment variables instead. For example:\n"
"To set a secret via reference, set its value to "
"`secret_reference::ENV_VAR_NAME`.\n"
)
raise ValueError(error_msg)

if config_secret_name is not None:
# Assume this is a secret name that points to a JSON/YAML config.
Expand Down
Loading