diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index 32ccb8e1..ee950bb2 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -20,6 +20,7 @@ import requests from airbyte_api import api, models +from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT from airbyte.exceptions import ( AirbyteConnectionSyncError, AirbyteError, @@ -44,20 +45,6 @@ JOB_WAIT_INTERVAL_SECS = 2.0 JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour -CLOUD_API_ROOT = "https://api.airbyte.com/v1" -"""The Airbyte Cloud API root URL. - -This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API -and is the default API root for the `CloudWorkspace` class. -- https://reference.airbyte.com/reference/getting-started -""" -CLOUD_CONFIG_API_ROOT = "https://cloud.airbyte.com/api/v1" -"""Internal-Use API Root, aka Airbyte "Config API". - -Documentation: -- https://docs.airbyte.com/api-documentation#configuration-api-deprecated -- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml -""" def status_ok(status_code: int) -> bool: @@ -73,6 +60,19 @@ def get_config_api_root(api_root: str) -> str: raise NotImplementedError("Configuration API root not implemented for this API root.") +def get_web_url_root(api_root: str) -> str: + """Get the web URL root from the main API root. + + # TODO: This does not return a valid URL for self-managed instances, due to not knowing the + # web URL root. Logged here: + # - https://github.com/airbytehq/PyAirbyte/issues/563 + """ + if api_root == CLOUD_API_ROOT: + return "https://cloud.airbyte.com" + + return api_root + + def get_airbyte_server_instance( *, api_root: str, @@ -522,6 +522,28 @@ def delete_source( ) +# Utility function + + +def _get_destination_type_str( + destination: DestinationConfiguration | dict[str, Any], +) -> str: + if isinstance(destination, dict): + destination_type = destination.get("destinationType") + else: + destination_type = getattr(destination, "DESTINATION_TYPE", None) + + if not destination_type or not isinstance(destination_type, str): + raise PyAirbyteInputError( + message="Could not determine destination type from configuration.", + context={ + "destination": destination, + }, + ) + + return destination_type + + # Create, get, and delete destinations @@ -540,8 +562,14 @@ def create_destination( client_secret=client_secret, api_root=api_root, ) + definition_id_override: str | None = None + if _get_destination_type_str(config) == "dev-null": + # TODO: We have to hard-code the definition ID for dev-null destination. (important-comment) + # https://github.com/airbytehq/PyAirbyte/issues/743 + definition_id_override = "a7bcc9d8-13b3-4e49-b80d-d020b90045e3" response: api.CreateDestinationResponse = airbyte_instance.destinations.create_destination( models.DestinationCreateRequest( + definition_id=definition_id_override, name=name, workspace_id=workspace_id, configuration=config, # Speakeasy API wants a dataclass, not a dict diff --git a/airbyte/cloud/auth.py b/airbyte/cloud/auth.py new file mode 100644 index 00000000..7925bdd9 --- /dev/null +++ b/airbyte/cloud/auth.py @@ -0,0 +1,41 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Authentication-related constants and utilities for the Airbyte Cloud.""" + +from airbyte import constants +from airbyte.secrets import SecretString +from airbyte.secrets.util import get_secret, try_get_secret + + +def resolve_cloud_client_secret( + input_value: str | SecretString | None = None, + /, +) -> SecretString: + """Get the Airbyte Cloud client secret from the environment.""" + return get_secret(constants.CLOUD_CLIENT_SECRET_ENV_VAR, default=input_value) + + +def resolve_cloud_client_id( + input_value: str | SecretString | None = None, + /, +) -> SecretString: + """Get the Airbyte Cloud client ID from the environment.""" + return get_secret(constants.CLOUD_CLIENT_ID_ENV_VAR, default=input_value) + + +def resolve_cloud_api_url( + input_value: str | None = None, + /, +) -> str: + """Get the Airbyte Cloud API URL from the environment, or return the default.""" + return str( + try_get_secret(constants.CLOUD_API_ROOT_ENV_VAR, default=input_value) + or constants.CLOUD_API_ROOT + ) + + +def resolve_cloud_workspace_id( + input_value: str | None = None, + /, +) -> str: + """Get the Airbyte Cloud workspace ID from the environment, or return None if not set.""" + return str(get_secret(constants.CLOUD_WORKSPACE_ID_ENV_VAR, default=input_value)) diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 10cb1d1b..8ea570ac 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -130,13 +130,13 @@ def table_prefix(self) -> str: @property def connection_url(self) -> str | None: - """The URL to the connection.""" + """The web URL to the connection.""" return f"{self.workspace.workspace_url}/connections/{self.connection_id}" @property def job_history_url(self) -> str | None: """The URL to the job history for the connection.""" - return f"{self.connection_url}/job-history" + return f"{self.connection_url}/timeline" # Run Sync @@ -169,6 +169,13 @@ def run_sync( return sync_result + def __repr__(self) -> str: + """String representation of the connection.""" + return ( + f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " + f"destination_id={self.destination_id}, connection_url={self.connection_url})" + ) + # Logs def get_previous_sync_logs( diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 22a6acf6..6e7cc129 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -101,8 +101,17 @@ def __init__( @property def connector_url(self) -> str: - """Get the URL of the source connector.""" - return f"{self.workspace.workspace_url}/{self.connector_type}s/{self.connector_id}" + """Get the web URL of the source connector.""" + return f"{self.workspace.workspace_url}/{self.connector_type}/{self.connector_id}" + + def __repr__(self) -> str: + """String representation of the connector.""" + return ( + f"CloudConnector(type={self.connector_type!s}, " + f"workspace_id={self.workspace.workspace_id}, " + f"connector_id={self.connector_id}, " + f"connector_url={self.connector_url})" + ) def permanently_delete(self) -> None: """Permanently delete the connector.""" diff --git a/airbyte/cloud/sync_results.py b/airbyte/cloud/sync_results.py index f4767a05..56d09530 100644 --- a/airbyte/cloud/sync_results.py +++ b/airbyte/cloud/sync_results.py @@ -144,8 +144,15 @@ class SyncResult: @property def job_url(self) -> str: - """Return the URL of the sync job.""" - return f"{self.connection.job_history_url}/{self.job_id}" + """Return the URL of the sync job. + + Note: This currently returns the connection's job history URL, as there is no direct URL + to a specific job in the Airbyte Cloud web app. + + TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. + E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true + """ + return f"{self.connection.job_history_url}" def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse: """Return connection info for the sync job.""" diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index 760cb690..f3550c6e 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -40,6 +40,7 @@ from airbyte import exceptions as exc from airbyte._util import api_util, text_util +from airbyte._util.api_util import get_web_url_root from airbyte.cloud.connections import CloudConnection from airbyte.cloud.connectors import CloudDestination, CloudSource from airbyte.destinations.base import Destination @@ -72,8 +73,8 @@ def __post_init__(self) -> None: @property def workspace_url(self) -> str | None: - """The URL of the workspace.""" - return f"{self.api_root}/workspaces/{self.workspace_id}" + """The web URL of the workspace.""" + return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}" # Test connection and creds @@ -375,7 +376,10 @@ def list_connections( *, name_filter: Callable | None = None, ) -> list[CloudConnection]: - """List connections by name in the workspace.""" + """List connections by name in the workspace. + + TODO: Add pagination support + """ connections = api_util.list_connections( api_root=self.api_root, workspace_id=self.workspace_id, @@ -401,7 +405,10 @@ def list_sources( *, name_filter: Callable | None = None, ) -> list[CloudSource]: - """List all sources in the workspace.""" + """List all sources in the workspace. + + TODO: Add pagination support + """ sources = api_util.list_sources( api_root=self.api_root, workspace_id=self.workspace_id, @@ -425,7 +432,10 @@ def list_destinations( *, name_filter: Callable | None = None, ) -> list[CloudDestination]: - """List all destinations in the workspace.""" + """List all destinations in the workspace. + + TODO: Add pagination support + """ destinations = api_util.list_destinations( api_root=self.api_root, workspace_id=self.workspace_id, diff --git a/airbyte/constants.py b/airbyte/constants.py index e2efd7fc..5ab4cd03 100644 --- a/airbyte/constants.py +++ b/airbyte/constants.py @@ -178,3 +178,33 @@ def _str_to_bool(value: str) -> bool: For more information, see the `airbyte.secrets` module documentation. """ + +# Cloud Constants + +CLOUD_CLIENT_ID_ENV_VAR: str = "AIRBYTE_CLOUD_CLIENT_ID" +"""The environment variable name for the Airbyte Cloud client ID.""" + +CLOUD_CLIENT_SECRET_ENV_VAR: str = "AIRBYTE_CLOUD_CLIENT_SECRET" +"""The environment variable name for the Airbyte Cloud client secret.""" + +CLOUD_API_ROOT_ENV_VAR: str = "AIRBYTE_CLOUD_API_URL" +"""The environment variable name for the Airbyte Cloud API URL.""" + +CLOUD_WORKSPACE_ID_ENV_VAR: str = "AIRBYTE_CLOUD_WORKSPACE_ID" +"""The environment variable name for the Airbyte Cloud workspace ID.""" + +CLOUD_API_ROOT: str = "https://api.airbyte.com/v1" +"""The Airbyte Cloud API root URL. + +This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API +and is the default API root for the `CloudWorkspace` class. +- https://reference.airbyte.com/reference/getting-started +""" + +CLOUD_CONFIG_API_ROOT: str = "https://cloud.airbyte.com/api/v1" +"""Internal-Use API Root, aka Airbyte "Config API". + +Documentation: +- https://docs.airbyte.com/api-documentation#configuration-api-deprecated +- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml +""" diff --git a/airbyte/exceptions.py b/airbyte/exceptions.py index 3e897a33..e082f7a8 100644 --- a/airbyte/exceptions.py +++ b/airbyte/exceptions.py @@ -462,7 +462,7 @@ class AirbyteConnectionError(AirbyteError): @property def connection_url(self) -> str | None: - """The URL to the connection where the error occurred.""" + """The web URL to the connection where the error occurred.""" if self.workspace_url and self.connection_id: return f"{self.workspace_url}/connections/{self.connection_id}" @@ -472,7 +472,7 @@ def connection_url(self) -> str | None: def job_history_url(self) -> str | None: """The URL to the job history where the error occurred.""" if self.connection_url: - return f"{self.connection_url}/job-history" + return f"{self.connection_url}/timeline" return None diff --git a/airbyte/mcp/_cloud_ops.py b/airbyte/mcp/_cloud_ops.py index 92661e81..969eb5a2 100644 --- a/airbyte/mcp/_cloud_ops.py +++ b/airbyte/mcp/_cloud_ops.py @@ -6,29 +6,294 @@ from fastmcp import FastMCP from pydantic import Field -from airbyte import cloud, secrets +from airbyte import cloud, get_destination, get_source from airbyte._util.api_imports import JobStatusEnum -from airbyte._util.api_util import CLOUD_API_ROOT +from airbyte.cloud.auth import ( + resolve_cloud_api_url, + resolve_cloud_client_id, + resolve_cloud_client_secret, + resolve_cloud_workspace_id, +) +from airbyte.cloud.connections import CloudConnection +from airbyte.cloud.connectors import CloudDestination, CloudSource +from airbyte.cloud.workspaces import CloudWorkspace +from airbyte.destinations.util import get_noop_destination +from airbyte.mcp._util import resolve_config + + +def _get_cloud_workspace() -> CloudWorkspace: + """Get an authenticated CloudWorkspace using environment variables.""" + return CloudWorkspace( + workspace_id=resolve_cloud_workspace_id(), + client_id=resolve_cloud_client_id(), + client_secret=resolve_cloud_client_secret(), + api_root=resolve_cloud_api_url(), + ) # @app.tool() # << deferred -def get_cloud_sync_status( - workspace_id: Annotated[ +def deploy_source_to_cloud( + source_name: Annotated[ str, - Field( - description="The ID of the Airbyte Cloud workspace.", - ), + Field(description="The name to use when deploying the source."), ], - connection_id: Annotated[ + source_connector_name: Annotated[ str, - Field( - description="The ID of the Airbyte Cloud connection.", - ), + Field(description="The name of the source connector (e.g., 'source-faker')."), ], - api_root: Annotated[ + *, + config: Annotated[ + dict | str | None, + Field(description="The configuration for the source connector."), + ] = None, + config_secret_name: Annotated[ str | None, + Field(description="The name of the secret containing the configuration."), + ] = None, + unique: Annotated[ + bool, + Field(description="Whether to require a unique name."), + ] = True, +) -> str: + """Deploy a source connector to Airbyte Cloud. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + try: + source = get_source(source_connector_name) + config_dict = resolve_config( + config=config, + config_secret_name=config_secret_name, + config_spec_jsonschema=source.config_spec, + ) + source.set_config(config_dict) + + workspace: CloudWorkspace = _get_cloud_workspace() + deployed_source = workspace.deploy_source( + name=source_name, + source=source, + unique=unique, + ) + + except Exception as ex: + return f"Failed to deploy source '{source_name}': {ex}" + else: + return ( + f"Successfully deployed source '{source_name}' with ID '{deployed_source.connector_id}'" + f" and URL: {deployed_source.connector_url}" + ) + + +# @app.tool() # << deferred +def deploy_destination_to_cloud( + destination_name: Annotated[ + str, + Field(description="The name to use when deploying the destination."), + ], + destination_connector_name: Annotated[ + str, + Field(description="The name of the destination connector (e.g., 'destination-postgres')."), + ], + *, + config: Annotated[ + dict | str | None, + Field(description="The configuration for the destination connector."), + ] = None, + config_secret_name: Annotated[ + str | None, + Field(description="The name of the secret containing the configuration."), + ] = None, + unique: Annotated[ + bool, + Field(description="Whether to require a unique name."), + ] = True, +) -> str: + """Deploy a destination connector to Airbyte Cloud. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + try: + destination = get_destination(destination_connector_name) + config_dict = resolve_config( + config=config, + config_secret_name=config_secret_name, + config_spec_jsonschema=destination.config_spec, + ) + destination.set_config(config_dict) + + workspace: CloudWorkspace = _get_cloud_workspace() + deployed_destination = workspace.deploy_destination( + name=destination_name, + destination=destination, + unique=unique, + ) + + except Exception as ex: + return f"Failed to deploy destination '{destination_name}': {ex}" + else: + return ( + f"Successfully deployed destination '{destination_name}' " + f"with ID: {deployed_destination.connector_id}" + ) + + +# @app.tool() # << deferred +def create_connection_on_cloud( + connection_name: Annotated[ + str, + Field(description="The name of the connection."), + ], + source_id: Annotated[ + str, + Field(description="The ID of the deployed source."), + ], + destination_id: Annotated[ + str, + Field(description="The ID of the deployed destination."), + ], + selected_streams: Annotated[ + list[str], + Field(description="The selected stream names to sync within the connection."), + ], + table_prefix: Annotated[ + str | None, + Field(description="Optional table prefix to use when syncing to the destination."), + ] = None, +) -> str: + """Create a connection between a deployed source and destination on Airbyte Cloud. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + deployed_connection = workspace.deploy_connection( + connection_name=connection_name, + source=source_id, + destination=destination_id, + selected_streams=selected_streams, + table_prefix=table_prefix, + ) + + except Exception as ex: + return f"Failed to create connection '{connection_name}': {ex}" + else: + return ( + f"Successfully created connection '{connection_name}' " + f"with ID '{deployed_connection.connection_id}' and " + f"URL: {deployed_connection.connection_url}" + ) + + +# @app.tool() # << deferred +def run_cloud_sync( + connection_id: Annotated[ + str, + Field(description="The ID of the Airbyte Cloud connection."), + ], + *, + wait: Annotated[ + bool, + Field(description="Whether to wait for the sync to complete."), + ] = True, + wait_timeout: Annotated[ + int, + Field(description="Maximum time to wait for sync completion (seconds)."), + ] = 300, +) -> str: + """Run a sync job on Airbyte Cloud. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + connection = workspace.get_connection(connection_id=connection_id) + sync_result = connection.run_sync(wait=wait, wait_timeout=wait_timeout) + + except Exception as ex: + return f"Failed to run sync for connection '{connection_id}': {ex}" + else: + if wait: + status = sync_result.get_job_status() + return ( + f"Sync completed with status: {status}. " # Sync completed. + f"Job ID is '{sync_result.job_id}' and " + f"job URL is: {sync_result.job_url}" + ) + return ( + f"Sync started. " # Sync started. + f"Job ID is '{sync_result.job_id}' and " + f"job URL is: {sync_result.job_url}" + ) + + +# @app.tool() # << deferred +def check_airbyte_cloud_workspace() -> str: + """Check if we have a valid Airbyte Cloud connection and return workspace info. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + + Returns workspace ID and workspace URL for verification. + """ + try: + workspace: CloudWorkspace = _get_cloud_workspace() + workspace.connect() + + except Exception as ex: + return f"❌ Failed to connect to Airbyte Cloud workspace: {ex}" + else: + return ( + f"✅ Successfully connected to Airbyte Cloud workspace.\n" + f"Workspace ID: {workspace.workspace_id}\n" + f"Workspace URL: {workspace.workspace_url}" + ) + + +# @app.tool() # << deferred +def deploy_noop_destination_to_cloud( + name: str = "No-op Destination", + *, + unique: bool = True, +) -> str: + """Deploy the No-op destination to Airbyte Cloud for testing purposes. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + try: + destination = get_noop_destination() + workspace: CloudWorkspace = _get_cloud_workspace() + deployed_destination = workspace.deploy_destination( + name=name, + destination=destination, + unique=unique, + ) + except Exception as ex: + return f"Failed to deploy No-op Destination: {ex}" + else: + return ( + f"Successfully deployed No-op Destination " + f"with ID '{deployed_destination.connector_id}' and " + f"URL: {deployed_destination.connector_url}" + ) + + +# @app.tool() # << deferred +def get_cloud_sync_status( + connection_id: Annotated[ + str, Field( - description="Optional Cloud API root URL override.", + description="The ID of the Airbyte Cloud connection.", ), ], job_id: Annotated[ @@ -38,17 +303,11 @@ def get_cloud_sync_status( ) -> JobStatusEnum | None: """Get the status of a sync job from the Airbyte Cloud. - By default, the `AIRBYTE_CLIENT_ID` and `AIRBYTE_CLIENT_SECRET` environment variables will be - used to authenticate with the Airbyte Cloud API. + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. """ - workspace = cloud.CloudWorkspace( - workspace_id, - # We'll attempt any configured secrets managers to retrieve the client ID and secret. - # If no other secret manager is defined, this normally comes from environment variables. - client_id=secrets.get_secret("AIRBYTE_CLIENT_ID"), - client_secret=secrets.get_secret("AIRBYTE_CLIENT_SECRET"), - api_root=api_root or CLOUD_API_ROOT, # Defaults to the Airbyte Cloud API root if None. - ) + workspace: CloudWorkspace = _get_cloud_workspace() connection = workspace.get_connection(connection_id=connection_id) # If a job ID is provided, get the job by ID. @@ -56,6 +315,51 @@ def get_cloud_sync_status( return sync_result.get_job_status() if sync_result else None +# @app.tool() # << deferred +def list_deployed_cloud_source_connectors() -> list[CloudSource]: + """List all deployed source connectors in the Airbyte Cloud workspace. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + return workspace.list_sources() + + +# @app.tool() # << deferred +def list_deployed_cloud_destination_connectors() -> list[CloudDestination]: + """List all deployed destination connectors in the Airbyte Cloud workspace. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + return workspace.list_destinations() + + +# @app.tool() # << deferred +def list_deployed_cloud_connections() -> list[CloudConnection]: + """List all deployed connections in the Airbyte Cloud workspace. + + By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`, + and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the + Airbyte Cloud API. + """ + workspace: CloudWorkspace = _get_cloud_workspace() + return workspace.list_connections() + + def register_cloud_ops_tools(app: FastMCP) -> None: """Register tools with the FastMCP app.""" + app.tool(check_airbyte_cloud_workspace) + app.tool(deploy_source_to_cloud) + app.tool(deploy_destination_to_cloud) + app.tool(deploy_noop_destination_to_cloud) + app.tool(create_connection_on_cloud) + app.tool(run_cloud_sync) app.tool(get_cloud_sync_status) + app.tool(list_deployed_cloud_source_connectors) + app.tool(list_deployed_cloud_destination_connectors) + app.tool(list_deployed_cloud_connections) diff --git a/airbyte/mcp/_util.py b/airbyte/mcp/_util.py index 3fb8ed9c..097ab29c 100644 --- a/airbyte/mcp/_util.py +++ b/airbyte/mcp/_util.py @@ -9,7 +9,7 @@ import dotenv import yaml -from airbyte.secrets import GoogleGSMSecretManager, register_secret_manager +from airbyte.secrets import DotenvSecretManager, GoogleGSMSecretManager, register_secret_manager from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets from airbyte.secrets.util import get_secret, is_secret_available @@ -31,8 +31,12 @@ def initialize_secrets() -> None: """Initialize dotenv to load environment variables from .env files.""" # Load the .env file from the current working directory. if AIRBYTE_MCP_DOTENV_PATH_ENVVAR in os.environ: - dotenv_path = Path(os.environ[AIRBYTE_MCP_DOTENV_PATH_ENVVAR]) + dotenv_path = Path(os.environ[AIRBYTE_MCP_DOTENV_PATH_ENVVAR]).absolute() + custom_dotenv_secret_mgr = DotenvSecretManager(dotenv_path) _load_dotenv_file(dotenv_path) + register_secret_manager( + custom_dotenv_secret_mgr, + ) if is_secret_available("GCP_GSM_CREDENTIALS") and is_secret_available("GCP_GSM_PROJECT_ID"): # Initialize the GoogleGSMSecretManager if the credentials and project are set. diff --git a/airbyte/secrets/config.py b/airbyte/secrets/config.py index f793043d..63e30e95 100644 --- a/airbyte/secrets/config.py +++ b/airbyte/secrets/config.py @@ -14,7 +14,6 @@ if TYPE_CHECKING: from airbyte.secrets.base import SecretSourceEnum - from airbyte.secrets.custom import CustomSecretManager _SECRETS_SOURCES: list[SecretManager] = [] @@ -44,7 +43,7 @@ def _get_secret_sources() -> list[SecretManager]: def register_secret_manager( - secret_manager: CustomSecretManager, + secret_manager: SecretManager, *, as_backup: bool = False, replace_existing: bool = False, diff --git a/airbyte/secrets/env_vars.py b/airbyte/secrets/env_vars.py index 4cd0b2e3..cc5bf34f 100644 --- a/airbyte/secrets/env_vars.py +++ b/airbyte/secrets/env_vars.py @@ -4,12 +4,17 @@ from __future__ import annotations import os +from typing import TYPE_CHECKING from dotenv import dotenv_values from airbyte.secrets.base import SecretManager, SecretSourceEnum, SecretString +if TYPE_CHECKING: + from pathlib import Path + + class EnvVarSecretManager(SecretManager): """Secret manager that retrieves secrets from environment variables.""" @@ -26,12 +31,28 @@ def get_secret(self, secret_name: str) -> SecretString | None: class DotenvSecretManager(SecretManager): """Secret manager that retrieves secrets from a `.env` file.""" - name = SecretSourceEnum.DOTENV.value + dotenv_path: Path | None = None + + @property + def name(self) -> str: # type: ignore[override] + """Get name of secret manager.""" + if self.dotenv_path: + return f"{SecretSourceEnum.DOTENV.value}:{self.dotenv_path}" + return SecretSourceEnum.DOTENV.value + + def __init__( + self, + dotenv_path: Path | None = None, + ) -> None: + """Initialize a new .env Secret Manager, with optionally specified file path.""" + self.dotenv_path = dotenv_path def get_secret(self, secret_name: str) -> SecretString | None: """Get a named secret from the `.env` file.""" try: - dotenv_vars: dict[str, str | None] = dotenv_values() + dotenv_vars: dict[str, str | None] = dotenv_values( + dotenv_path=self.dotenv_path, + ) except Exception: # Can't locate or parse a .env file return None diff --git a/airbyte/secrets/google_gsm.py b/airbyte/secrets/google_gsm.py index 78e190c1..6c76844c 100644 --- a/airbyte/secrets/google_gsm.py +++ b/airbyte/secrets/google_gsm.py @@ -161,13 +161,19 @@ def _fully_qualified_secret_name(self, secret_name: str) -> str: return full_name - def get_secret(self, secret_name: str) -> SecretString: - """Get a named secret from Google Colab user secrets.""" - return SecretString( - self.secret_client.access_secret_version( - name=self._fully_qualified_secret_name(secret_name) - ).payload.data.decode("UTF-8") - ) + def get_secret(self, secret_name: str) -> SecretString | None: + """Get a named secret from GSM. + + Returns 'None' if the secret is not found. + """ + try: + return SecretString( + self.secret_client.access_secret_version( + name=self._fully_qualified_secret_name(secret_name) + ).payload.data.decode("UTF-8") + ) + except Exception: + return None def get_secret_handle( self, diff --git a/airbyte/secrets/util.py b/airbyte/secrets/util.py index 69d14209..1c5c5869 100644 --- a/airbyte/secrets/util.py +++ b/airbyte/secrets/util.py @@ -4,6 +4,7 @@ from __future__ import annotations import warnings +from contextlib import suppress from typing import Any, cast from airbyte import exceptions as exc @@ -29,11 +30,42 @@ def is_secret_available( return True +def try_get_secret( + secret_name: str, + /, + default: str | SecretString | None = None, + sources: list[SecretManager | SecretSourceEnum] | None = None, + **kwargs: dict[str, Any], +) -> SecretString | None: + """Try to get a secret from the environment, failing gracefully. + + This function attempts to retrieve a secret from the configured secret sources. + If the secret is found, it returns the secret value; otherwise, it returns the + default value or None. + + This function will not prompt the user for input if the secret is not found. + + Raises: + PyAirbyteInputError: If an invalid source name is provided in the `sources` argument. + """ + with suppress(exc.PyAirbyteSecretNotFoundError): + return get_secret( + secret_name, + sources=sources, + allow_prompt=False, + default=default, + **kwargs, + ) + + return None + + def get_secret( secret_name: str, /, *, sources: list[SecretManager | SecretSourceEnum] | None = None, + default: str | SecretString | None = None, allow_prompt: bool = True, **kwargs: dict[str, Any], ) -> SecretString: @@ -45,6 +77,11 @@ def get_secret( If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then the user will be prompted to enter the secret if it is not found in any of the other sources. + + Raises: + PyAirbyteSecretNotFoundError: If the secret is not found in any of the configured sources, + and if no default value is provided. + PyAirbyteInputError: If an invalid source name is provided in the `sources` argument. """ if secret_name.startswith(SECRETS_HYDRATION_PREFIX): # If the secret name starts with the hydration prefix, we assume it's a secret reference. @@ -103,6 +140,9 @@ def get_secret( if val: return SecretString(val) + if default: + return SecretString(default) + raise exc.PyAirbyteSecretNotFoundError( secret_name=secret_name, sources=[str(s) for s in available_sources], diff --git a/examples/run_bigquery_destination.py b/examples/run_bigquery_destination.py index 04c1a13a..64dbcd80 100644 --- a/examples/run_bigquery_destination.py +++ b/examples/run_bigquery_destination.py @@ -13,6 +13,7 @@ import airbyte as ab from airbyte.secrets.google_gsm import GoogleGSMSecretManager + warnings.filterwarnings("ignore", message="Cannot create BigQuery Storage client") @@ -20,7 +21,7 @@ SECRET_NAME = "SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS" bigquery_destination_secret: dict = ( - GoogleGSMSecretManager( + GoogleGSMSecretManager( # type: ignore[union-attr] project=AIRBYTE_INTERNAL_GCP_PROJECT, credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), ) diff --git a/examples/run_bigquery_faker.py b/examples/run_bigquery_faker.py index cbfcfd41..74c1f20b 100644 --- a/examples/run_bigquery_faker.py +++ b/examples/run_bigquery_faker.py @@ -14,6 +14,7 @@ from airbyte.caches.bigquery import BigQueryCache from airbyte.secrets.google_gsm import GoogleGSMSecretManager + warnings.filterwarnings("ignore", message="Cannot create BigQuery Storage client") @@ -21,7 +22,7 @@ SECRET_NAME = "SECRET_DESTINATION-BIGQUERY_CREDENTIALS__CREDS" bigquery_destination_secret: dict = ( - GoogleGSMSecretManager( + GoogleGSMSecretManager( # type: ignore[union-attr] project=AIRBYTE_INTERNAL_GCP_PROJECT, credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"), ) diff --git a/tests/integration_tests/cloud/test_cloud_api_util.py b/tests/integration_tests/cloud/test_cloud_api_util.py index 4aebb424..c0ba2c99 100644 --- a/tests/integration_tests/cloud/test_cloud_api_util.py +++ b/tests/integration_tests/cloud/test_cloud_api_util.py @@ -6,20 +6,25 @@ """ from __future__ import annotations + from typing import Literal -from airbyte_api.models import DestinationResponse, SourceResponse, WorkspaceResponse +import pytest from airbyte._util import api_util, text_util from airbyte._util.api_util import ( - get_bearer_token, - check_connector, - AirbyteError, CLOUD_API_ROOT, + AirbyteError, + check_connector, + get_bearer_token, ) -from airbyte_api.models import DestinationDuckdb, SourceFaker - from airbyte.secrets.base import SecretString -import pytest +from airbyte_api.models import ( + DestinationDuckdb, + DestinationResponse, + SourceFaker, + SourceResponse, + WorkspaceResponse, +) def test_get_workspace( @@ -243,8 +248,8 @@ def test_create_and_delete_connection( ], ) def test_get_bearer_token( - airbyte_cloud_client_id, - airbyte_cloud_client_secret, + airbyte_cloud_client_id: SecretString, + airbyte_cloud_client_secret: SecretString, api_root: str, ) -> None: try: