Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 104 additions & 67 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,34 @@ def list_connections(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.ConnectionResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.connections_response is not None
return [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]
has_more = bool(response.connections_response and response.connections_response.next)
offset += page_size

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.connections_response is not None
result += [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]
return result


def list_workspaces(
Expand All @@ -192,24 +201,32 @@ def list_workspaces(
client_secret=client_secret,
api_root=api_root,
)
result: list[models.WorkspaceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size),
)
has_more = bool(response.workspaces_response and response.workspaces_response.next)
offset += page_size

response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(
workspace_ids=[workspace_id],
),
)
if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)

if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.workspaces_response is not None
return [
workspace for workspace in response.workspaces_response.data if name_filter(workspace.name)
]
assert response.workspaces_response is not None
result += [
workspace
for workspace in response.workspaces_response.data
if name_filter(workspace.name)
]

return result


def list_sources(
Expand All @@ -233,21 +250,31 @@ def list_sources(
client_secret=client_secret,
api_root=api_root,
)
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.SourceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.sources_response is not None
return [source for source in response.sources_response.data if name_filter(source.name)]
has_more = bool(response.sources_response and response.sources_response.next)
offset += page_size

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.sources_response is not None
result += [source for source in response.sources_response.data if name_filter(source.name)]

return result


def list_destinations(
Expand All @@ -271,25 +298,35 @@ def list_destinations(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.DestinationResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.destinations_response is not None
return [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]
has_more = bool(response.destinations_response and response.destinations_response.next)
offset += page_size

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.destinations_response is not None
result += [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]

return result


# Get and run connections
Expand Down Expand Up @@ -369,7 +406,7 @@ def run_connection(
def get_job_logs(
workspace_id: str,
connection_id: str,
limit: int = 20,
limit: int = 100,
*,
api_root: str,
client_id: SecretString,
Expand Down
27 changes: 27 additions & 0 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,35 @@ def _fetch_connection_info(self) -> ConnectionResponse:
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_connection_response(
cls,
workspace: CloudWorkspace,
connection_response: ConnectionResponse,
) -> CloudConnection:
"""Create a CloudConnection from a ConnectionResponse."""
result = cls(
workspace=workspace,
connection_id=connection_response.connection_id,
source=connection_response.source_id,
destination=connection_response.destination_id,
)
result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API
return result

# Properties

@property
def name(self) -> str | None:
"""Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.
"""
if not self._connection_info:
self._connection_info = self._fetch_connection_info()

return self._connection_info.name

@property
def source_id(self) -> str:
"""The ID of the source."""
Expand Down
75 changes: 75 additions & 0 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, ClassVar, Literal

from airbyte_api import models as api_models # noqa: TC002

from airbyte._util import api_util


Expand Down Expand Up @@ -99,6 +101,27 @@ def __init__(
self.connector_id = connector_id
"""The ID of the connector."""

self._connector_info: api_models.SourceResponse | api_models.DestinationResponse | None = (
None
)
"""The connection info object. (Cached.)"""

@property
def name(self) -> str | None:
"""Get the display name of the connector, if available.

E.g. "My Postgres Source", not the canonical connector name ("source-postgres").
"""
if not self._connector_info:
self._connector_info = self._fetch_connector_info()

return self._connector_info.name

@abc.abstractmethod
def _fetch_connector_info(self) -> api_models.SourceResponse | api_models.DestinationResponse:
"""Populate the connector with data from the API."""
...

@property
def connector_url(self) -> str:
"""Get the web URL of the source connector."""
Expand Down Expand Up @@ -164,6 +187,32 @@ def source_id(self) -> str:
"""
return self.connector_id

def _fetch_connector_info(self) -> api_models.SourceResponse:
"""Populate the source with data from the API."""
return api_util.get_source(
source_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_source_response(
cls,
workspace: CloudWorkspace,
source_response: api_models.SourceResponse,
) -> CloudSource:
"""Internal factory method.

Creates a CloudSource object from a REST API SourceResponse object.
"""
result = cls(
workspace=workspace,
connector_id=source_response.source_id,
)
result._connector_info = source_response # noqa: SLF001 # Accessing Non-Public API
return result


class CloudDestination(CloudConnector):
"""A cloud destination is a destination that is deployed on Airbyte Cloud."""
Expand All @@ -178,3 +227,29 @@ def destination_id(self) -> str:
This is an alias for `connector_id`.
"""
return self.connector_id

def _fetch_connector_info(self) -> api_models.DestinationResponse:
"""Populate the destination with data from the API."""
return api_util.get_destination(
destination_id=self.connector_id,
api_root=self.workspace.api_root,
client_id=self.workspace.client_id,
client_secret=self.workspace.client_secret,
)

@classmethod
def _from_destination_response(
cls,
workspace: CloudWorkspace,
destination_response: api_models.DestinationResponse,
) -> CloudDestination:
"""Internal factory method.

Creates a CloudDestination object from a REST API DestinationResponse object.
"""
result = cls(
workspace=workspace,
connector_id=destination_response.destination_id,
)
result._connector_info = destination_response # noqa: SLF001 # Accessing Non-Public API
return result
14 changes: 6 additions & 8 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,9 @@ def list_connections(
client_secret=self.client_secret,
)
return [
CloudConnection(
CloudConnection._from_connection_response( # noqa: SLF001 (non-public API)
workspace=self,
connection_id=connection.connection_id,
source=None,
destination=None,
connection_response=connection,
)
for connection in connections
if name is None or connection.name == name
Expand All @@ -418,9 +416,9 @@ def list_sources(
client_secret=self.client_secret,
)
return [
CloudSource(
CloudSource._from_source_response( # noqa: SLF001 (non-public API)
workspace=self,
connector_id=source.source_id,
source_response=source,
)
for source in sources
if name is None or source.name == name
Expand All @@ -445,9 +443,9 @@ def list_destinations(
client_secret=self.client_secret,
)
return [
CloudDestination(
CloudDestination._from_destination_response( # noqa: SLF001 (non-public API)
workspace=self,
connector_id=destination.destination_id,
destination_response=destination,
)
for destination in destinations
if name is None or destination.name == name
Expand Down
Loading
Loading