-
Notifications
You must be signed in to change notification settings - Fork 2
feat: implement dotenv-based secret management system #11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
7678837
feat: implement dotenv-based secret management system
devin-ai-integration[bot] 5b01bf2
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] 408441b
refactor: rename tool functions for better clarity
devin-ai-integration[bot] f6097b5
fix: apply ruff formatting to tests/test_secrets.py
devin-ai-integration[bot] ab81742
feat: add flexible populate_dotenv_missing_secrets_stubs function
devin-ai-integration[bot] 5ea259d
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] 4af4747
refactor: rename _extract_secrets_from_manifest to _extract_secrets_n…
devin-ai-integration[bot] 6db22ec
feat: implement stateless dotenv secret management system
devin-ai-integration[bot] 0c5a0cc
fix: apply ruff formatting to resolve CI format check failure
devin-ai-integration[bot] f7985c8
feat: add dotenv_path parameter to execute_stream_test_read and remov…
devin-ai-integration[bot] f3c1880
feat: remove set_dotenv_path function and switch to dot notation for …
devin-ai-integration[bot] 0dd8f1d
refactor: convert test classes to functions and remove redundant tests
devin-ai-integration[bot] 81ba3cd
Update connector_builder_mcp/_secrets.py
aaronsteers d4f90b9
fix: remove get_dotenv_path function and related references
devin-ai-integration[bot] 644491c
feat: change dotenv_path parameter type from str | None to Path | None
devin-ai-integration[bot] 89f9060
use csv format for inputs
aaronsteers b59c78d
Merge branch 'devin/1754097884-dotenv-implementation' of https://gith…
aaronsteers 8f90227
fix: update tests to use CSV string format for config_paths parameter
devin-ai-integration[bot] db0964e
fix: remove extra blank line to resolve ruff lint check
devin-ai-integration[bot] File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,337 @@ | ||
"""Secrets management for connector configurations using dotenv files. | ||
|
||
This module provides tools for managing secrets in .env files without exposing | ||
actual secret values to the LLM. It uses a file path approach where the LLM can | ||
manage secret stubs and the user provides actual values. | ||
""" | ||
|
||
import logging | ||
import os | ||
from pathlib import Path | ||
from typing import Annotated, Any | ||
|
||
from dotenv import dotenv_values, set_key | ||
from fastmcp import FastMCP | ||
from pydantic import BaseModel, Field | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
SECRETS_FILE_ENV_VAR = "CONNECTOR_BUILDER_SECRETS_FILE" | ||
DEFAULT_SECRETS_FILE = ".env" | ||
|
||
_current_dotenv_path: str | None = None | ||
|
||
|
||
class SecretInfo(BaseModel): | ||
"""Information about a secret without exposing its value.""" | ||
|
||
key: str | ||
is_set: bool | ||
description: str = "" | ||
aaronsteers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
||
|
||
class SecretsFileInfo(BaseModel): | ||
"""Information about the secrets file and its contents.""" | ||
|
||
file_path: str | ||
exists: bool | ||
secrets: list[SecretInfo] | ||
|
||
|
||
def get_secrets_file_path() -> str: | ||
"""Get the path to the secrets file from tool setting, environment, or default. | ||
|
||
Priority order: | ||
1. Path set via set_dotenv_path tool | ||
2. Environment variable CONNECTOR_BUILDER_SECRETS_FILE | ||
3. Default .env file | ||
""" | ||
if _current_dotenv_path: | ||
return _current_dotenv_path | ||
return os.environ.get(SECRETS_FILE_ENV_VAR, DEFAULT_SECRETS_FILE) | ||
|
||
|
||
def set_dotenv_path( | ||
file_path: Annotated[str, Field(description="Path to the .env file to use for secrets")], | ||
) -> str: | ||
"""Set the path to the dotenv file for secrets management. | ||
|
||
This allows users to easily switch between different configuration files. | ||
|
||
Args: | ||
file_path: Path to the .env file to use | ||
|
||
Returns: | ||
Confirmation message with the absolute path | ||
""" | ||
global _current_dotenv_path | ||
|
||
abs_path = str(Path(file_path).resolve()) | ||
_current_dotenv_path = abs_path | ||
|
||
logger.info(f"Set dotenv path to: {abs_path}") | ||
|
||
Path(abs_path).parent.mkdir(parents=True, exist_ok=True) | ||
Path(abs_path).touch() | ||
|
||
return f"Dotenv path set to: {abs_path}" | ||
|
||
|
||
def load_secrets() -> dict[str, str]: | ||
"""Load secrets from the dotenv file. | ||
|
||
Returns: | ||
Dictionary of secret key-value pairs | ||
""" | ||
secrets_file = get_secrets_file_path() | ||
if not Path(secrets_file).exists(): | ||
logger.warning(f"Secrets file not found: {secrets_file}") | ||
return {} | ||
|
||
try: | ||
secrets = dotenv_values(secrets_file) | ||
filtered_secrets = {k: v for k, v in (secrets or {}).items() if v is not None} | ||
logger.info(f"Loaded {len(filtered_secrets)} secrets from {secrets_file}") | ||
return filtered_secrets | ||
except Exception as e: | ||
logger.error(f"Error loading secrets from {secrets_file}: {e}") | ||
return {} | ||
|
||
|
||
def hydrate_config(config: dict[str, Any]) -> dict[str, Any]: | ||
"""Hydrate configuration with secrets from dotenv file using naming convention. | ||
|
||
Environment variables are mapped to config paths using underscore convention: | ||
- CREDENTIALS_PASSWORD -> credentials.password | ||
- API_KEY -> api_key | ||
- OAUTH_CLIENT_SECRET -> oauth.client_secret | ||
|
||
Args: | ||
config: Configuration dictionary to hydrate with secrets | ||
|
||
Returns: | ||
Configuration with secrets injected from .env file | ||
""" | ||
if not config: | ||
return config | ||
|
||
secrets = load_secrets() | ||
if not secrets: | ||
return config | ||
|
||
def _set_nested_value(obj: dict[str, Any], path: list[str], value: str) -> None: | ||
"""Set a nested value in a dictionary using a path.""" | ||
current = obj | ||
for key in path[:-1]: | ||
if key not in current: | ||
current[key] = {} | ||
elif not isinstance(current[key], dict): | ||
return | ||
current = current[key] | ||
current[path[-1]] = value | ||
|
||
def _env_var_to_path(env_var: str) -> list[str]: | ||
aaronsteers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
"""Convert environment variable name to config path. | ||
|
||
Examples: | ||
- CREDENTIALS_PASSWORD -> ['credentials', 'password'] | ||
- API_KEY -> ['api_key'] | ||
- OAUTH_CLIENT_SECRET -> ['oauth', 'client_secret'] | ||
""" | ||
lower_name = env_var.lower() | ||
|
||
if env_var in ["API_KEY", "ACCESS_TOKEN", "CLIENT_ID", "CLIENT_SECRET", "REFRESH_TOKEN"]: | ||
return [lower_name] | ||
|
||
parts = lower_name.split("_") | ||
|
||
# For single part, return as-is | ||
if len(parts) == 1: | ||
return parts | ||
|
||
# For two parts, check if it should be nested or flat | ||
if len(parts) == 2: | ||
if parts[0] in ["credentials", "oauth", "auth", "config", "settings"]: | ||
return parts | ||
else: | ||
return [lower_name] | ||
|
||
# For multi-part names, use first part as parent and join rest | ||
return [parts[0], "_".join(parts[1:])] | ||
|
||
result = config.copy() | ||
|
||
for env_var, secret_value in secrets.items(): | ||
if secret_value and not secret_value.startswith("#"): | ||
path = _env_var_to_path(env_var) | ||
_set_nested_value(result, path, secret_value) | ||
|
||
return result | ||
|
||
|
||
def list_dotenv_secrets() -> SecretsFileInfo: | ||
"""List all secrets in the secrets file without exposing values. | ||
|
||
Returns: | ||
Information about the secrets file and its contents | ||
""" | ||
secrets_file = get_secrets_file_path() | ||
file_path = Path(secrets_file) | ||
|
||
secrets_info = [] | ||
if file_path.exists(): | ||
try: | ||
secrets = dotenv_values(secrets_file) | ||
for key, value in (secrets or {}).items(): | ||
secrets_info.append( | ||
SecretInfo( | ||
key=key, | ||
is_set=bool(value and value.strip()), | ||
description=f"Secret value for {key}", | ||
) | ||
) | ||
except Exception as e: | ||
logger.error(f"Error reading secrets file: {e}") | ||
|
||
return SecretsFileInfo( | ||
file_path=str(file_path.absolute()), exists=file_path.exists(), secrets=secrets_info | ||
) | ||
|
||
|
||
def populate_dotenv_missing_secrets_stubs( | ||
manifest: Annotated[ | ||
dict[str, Any] | None, Field(description="Connector manifest to analyze for secrets") | ||
] = None, | ||
config_paths: Annotated[ | ||
list[str] | None, | ||
Field( | ||
description="List of config paths like ['credentials.password', 'oauth.client_secret']" | ||
), | ||
] = None, | ||
secret_key: Annotated[ | ||
str | None, Field(description="Single secret key to add (legacy mode)") | ||
] = None, | ||
description: Annotated[str, Field(description="Description for the secret(s)")] = "", | ||
aaronsteers marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
) -> str: | ||
"""Add secret stubs to the secrets file for the user to fill in. | ||
|
||
Supports three modes: | ||
1. Manifest-based: Pass manifest to auto-detect secrets from connection_specification | ||
2. Path-based: Pass config_paths list like ['credentials.password', 'oauth.client_secret'] | ||
3. Legacy: Pass single secret_key (for backward compatibility) | ||
|
||
Args: | ||
manifest: Connector manifest to analyze for airbyte_secret fields | ||
config_paths: List of config paths to convert to environment variables | ||
secret_key: Single secret key to add (legacy mode) | ||
description: Optional description of the secret(s) | ||
|
||
Returns: | ||
Message about the operation result | ||
""" | ||
if not any([manifest, config_paths, secret_key]): | ||
return "Error: Must provide either manifest, config_paths, or secret_key" | ||
|
||
secrets_file = get_secrets_file_path() | ||
|
||
try: | ||
Path(secrets_file).touch() | ||
|
||
secrets_to_add = [] | ||
|
||
if manifest: | ||
secrets_to_add.extend(_extract_secrets_names_from_manifest(manifest)) | ||
|
||
if config_paths: | ||
for path in config_paths: | ||
env_var = _config_path_to_env_var(path) | ||
secrets_to_add.append((env_var, f"Secret for {path}")) | ||
|
||
if secret_key: | ||
secrets_to_add.append((secret_key, description or f"Secret value for {secret_key}")) | ||
|
||
if not secrets_to_add: | ||
return "No secrets found to add" | ||
|
||
added_count = 0 | ||
for env_var, desc in secrets_to_add: | ||
placeholder_value = f"# TODO: Set actual value for {env_var}" | ||
if desc: | ||
placeholder_value += f" - {desc}" | ||
|
||
set_key(secrets_file, env_var, placeholder_value) | ||
added_count += 1 | ||
|
||
secret_names = [name for name, _ in secrets_to_add] | ||
return f"Added {added_count} secret stub(s) to {secrets_file}: {', '.join(secret_names)}. Please set the actual values." | ||
|
||
except Exception as e: | ||
logger.error(f"Error adding secret stubs: {e}") | ||
return f"Error adding secret stubs: {str(e)}" | ||
|
||
|
||
def _extract_secrets_names_from_manifest(manifest: dict[str, Any]) -> list[tuple[str, str]]: | ||
"""Extract secret fields from manifest connection specification. | ||
|
||
Args: | ||
manifest: Connector manifest dictionary | ||
|
||
Returns: | ||
List of (env_var_name, description) tuples | ||
""" | ||
secrets = [] | ||
|
||
try: | ||
spec = manifest.get("spec", {}) | ||
connection_spec = spec.get("connection_specification", {}) | ||
properties = connection_spec.get("properties", {}) | ||
|
||
for field_name, field_spec in properties.items(): | ||
if field_spec.get("airbyte_secret", False): | ||
env_var = _config_path_to_env_var(field_name) | ||
description = field_spec.get("description", f"Secret field: {field_name}") | ||
secrets.append((env_var, description)) | ||
|
||
except Exception as e: | ||
logger.warning(f"Error extracting secrets from manifest: {e}") | ||
|
||
return secrets | ||
|
||
|
||
def _config_path_to_env_var(config_path: str) -> str: | ||
"""Convert config path to environment variable name. | ||
|
||
Examples: | ||
- 'credentials.password' -> 'CREDENTIALS_PASSWORD' | ||
- 'api_key' -> 'API_KEY' | ||
- 'oauth.client_secret' -> 'OAUTH_CLIENT_SECRET' | ||
|
||
Args: | ||
config_path: Dot-separated config path | ||
|
||
Returns: | ||
Environment variable name | ||
""" | ||
return config_path.replace(".", "_").upper() | ||
|
||
|
||
def get_dotenv_path() -> str: | ||
"""Get the absolute path to the secrets file for user reference. | ||
|
||
Returns: | ||
Absolute path to the secrets file | ||
""" | ||
secrets_file = get_secrets_file_path() | ||
return str(Path(secrets_file).absolute()) | ||
|
||
|
||
def register_secrets_tools(app: FastMCP) -> None: | ||
"""Register secrets management tools with the FastMCP app. | ||
|
||
Args: | ||
app: FastMCP application instance | ||
""" | ||
app.tool(set_dotenv_path) | ||
app.tool(list_dotenv_secrets) | ||
app.tool(populate_dotenv_missing_secrets_stubs) | ||
app.tool(get_dotenv_path) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.