diff --git a/airbyte/_util/api_util.py b/airbyte/_util/api_util.py index ee950bb2..32b982d1 100644 --- a/airbyte/_util/api_util.py +++ b/airbyte/_util/api_util.py @@ -150,25 +150,34 @@ def list_connections( client_secret=client_secret, api_root=api_root, ) - response = airbyte_instance.connections.list_connections( - api.ListConnectionsRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.connections_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.ConnectionResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response = airbyte_instance.connections.list_connections( + api.ListConnectionsRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.connections_response is not None - return [ - connection - for connection in response.connections_response.data - if name_filter(connection.name) - ] + has_more = bool(response.connections_response and response.connections_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.connections_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.connections_response is not None + result += [ + connection + for connection in response.connections_response.data + if name_filter(connection.name) + ] + return result def list_workspaces( @@ -192,24 +201,32 @@ def list_workspaces( client_secret=client_secret, api_root=api_root, ) + result: list[models.WorkspaceResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces( + api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size), + ) + has_more = bool(response.workspaces_response and response.workspaces_response.next) + offset += page_size - response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces( - api.ListWorkspacesRequest( - workspace_ids=[workspace_id], - ), - ) + if not status_ok(response.status_code) and response.workspaces_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) - if not status_ok(response.status_code) and response.workspaces_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } - ) - assert response.workspaces_response is not None - return [ - workspace for workspace in response.workspaces_response.data if name_filter(workspace.name) - ] + assert response.workspaces_response is not None + result += [ + workspace + for workspace in response.workspaces_response.data + if name_filter(workspace.name) + ] + + return result def list_sources( @@ -233,21 +250,31 @@ def list_sources( client_secret=client_secret, api_root=api_root, ) - response: api.ListSourcesResponse = airbyte_instance.sources.list_sources( - api.ListSourcesRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.sources_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.SourceResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response: api.ListSourcesResponse = airbyte_instance.sources.list_sources( + api.ListSourcesRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.sources_response is not None - return [source for source in response.sources_response.data if name_filter(source.name)] + has_more = bool(response.sources_response and response.sources_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.sources_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.sources_response is not None + result += [source for source in response.sources_response.data if name_filter(source.name)] + + return result def list_destinations( @@ -271,25 +298,35 @@ def list_destinations( client_secret=client_secret, api_root=api_root, ) - response = airbyte_instance.destinations.list_destinations( - api.ListDestinationsRequest( - workspace_ids=[workspace_id], - ), - ) - - if not status_ok(response.status_code) and response.destinations_response: - raise AirbyteError( - context={ - "workspace_id": workspace_id, - "response": response, - } + result: list[models.DestinationResponse] = [] + has_more = True + offset, page_size = 0, 100 + while has_more: + response = airbyte_instance.destinations.list_destinations( + api.ListDestinationsRequest( + workspace_ids=[workspace_id], + offset=offset, + limit=page_size, + ), ) - assert response.destinations_response is not None - return [ - destination - for destination in response.destinations_response.data - if name_filter(destination.name) - ] + has_more = bool(response.destinations_response and response.destinations_response.next) + offset += page_size + + if not status_ok(response.status_code) and response.destinations_response: + raise AirbyteError( + context={ + "workspace_id": workspace_id, + "response": response, + } + ) + assert response.destinations_response is not None + result += [ + destination + for destination in response.destinations_response.data + if name_filter(destination.name) + ] + + return result # Get and run connections @@ -369,7 +406,7 @@ def run_connection( def get_job_logs( workspace_id: str, connection_id: str, - limit: int = 20, + limit: int = 100, *, api_root: str, client_id: SecretString, diff --git a/airbyte/cloud/connections.py b/airbyte/cloud/connections.py index 8ea570ac..24557afb 100644 --- a/airbyte/cloud/connections.py +++ b/airbyte/cloud/connections.py @@ -64,8 +64,35 @@ def _fetch_connection_info(self) -> ConnectionResponse: client_secret=self.workspace.client_secret, ) + @classmethod + def _from_connection_response( + cls, + workspace: CloudWorkspace, + connection_response: ConnectionResponse, + ) -> CloudConnection: + """Create a CloudConnection from a ConnectionResponse.""" + result = cls( + workspace=workspace, + connection_id=connection_response.connection_id, + source=connection_response.source_id, + destination=connection_response.destination_id, + ) + result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API + return result + # Properties + @property + def name(self) -> str | None: + """Get the display name of the connection, if available. + + E.g. "My Postgres to Snowflake", not the connection ID. + """ + if not self._connection_info: + self._connection_info = self._fetch_connection_info() + + return self._connection_info.name + @property def source_id(self) -> str: """The ID of the source.""" diff --git a/airbyte/cloud/connectors.py b/airbyte/cloud/connectors.py index 6e7cc129..658ce7a6 100644 --- a/airbyte/cloud/connectors.py +++ b/airbyte/cloud/connectors.py @@ -43,6 +43,8 @@ from dataclasses import dataclass from typing import TYPE_CHECKING, ClassVar, Literal +from airbyte_api import models as api_models # noqa: TC002 + from airbyte._util import api_util @@ -99,6 +101,27 @@ def __init__( self.connector_id = connector_id """The ID of the connector.""" + self._connector_info: api_models.SourceResponse | api_models.DestinationResponse | None = ( + None + ) + """The connection info object. (Cached.)""" + + @property + def name(self) -> str | None: + """Get the display name of the connector, if available. + + E.g. "My Postgres Source", not the canonical connector name ("source-postgres"). + """ + if not self._connector_info: + self._connector_info = self._fetch_connector_info() + + return self._connector_info.name + + @abc.abstractmethod + def _fetch_connector_info(self) -> api_models.SourceResponse | api_models.DestinationResponse: + """Populate the connector with data from the API.""" + ... + @property def connector_url(self) -> str: """Get the web URL of the source connector.""" @@ -164,6 +187,32 @@ def source_id(self) -> str: """ return self.connector_id + def _fetch_connector_info(self) -> api_models.SourceResponse: + """Populate the source with data from the API.""" + return api_util.get_source( + source_id=self.connector_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @classmethod + def _from_source_response( + cls, + workspace: CloudWorkspace, + source_response: api_models.SourceResponse, + ) -> CloudSource: + """Internal factory method. + + Creates a CloudSource object from a REST API SourceResponse object. + """ + result = cls( + workspace=workspace, + connector_id=source_response.source_id, + ) + result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API + return result + class CloudDestination(CloudConnector): """A cloud destination is a destination that is deployed on Airbyte Cloud.""" @@ -178,3 +227,29 @@ def destination_id(self) -> str: This is an alias for `connector_id`. """ return self.connector_id + + def _fetch_connector_info(self) -> api_models.DestinationResponse: + """Populate the destination with data from the API.""" + return api_util.get_destination( + destination_id=self.connector_id, + api_root=self.workspace.api_root, + client_id=self.workspace.client_id, + client_secret=self.workspace.client_secret, + ) + + @classmethod + def _from_destination_response( + cls, + workspace: CloudWorkspace, + destination_response: api_models.DestinationResponse, + ) -> CloudDestination: + """Internal factory method. + + Creates a CloudDestination object from a REST API DestinationResponse object. + """ + result = cls( + workspace=workspace, + connector_id=destination_response.destination_id, + ) + result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API + return result diff --git a/airbyte/cloud/workspaces.py b/airbyte/cloud/workspaces.py index f3550c6e..621b3d63 100644 --- a/airbyte/cloud/workspaces.py +++ b/airbyte/cloud/workspaces.py @@ -389,11 +389,9 @@ def list_connections( client_secret=self.client_secret, ) return [ - CloudConnection( + CloudConnection._from_connection_response( # noqa: SLF001 (non-public API) workspace=self, - connection_id=connection.connection_id, - source=None, - destination=None, + connection_response=connection, ) for connection in connections if name is None or connection.name == name @@ -418,9 +416,9 @@ def list_sources( client_secret=self.client_secret, ) return [ - CloudSource( + CloudSource._from_source_response( # noqa: SLF001 (non-public API) workspace=self, - connector_id=source.source_id, + source_response=source, ) for source in sources if name is None or source.name == name @@ -445,9 +443,9 @@ def list_destinations( client_secret=self.client_secret, ) return [ - CloudDestination( + CloudDestination._from_destination_response( # noqa: SLF001 (non-public API) workspace=self, - connector_id=destination.destination_id, + destination_response=destination, ) for destination in destinations if name is None or destination.name == name diff --git a/tests/integration_tests/cloud/test_cloud_workspaces.py b/tests/integration_tests/cloud/test_cloud_workspaces.py index 0a473f5f..e9900b9c 100644 --- a/tests/integration_tests/cloud/test_cloud_workspaces.py +++ b/tests/integration_tests/cloud/test_cloud_workspaces.py @@ -38,8 +38,9 @@ def test_deploy_source( ) source.check() cloud_source: CloudSource = cloud_workspace.deploy_source( - name="test-source", + name="test-faker-source-deleteme", source=source, + unique=False, ) cloud_workspace.permanently_delete_source(cloud_source) @@ -52,8 +53,9 @@ def test_deploy_dummy_source( deployable_dummy_source.check() cloud_source: CloudSource = cloud_workspace.deploy_source( - name="test-source", + name="test-source-deleteme", source=deployable_dummy_source, + unique=False, ) cloud_workspace.permanently_delete_source(cloud_source)