Skip to content
Merged
171 changes: 104 additions & 67 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,34 @@ def list_connections(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.ConnectionResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.connections_response is not None
return [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]
has_more = bool(response.connections_response and response.connections_response.next)
offset += page_size

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.connections_response is not None
result += [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]
return result


def list_workspaces(
Expand All @@ -192,24 +201,32 @@ def list_workspaces(
client_secret=client_secret,
api_root=api_root,
)
result: list[models.WorkspaceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size),
)
has_more = bool(response.workspaces_response and response.workspaces_response.next)
offset += page_size

response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(
workspace_ids=[workspace_id],
),
)
if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)

if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.workspaces_response is not None
return [
workspace for workspace in response.workspaces_response.data if name_filter(workspace.name)
]
assert response.workspaces_response is not None
result += [
workspace
for workspace in response.workspaces_response.data
if name_filter(workspace.name)
]

return result


def list_sources(
Expand All @@ -233,21 +250,31 @@ def list_sources(
client_secret=client_secret,
api_root=api_root,
)
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.SourceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.sources_response is not None
return [source for source in response.sources_response.data if name_filter(source.name)]
has_more = bool(response.sources_response and response.sources_response.next)
offset += page_size

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.sources_response is not None
result += [source for source in response.sources_response.data if name_filter(source.name)]

return result


def list_destinations(
Expand All @@ -271,25 +298,35 @@ def list_destinations(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.DestinationResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.destinations_response is not None
return [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]
has_more = bool(response.destinations_response and response.destinations_response.next)
offset += page_size

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.destinations_response is not None
result += [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]

return result


# Get and run connections
Expand Down Expand Up @@ -369,7 +406,7 @@ def run_connection(
def get_job_logs(
workspace_id: str,
connection_id: str,
limit: int = 20,
limit: int = 100,
*,
api_root: str,
client_id: SecretString,
Expand Down
27 changes: 27 additions & 0 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,35 @@ def _fetch_connection_info(self) -> ConnectionResponse:
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_connection_response(
cls,
workspace: CloudWorkspace,
connection_response: ConnectionResponse,
) -> CloudConnection:
"""Create a CloudConnection from a ConnectionResponse."""
result = cls(
workspace=workspace,
connection_id=connection_response.connection_id,
source=connection_response.source_id,
destination=connection_response.destination_id,
)
result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API
return result

# Properties

@property
def name(self) -> str | None:
"""Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.
"""
if not self._connection_info:
self._connection_info = self._fetch_connection_info()

return self._connection_info.name

@property
def source_id(self) -> str:
"""The ID of the source."""
Expand Down
75 changes: 75 additions & 0 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, ClassVar, Literal

from airbyte_api import models as api_models # noqa: TC002

from airbyte._util import api_util


Expand Down Expand Up @@ -99,6 +101,27 @@ def __init__(
self.connector_id = connector_id
"""The ID of the connector."""

self._connector_info: api_models.SourceResponse | api_models.DestinationResponse | None = (
None
)
"""The connection info object. (Cached.)"""

@property
def name(self) -> str | None:
"""Get the display name of the connector, if available.

E.g. "My Postgres Source", not the canonical connector name ("source-postgres").
"""
if not self._connector_info:
self._connector_info = self._fetch_connector_info()

return self._connector_info.name

@abc.abstractmethod
def _fetch_connector_info(self) -> api_models.SourceResponse | api_models.DestinationResponse:
"""Populate the connector with data from the API."""
...

@property
def connector_url(self) -> str:
"""Get the web URL of the source connector."""
Expand Down Expand Up @@ -164,6 +187,32 @@ def source_id(self) -> str:
"""
return self.connector_id

def _fetch_connector_info(self) -> api_models.SourceResponse:
"""Populate the source with data from the API."""
return api_util.get_source(
source_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_source_response(
cls,
workspace: CloudWorkspace,
source_response: api_models.SourceResponse,
) -> CloudSource:
"""Internal factory method.

Creates a CloudSource object from a REST API SourceResponse object.
"""
result = cls(
workspace=workspace,
connector_id=source_response.source_id,
)
result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API
return result


class CloudDestination(CloudConnector):
"""A cloud destination is a destination that is deployed on Airbyte Cloud."""
Expand All @@ -178,3 +227,29 @@ def destination_id(self) -> str:
This is an alias for `connector_id`.
"""
return self.connector_id

def _fetch_connector_info(self) -> api_models.DestinationResponse:
"""Populate the destination with data from the API."""
return api_util.get_destination(
destination_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_destination_response(
cls,
workspace: CloudWorkspace,
destination_response: api_models.DestinationResponse,
) -> CloudDestination:
"""Internal factory method.

Creates a CloudDestination object from a REST API DestinationResponse object.
"""
result = cls(
workspace=workspace,
connector_id=destination_response.destination_id,
)
result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API
return result
14 changes: 6 additions & 8 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,9 @@ def list_connections(
client_secret=self.client_secret,
)
return [
CloudConnection(
CloudConnection._from_connection_response( # noqa: SLF001 (non-public API)
workspace=self,
connection_id=connection.connection_id,
source=None,
destination=None,
connection_response=connection,
)
for connection in connections
if name is None or connection.name == name
Expand All @@ -418,9 +416,9 @@ def list_sources(
client_secret=self.client_secret,
)
return [
CloudSource(
CloudSource._from_source_response( # noqa: SLF001 (non-public API)
workspace=self,
connector_id=source.source_id,
source_response=source,
)
for source in sources
if name is None or source.name == name
Expand All @@ -445,9 +443,9 @@ def list_destinations(
client_secret=self.client_secret,
)
return [
CloudDestination(
CloudDestination._from_destination_response( # noqa: SLF001 (non-public API)
workspace=self,
connector_id=destination.destination_id,
destination_response=destination,
)
for destination in destinations
if name is None or destination.name == name
Expand Down
Loading
Loading