diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index ee950bb2..32b982d1 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -150,25 +150,34 @@ def list_connections( client_secret=client_secret, api_root=api_root, ) - response = airbyte_instance.connections.list_connections( - api.ListConnectionsRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.connections_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.ConnectionResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response = airbyte_instance.connections.list_connections( + api.ListConnectionsRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.connections_response is not None - return [ - connection - for connection in response.connections_response.data - if name_filter(connection.name) - ] + has_more = bool(response.connections_response and response.connections_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.connections_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.connections_response is not None + result += [ + connection + for connection in response.connections_response.data + if name_filter(connection.name) + ] + return result def list_workspaces( @@ -192,24 +201,32 @@ def list_workspaces( client_secret=client_secret, api_root=api_root, ) + result: list[models.WorkspaceResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces( + api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size), + ) + has_more = bool(response.workspaces_response and response.workspaces_response.next) + offset += page_size - 
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces( - api.ListWorkspacesRequest( - workspace_ids=[workspace_id], - ), - ) + if not status_ok(response.status_code) and response.workspaces_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) - if not status_ok(response.status_code) and response.workspaces_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } - ) - assert response.workspaces_response is not None - return [ - workspace for workspace in response.workspaces_response.data if name_filter(workspace.name) - ] + assert response.workspaces_response is not None + result += [ + workspace + for workspace in response.workspaces_response.data + if name_filter(workspace.name) + ] + + return result def list_sources( @@ -233,21 +250,31 @@ def list_sources( client_secret=client_secret, api_root=api_root, ) - response: api.ListSourcesResponse = airbyte_instance.sources.list_sources( - api.ListSourcesRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.sources_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.SourceResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response: api.ListSourcesResponse = airbyte_instance.sources.list_sources( + api.ListSourcesRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.sources_response is not None - return [source for source in response.sources_response.data if name_filter(source.name)] + has_more = bool(response.sources_response and response.sources_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.sources_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.sources_response is 
not None + result += [source for source in response.sources_response.data if name_filter(source.name)] + + return result def list_destinations( @@ -271,25 +298,35 @@ def list_destinations( client_secret=client_secret, api_root=api_root, ) - response = airbyte_instance.destinations.list_destinations( - api.ListDestinationsRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.destinations_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.DestinationResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response = airbyte_instance.destinations.list_destinations( + api.ListDestinationsRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.destinations_response is not None - return [ - destination - for destination in response.destinations_response.data - if name_filter(destination.name) - ] + has_more = bool(response.destinations_response and response.destinations_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.destinations_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.destinations_response is not None + result += [ + destination + for destination in response.destinations_response.data + if name_filter(destination.name) + ] + + return result # Get and run connections @@ -369,7 +406,7 @@ def run_connection( def get_job_logs( workspace_id: str, connection_id: str, - limit: int = 20, + limit: int = 100, *, api_root: str, client_id: SecretString, diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 8ea570ac..24557afb 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -64,8 +64,35 @@ def _fetch_connection_info(self) -> ConnectionResponse: client_secret=self.workspace.client_secret, ) + @classmethod 
+ def _from_connection_response( + cls, + workspace: CloudWorkspace, + connection_response: ConnectionResponse, + ) -> CloudConnection: + """Create a CloudConnection from a ConnectionResponse.""" + result = cls( + workspace=workspace, + connection_id=connection_response.connection_id, + source=connection_response.source_id, + destination=connection_response.destination_id, + ) + result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API + return result + # Properties + @property + def name(self) -> str | None: + """Get the display name of the connection, if available. + + E.g. "My Postgres to Snowflake", not the connection ID. + """ + if not self._connection_info: + self._connection_info = self._fetch_connection_info() + + return self._connection_info.name + @property def source_id(self) -> str: """The ID of the source.""" diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 6e7cc129..658ce7a6 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -43,6 +43,8 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, ClassVar, Literal +from airbyte_api import models as api_models # noqa: TC002 + from airbyte._util import api_util @@ -99,6 +101,27 @@ def __init__( self.connector_id = connector_id """The ID of the connector.""" + self._connector_info: api_models.SourceResponse | api_models.DestinationResponse | None = ( + None + ) + """The connection info object. (Cached.)""" + + @property + def name(self) -> str | None: + """Get the display name of the connector, if available. + + E.g. "My Postgres Source", not the canonical connector name ("source-postgres"). + """ + if not self._connector_info: + self._connector_info = self._fetch_connector_info() + + return self._connector_info.name + + @abc.abstractmethod + def _fetch_connector_info(self) -> api_models.SourceResponse | api_models.DestinationResponse: + """Populate the connector with data from the API.""" + ... 
+ @property def connector_url(self) -> str: """Get the web URL of the source connector.""" @@ -164,6 +187,32 @@ def source_id(self) -> str: """ return self.connector_id + def _fetch_connector_info(self) -> api_models.SourceResponse: + """Populate the source with data from the API.""" + return api_util.get_source( + source_id=self.connector_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @classmethod + def _from_source_response( + cls, + workspace: CloudWorkspace, + source_response: api_models.SourceResponse, + ) -> CloudSource: + """Internal factory method. + + Creates a CloudSource object from a REST API SourceResponse object. + """ + result = cls( + workspace=workspace, + connector_id=source_response.source_id, + ) + result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API + return result + class CloudDestination(CloudConnector): """A cloud destination is a destination that is deployed on Airbyte Cloud.""" @@ -178,3 +227,29 @@ def destination_id(self) -> str: This is an alias for `connector_id`. """ return self.connector_id + + def _fetch_connector_info(self) -> api_models.DestinationResponse: + """Populate the destination with data from the API.""" + return api_util.get_destination( + destination_id=self.connector_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @classmethod + def _from_destination_response( + cls, + workspace: CloudWorkspace, + destination_response: api_models.DestinationResponse, + ) -> CloudDestination: + """Internal factory method. + + Creates a CloudDestination object from a REST API DestinationResponse object. 
+ """ + result = cls( + workspace=workspace, + connector_id=destination_response.destination_id, + ) + result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API + return result diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index f3550c6e..621b3d63 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -389,11 +389,9 @@ def list_connections( client_secret=self.client_secret, ) return [ - CloudConnection( + CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) workspace=self, - connection_id=connection.connection_id, - source=None, - destination=None, + connection_response=connection, ) for connection in connections if name is None or connection.name == name @@ -418,9 +416,9 @@ def list_sources( client_secret=self.client_secret, ) return [ - CloudSource( + CloudSource._from_source_response( # noqa: SLF001 (non-public API) workspace=self, - connector_id=source.source_id, + source_response=source, ) for source in sources if name is None or source.name == name @@ -445,9 +443,9 @@ def list_destinations( client_secret=self.client_secret, ) return [ - CloudDestination( + CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) workspace=self, - connector_id=destination.destination_id, + destination_response=destination, ) for destination in destinations if name is None or destination.name == name diff --git a/bin/cleanup_cloud_artifacts.py b/bin/cleanup_cloud_artifacts.py new file mode 100755 index 00000000..8378a927 --- /dev/null +++ b/bin/cleanup_cloud_artifacts.py @@ -0,0 +1,229 @@ +#!/usr/bin/env python3 +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Helper script to clean up stale connector and connection definitions from cloud tests. + +This script scans the source code for hardcoded UUIDs/GUIDs and exempts them from cleanup. +It uses the existing PyAirbyte cloud operations to list and delete resources. 
+ +Usage: + poetry run python bin/cleanup_cloud_artifacts.py --dry-run # Default, lists what would be deleted + poetry run python bin/cleanup_cloud_artifacts.py --no-dry-run # Actually performs deletions + poetry run python bin/cleanup_cloud_artifacts.py --help # Show help + +Examples: + poetry run python bin/cleanup_cloud_artifacts.py + + poetry run python bin/cleanup_cloud_artifacts.py --no-dry-run +""" + +import re +import sys +from pathlib import Path + +import click + +from airbyte.cloud.auth import ( + resolve_cloud_api_url, + resolve_cloud_client_id, + resolve_cloud_client_secret, + resolve_cloud_workspace_id, +) +from airbyte.cloud.workspaces import CloudWorkspace + + +UUID_PATTERN = re.compile( + r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}", re.IGNORECASE +) + + +def scan_source_code_for_uuids(repo_path: Path) -> set[str]: + """Scan the source code for hardcoded UUIDs/GUIDs to exempt from cleanup.""" + hardcoded_uuids = set() + + file_patterns = ["**/*.py", "**/*.yaml", "**/*.yml", "**/*.json", "**/*.md"] + + for pattern in file_patterns: + for file_path in repo_path.glob(pattern): + if any(part.startswith(".") for part in file_path.parts): + continue + + try: + content = file_path.read_text(encoding="utf-8") + matches = UUID_PATTERN.findall(content) + hardcoded_uuids.update(matches) + except (UnicodeDecodeError, PermissionError): + continue + + return hardcoded_uuids + + +def get_cloud_workspace() -> CloudWorkspace: + """Get an authenticated CloudWorkspace using environment variables.""" + return CloudWorkspace( + workspace_id=resolve_cloud_workspace_id(), + client_id=resolve_cloud_client_id(), + client_secret=resolve_cloud_client_secret(), + api_root=resolve_cloud_api_url(), + ) + + +def scan_connections( + workspace: CloudWorkspace, hardcoded_uuids: set[str], *, is_dry_run: bool +) -> list: + """Scan and optionally delete connections.""" + connections_to_delete = [] + + click.echo("\nšŸ”— Scanning connections...") + try: + connections 
= workspace.list_connections()
+        for connection in connections:
+            if connection.connection_id not in hardcoded_uuids:
+                connections_to_delete.append(connection)
+                click.echo(
+                    f"   Found connection: {connection.connection_id} - {connection.connection_url}: {connection.name}"
+                )
+    except Exception as e:
+        click.echo(f"āš ļø Failed to list connections: {e}", err=True)
+
+    return connections_to_delete
+
+
+def scan_sources(workspace: CloudWorkspace, hardcoded_uuids: set[str], *, is_dry_run: bool) -> list:
+    """Scan and optionally delete sources."""
+    sources_to_delete = []
+
+    click.echo("\nšŸ“„ Scanning sources...")
+    try:
+        sources = workspace.list_sources()
+        for source in sources:
+            if source.connector_id not in hardcoded_uuids:
+                sources_to_delete.append(source)
+                click.echo(
+                    f"   Found source: {source.connector_id} - {source.connector_url}: {source.name}"
+                )
+    except Exception as e:
+        click.echo(f"āš ļø Failed to list sources: {e}", err=True)
+
+    return sources_to_delete
+
+
+def scan_destinations(
+    workspace: CloudWorkspace, hardcoded_uuids: set[str], *, is_dry_run: bool
+) -> list:
+    """Scan and optionally delete destinations."""
+    destinations_to_delete = []
+
+    click.echo("\nšŸ“¤ Scanning destinations...")
+    try:
+        destinations = workspace.list_destinations()
+        for destination in destinations:
+            if destination.connector_id not in hardcoded_uuids:
+                destinations_to_delete.append(destination)
+                click.echo(
+                    f"   Found destination: {destination.connector_id} - {destination.connector_url}: {destination.name}"
+                )
+    except Exception as e:
+        click.echo(f"āš ļø Failed to list destinations: {e}", err=True)
+
+    return destinations_to_delete
+
+
+def perform_deletions(connections: list, sources: list, destinations: list) -> int:
+    """Perform actual deletions and return count of successful deletions."""
+    # Fixed: a stray `return False` here made this function a no-op (nothing deleted).
+    deleted_count = 0
+
+    for connection in connections:
+        try:
+            connection.permanently_delete()
+            click.echo(f"   āœ… Deleted connection: 
{connection.connection_id}") + deleted_count += 1 + except Exception as e: + click.echo( + f" āŒ Failed to delete connection {connection.connection_id}: {e}", err=True + ) + + for source in sources: + try: + source.permanently_delete() + click.echo(f" āœ… Deleted source: {source.connector_id}") + deleted_count += 1 + except Exception as e: + click.echo(f" āŒ Failed to delete source {source.connector_id}: {e}", err=True) + + for destination in destinations: + try: + destination.permanently_delete() + click.echo(f" āœ… Deleted destination: {destination.connector_id}") + deleted_count += 1 + except Exception as e: + click.echo( + f" āŒ Failed to delete destination {destination.connector_id}: {e}", err=True + ) + + return deleted_count + + +@click.command() +@click.option( + "--dry-run/--no-dry-run", + default=True, + help="If true (default), only list what would be deleted. If false, actually delete resources.", +) +@click.option( + "--repo-path", + type=click.Path(exists=True, file_okay=False, dir_okay=True, path_type=Path), + default=Path.cwd(), + help="Path to the repository to scan for hardcoded UUIDs. 
Defaults to current directory.", +) +def main(*, dry_run: bool, repo_path: Path) -> None: + """Clean up stale connector and connection definitions from cloud integration tests.""" + click.echo(f"šŸ” Scanning source code for hardcoded UUIDs in: {repo_path}") + + hardcoded_uuids = scan_source_code_for_uuids(repo_path / "tests") + click.echo(f"šŸ“‹ Found {len(hardcoded_uuids)} hardcoded UUIDs to exempt from cleanup") + + if hardcoded_uuids: + click.echo("šŸ”’ Exempted UUIDs:") + for uuid in sorted(hardcoded_uuids): + click.echo(f" - {uuid}") + + try: + workspace = get_cloud_workspace() + workspace.connect() + click.echo(f"āœ… Connected to workspace: {workspace.workspace_url}") + except Exception as e: + click.echo(f"āŒ Failed to connect to cloud workspace: {e}", err=True) + sys.exit(1) + + connections_to_delete = scan_connections(workspace, hardcoded_uuids, is_dry_run=dry_run) + sources_to_delete = scan_sources(workspace, hardcoded_uuids, is_dry_run=dry_run) + destinations_to_delete = scan_destinations(workspace, hardcoded_uuids, is_dry_run=dry_run) + + total_items = len(connections_to_delete) + len(sources_to_delete) + len(destinations_to_delete) + + if dry_run: + click.echo("\nšŸ“Š Summary (DRY RUN):") + click.echo(f" - {len(connections_to_delete)} connections would be deleted") + click.echo(f" - {len(sources_to_delete)} sources would be deleted") + click.echo(f" - {len(destinations_to_delete)} destinations would be deleted") + click.echo(f" - {total_items} total items would be deleted") + + if total_items > 0: + click.echo("\nšŸ’” To actually delete these items, run with --no-dry-run") + else: + click.echo("\n✨ No stale artifacts found to clean up!") + else: + click.echo("\nšŸ—‘ļø Performing deletions...") + + deleted_count = perform_deletions( + connections_to_delete, sources_to_delete, destinations_to_delete + ) + + click.echo( + f"\nšŸ“Š Cleanup completed: {deleted_count}/{total_items} items successfully deleted" + ) + + +if __name__ == "__main__": + main() 
diff --git a/pyproject.toml b/pyproject.toml
index d5e3d282..e120e478 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -205,6 +205,7 @@ mcp-serve-http = { cmd = "poetry run python -c \"from airbyte.mcp.server import
 mcp-serve-sse = { cmd = "poetry run python -c \"from airbyte.mcp.server import app; app.run(transport='sse', host='127.0.0.1', port=8000)\"", help = "Start the MCP server with SSE transport" }
 mcp-inspect = { cmd = "poetry run fastmcp inspect airbyte/mcp/server.py:app", help = "Inspect MCP tools and resources (supports --tools, --health, etc.)" }
 mcp-tool-test = { cmd = "poetry run python bin/test_mcp_tool.py", help = "Test MCP tools directly with JSON arguments: poe mcp-tool-test ''" }
+cloud-cleanup = { cmd = "poetry run python bin/cleanup_cloud_artifacts.py", help = "Clean up stale cloud artifacts from integration tests: poe cloud-cleanup [--no-dry-run]" }
 
 [tool.airbyte_ci]
 extra_poetry_groups = ["dev"]