Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7678837
feat: implement dotenv-based secret management system
devin-ai-integration[bot] Aug 2, 2025
5b01bf2
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] Aug 2, 2025
408441b
refactor: rename tool functions for better clarity
devin-ai-integration[bot] Aug 2, 2025
f6097b5
fix: apply ruff formatting to tests/test_secrets.py
devin-ai-integration[bot] Aug 2, 2025
ab81742
feat: add flexible populate_dotenv_missing_secrets_stubs function
devin-ai-integration[bot] Aug 2, 2025
5ea259d
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] Aug 2, 2025
4af4747
refactor: rename _extract_secrets_from_manifest to _extract_secrets_n…
devin-ai-integration[bot] Aug 2, 2025
6db22ec
feat: implement stateless dotenv secret management system
devin-ai-integration[bot] Aug 2, 2025
0c5a0cc
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] Aug 2, 2025
f7985c8
feat: add dotenv_path parameter to execute_stream_test_read and remov…
devin-ai-integration[bot] Aug 2, 2025
f3c1880
feat: remove set_dotenv_path function and switch to dot notation for …
devin-ai-integration[bot] Aug 2, 2025
0dd8f1d
refactor: convert test classes to functions and remove redundant tests
devin-ai-integration[bot] Aug 2, 2025
81ba3cd
Update connector_builder_mcp/_secrets.py
aaronsteers Aug 2, 2025
d4f90b9
fix: remove get_dotenv_path function and related references
devin-ai-integration[bot] Aug 2, 2025
644491c
feat: change dotenv_path parameter type from str | None to Path | None
devin-ai-integration[bot] Aug 2, 2025
89f9060
use csv format for inputs
aaronsteers Aug 2, 2025
b59c78d
Merge branch 'devin/1754097884-dotenv-implementation' of https://gith…
aaronsteers Aug 2, 2025
8f90227
fix: update tests to use CSV string format for config_paths parameter
devin-ai-integration[bot] Aug 5, 2025
db0964e
fix: remove extra blank line to resolve ruff lint check
devin-ai-integration[bot] Aug 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion connector_builder_mcp/_connector_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
read_stream,
resolve_manifest,
)
from airbyte_cdk.models.airbyte_protocol import (
from airbyte_cdk.models import (
AirbyteStream,
ConfiguredAirbyteCatalog,
DestinationSyncMode,
Expand All @@ -25,6 +25,7 @@
from fastmcp import FastMCP
from pydantic import BaseModel, Field

from connector_builder_mcp._secrets import hydrate_config, register_secrets_tools
from connector_builder_mcp._util import validate_manifest_structure

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -103,6 +104,7 @@ def validate_manifest(
if config is None:
config = {}

config = hydrate_config(config)
config_with_manifest = {**config, "__injected_declarative_manifest": manifest}

limits = get_limits(config_with_manifest)
Expand Down Expand Up @@ -161,6 +163,7 @@ def execute_stream_test_read(
logger.info(f"Testing stream read for stream: {stream_name}")

try:
config = hydrate_config(config)
config_with_manifest = {
**config,
"__injected_declarative_manifest": manifest,
Expand Down Expand Up @@ -228,6 +231,7 @@ def get_resolved_manifest(
if config is None:
config = {}

config = hydrate_config(config)
config_with_manifest = {**config, "__injected_declarative_manifest": manifest}

limits = TestLimits(max_records=10, max_pages_per_slice=1, max_slices=1)
Expand Down Expand Up @@ -454,3 +458,4 @@ def register_connector_builder_tools(app: FastMCP) -> None:
app.tool(execute_stream_test_read)
app.tool(get_resolved_manifest)
app.tool(get_connector_builder_docs)
register_secrets_tools(app)
337 changes: 337 additions & 0 deletions connector_builder_mcp/_secrets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
"""Secrets management for connector configurations using dotenv files.

This module provides tools for managing secrets in .env files without exposing
actual secret values to the LLM. It uses a file path approach where the LLM can
manage secret stubs and the user provides actual values.
"""

import logging
import os
from pathlib import Path
from typing import Annotated, Any

from dotenv import dotenv_values, set_key
from fastmcp import FastMCP
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)

SECRETS_FILE_ENV_VAR = "CONNECTOR_BUILDER_SECRETS_FILE"
DEFAULT_SECRETS_FILE = ".env"

_current_dotenv_path: str | None = None


class SecretInfo(BaseModel):
"""Information about a secret without exposing its value."""

key: str
is_set: bool
description: str = ""


class SecretsFileInfo(BaseModel):
"""Information about the secrets file and its contents."""

file_path: str
exists: bool
secrets: list[SecretInfo]


def get_secrets_file_path() -> str:
"""Get the path to the secrets file from tool setting, environment, or default.

Priority order:
1. Path set via set_dotenv_path tool
2. Environment variable CONNECTOR_BUILDER_SECRETS_FILE
3. Default .env file
"""
if _current_dotenv_path:
return _current_dotenv_path
return os.environ.get(SECRETS_FILE_ENV_VAR, DEFAULT_SECRETS_FILE)


def set_dotenv_path(
file_path: Annotated[str, Field(description="Path to the .env file to use for secrets")],
) -> str:
"""Set the path to the dotenv file for secrets management.

This allows users to easily switch between different configuration files.

Args:
file_path: Path to the .env file to use

Returns:
Confirmation message with the absolute path
"""
global _current_dotenv_path

abs_path = str(Path(file_path).resolve())
_current_dotenv_path = abs_path

logger.info(f"Set dotenv path to: {abs_path}")

Path(abs_path).parent.mkdir(parents=True, exist_ok=True)
Path(abs_path).touch()

return f"Dotenv path set to: {abs_path}"


def load_secrets() -> dict[str, str]:
"""Load secrets from the dotenv file.

Returns:
Dictionary of secret key-value pairs
"""
secrets_file = get_secrets_file_path()
if not Path(secrets_file).exists():
logger.warning(f"Secrets file not found: {secrets_file}")
return {}

try:
secrets = dotenv_values(secrets_file)
filtered_secrets = {k: v for k, v in (secrets or {}).items() if v is not None}
logger.info(f"Loaded {len(filtered_secrets)} secrets from {secrets_file}")
return filtered_secrets
except Exception as e:
logger.error(f"Error loading secrets from {secrets_file}: {e}")
return {}


def hydrate_config(config: dict[str, Any]) -> dict[str, Any]:
"""Hydrate configuration with secrets from dotenv file using naming convention.

Environment variables are mapped to config paths using underscore convention:
- CREDENTIALS_PASSWORD -> credentials.password
- API_KEY -> api_key
- OAUTH_CLIENT_SECRET -> oauth.client_secret

Args:
config: Configuration dictionary to hydrate with secrets

Returns:
Configuration with secrets injected from .env file
"""
if not config:
return config

secrets = load_secrets()
if not secrets:
return config

def _set_nested_value(obj: dict[str, Any], path: list[str], value: str) -> None:
"""Set a nested value in a dictionary using a path."""
current = obj
for key in path[:-1]:
if key not in current:
current[key] = {}
elif not isinstance(current[key], dict):
return
current = current[key]
current[path[-1]] = value

def _env_var_to_path(env_var: str) -> list[str]:
"""Convert environment variable name to config path.

Examples:
- CREDENTIALS_PASSWORD -> ['credentials', 'password']
- API_KEY -> ['api_key']
- OAUTH_CLIENT_SECRET -> ['oauth', 'client_secret']
"""
lower_name = env_var.lower()

if env_var in ["API_KEY", "ACCESS_TOKEN", "CLIENT_ID", "CLIENT_SECRET", "REFRESH_TOKEN"]:
return [lower_name]

parts = lower_name.split("_")

# For single part, return as-is
if len(parts) == 1:
return parts

# For two parts, check if it should be nested or flat
if len(parts) == 2:
if parts[0] in ["credentials", "oauth", "auth", "config", "settings"]:
return parts
else:
return [lower_name]

# For multi-part names, use first part as parent and join rest
return [parts[0], "_".join(parts[1:])]

result = config.copy()

for env_var, secret_value in secrets.items():
if secret_value and not secret_value.startswith("#"):
path = _env_var_to_path(env_var)
_set_nested_value(result, path, secret_value)

return result


def list_dotenv_secrets() -> SecretsFileInfo:
"""List all secrets in the secrets file without exposing values.

Returns:
Information about the secrets file and its contents
"""
secrets_file = get_secrets_file_path()
file_path = Path(secrets_file)

secrets_info = []
if file_path.exists():
try:
secrets = dotenv_values(secrets_file)
for key, value in (secrets or {}).items():
secrets_info.append(
SecretInfo(
key=key,
is_set=bool(value and value.strip()),
description=f"Secret value for {key}",
)
)
except Exception as e:
logger.error(f"Error reading secrets file: {e}")

return SecretsFileInfo(
file_path=str(file_path.absolute()), exists=file_path.exists(), secrets=secrets_info
)


def populate_dotenv_missing_secrets_stubs(
manifest: Annotated[
dict[str, Any] | None, Field(description="Connector manifest to analyze for secrets")
] = None,
config_paths: Annotated[
list[str] | None,
Field(
description="List of config paths like ['credentials.password', 'oauth.client_secret']"
),
] = None,
secret_key: Annotated[
str | None, Field(description="Single secret key to add (legacy mode)")
] = None,
description: Annotated[str, Field(description="Description for the secret(s)")] = "",
) -> str:
"""Add secret stubs to the secrets file for the user to fill in.

Supports three modes:
1. Manifest-based: Pass manifest to auto-detect secrets from connection_specification
2. Path-based: Pass config_paths list like ['credentials.password', 'oauth.client_secret']
3. Legacy: Pass single secret_key (for backward compatibility)

Args:
manifest: Connector manifest to analyze for airbyte_secret fields
config_paths: List of config paths to convert to environment variables
secret_key: Single secret key to add (legacy mode)
description: Optional description of the secret(s)

Returns:
Message about the operation result
"""
if not any([manifest, config_paths, secret_key]):
return "Error: Must provide either manifest, config_paths, or secret_key"

secrets_file = get_secrets_file_path()

try:
Path(secrets_file).touch()

secrets_to_add = []

if manifest:
secrets_to_add.extend(_extract_secrets_from_manifest(manifest))

if config_paths:
for path in config_paths:
env_var = _config_path_to_env_var(path)
secrets_to_add.append((env_var, f"Secret for {path}"))

if secret_key:
secrets_to_add.append((secret_key, description or f"Secret value for {secret_key}"))

if not secrets_to_add:
return "No secrets found to add"

added_count = 0
for env_var, desc in secrets_to_add:
placeholder_value = f"# TODO: Set actual value for {env_var}"
if desc:
placeholder_value += f" - {desc}"

set_key(secrets_file, env_var, placeholder_value)
added_count += 1

secret_names = [name for name, _ in secrets_to_add]
return f"Added {added_count} secret stub(s) to {secrets_file}: {', '.join(secret_names)}. Please set the actual values."

except Exception as e:
logger.error(f"Error adding secret stubs: {e}")
return f"Error adding secret stubs: {str(e)}"


def _extract_secrets_from_manifest(manifest: dict[str, Any]) -> list[tuple[str, str]]:
"""Extract secret fields from manifest connection specification.

Args:
manifest: Connector manifest dictionary

Returns:
List of (env_var_name, description) tuples
"""
secrets = []

try:
spec = manifest.get("spec", {})
connection_spec = spec.get("connection_specification", {})
properties = connection_spec.get("properties", {})

for field_name, field_spec in properties.items():
if field_spec.get("airbyte_secret", False):
env_var = _config_path_to_env_var(field_name)
description = field_spec.get("description", f"Secret field: {field_name}")
secrets.append((env_var, description))

except Exception as e:
logger.warning(f"Error extracting secrets from manifest: {e}")

return secrets


def _config_path_to_env_var(config_path: str) -> str:
"""Convert config path to environment variable name.

Examples:
- 'credentials.password' -> 'CREDENTIALS_PASSWORD'
- 'api_key' -> 'API_KEY'
- 'oauth.client_secret' -> 'OAUTH_CLIENT_SECRET'

Args:
config_path: Dot-separated config path

Returns:
Environment variable name
"""
return config_path.replace(".", "_").upper()


def get_dotenv_path() -> str:
"""Get the absolute path to the secrets file for user reference.

Returns:
Absolute path to the secrets file
"""
secrets_file = get_secrets_file_path()
return str(Path(secrets_file).absolute())


def register_secrets_tools(app: FastMCP) -> None:
"""Register secrets management tools with the FastMCP app.

Args:
app: FastMCP application instance
"""
app.tool(set_dotenv_path)
app.tool(list_dotenv_secrets)
app.tool(populate_dotenv_missing_secrets_stubs)
app.tool(get_dotenv_path)
2 changes: 2 additions & 0 deletions connector_builder_mcp/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def initialize_logging() -> None:
def filter_config_secrets(config: dict[str, Any]) -> dict[str, Any]:
"""Filter sensitive information from configuration for logging.

Note: For config hydration with secrets, see _secrets.hydrate_config()

Args:
config: Configuration dictionary that may contain secrets

Expand Down
Loading
Loading