Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import requests
from airbyte_api import api, models

from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT
from airbyte.exceptions import (
AirbyteConnectionSyncError,
AirbyteError,
Expand All @@ -44,20 +45,6 @@

JOB_WAIT_INTERVAL_SECS = 2.0
JOB_WAIT_TIMEOUT_SECS_DEFAULT = 60 * 60 # 1 hour
CLOUD_API_ROOT = "https://api.airbyte.com/v1"
"""The Airbyte Cloud API root URL.

This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API
and is the default API root for the `CloudWorkspace` class.
- https://reference.airbyte.com/reference/getting-started
"""
CLOUD_CONFIG_API_ROOT = "https://cloud.airbyte.com/api/v1"
"""Internal-Use API Root, aka Airbyte "Config API".

Documentation:
- https://docs.airbyte.com/api-documentation#configuration-api-deprecated
- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml
"""


def status_ok(status_code: int) -> bool:
Expand All @@ -73,6 +60,19 @@ def get_config_api_root(api_root: str) -> str:
raise NotImplementedError("Configuration API root not implemented for this API root.")


def get_web_url_root(api_root: str) -> str:
"""Get the web URL root from the main API root.

# TODO: This does not return a valid URL for self-managed instances, due to not knowing the
# web URL root. Logged here:
# - https://github.com/airbytehq/PyAirbyte/issues/563
"""
if api_root == CLOUD_API_ROOT:
return "https://cloud.airbyte.com"

return api_root


def get_airbyte_server_instance(
*,
api_root: str,
Expand Down Expand Up @@ -522,6 +522,28 @@ def delete_source(
)


# Utility function


def _get_destination_type_str(
destination: DestinationConfiguration | dict[str, Any],
) -> str:
if isinstance(destination, dict):
destination_type = destination.get("destinationType")
else:
destination_type = getattr(destination, "DESTINATION_TYPE", None)

if not destination_type or not isinstance(destination_type, str):
raise PyAirbyteInputError(
message="Could not determine destination type from configuration.",
context={
"destination": destination,
},
)

return destination_type


# Create, get, and delete destinations


Expand All @@ -540,8 +562,14 @@ def create_destination(
client_secret=client_secret,
api_root=api_root,
)
definition_id_override: str | None = None
if _get_destination_type_str(config) == "dev-null":
# TODO: We have to hard-code the definition ID for dev-null destination. (important-comment)
# https://github.com/airbytehq/PyAirbyte/issues/743
definition_id_override = "a7bcc9d8-13b3-4e49-b80d-d020b90045e3"
response: api.CreateDestinationResponse = airbyte_instance.destinations.create_destination(
models.DestinationCreateRequest(
definition_id=definition_id_override,
name=name,
workspace_id=workspace_id,
configuration=config, # Speakeasy API wants a dataclass, not a dict
Expand Down
41 changes: 41 additions & 0 deletions airbyte/cloud/auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Authentication-related constants and utilities for the Airbyte Cloud."""

from airbyte import constants
from airbyte.secrets import SecretString
from airbyte.secrets.util import get_secret, try_get_secret


def resolve_cloud_client_secret(
input_value: str | SecretString | None = None,
/,
) -> SecretString:
"""Get the Airbyte Cloud client secret from the environment."""
return get_secret(constants.CLOUD_CLIENT_SECRET_ENV_VAR, default=input_value)


def resolve_cloud_client_id(
input_value: str | SecretString | None = None,
/,
) -> SecretString:
"""Get the Airbyte Cloud client ID from the environment."""
return get_secret(constants.CLOUD_CLIENT_ID_ENV_VAR, default=input_value)


def resolve_cloud_api_url(
input_value: str | None = None,
/,
) -> str:
"""Get the Airbyte Cloud API URL from the environment, or return the default."""
return str(
try_get_secret(constants.CLOUD_API_ROOT_ENV_VAR, default=input_value)
or constants.CLOUD_API_ROOT
)


def resolve_cloud_workspace_id(
input_value: str | None = None,
/,
) -> str:
"""Get the Airbyte Cloud workspace ID from the environment, or return None if not set."""
return str(get_secret(constants.CLOUD_WORKSPACE_ID_ENV_VAR, default=input_value))
11 changes: 9 additions & 2 deletions airbyte/cloud/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,13 @@ def table_prefix(self) -> str:

@property
def connection_url(self) -> str | None:
"""The URL to the connection."""
"""The web URL to the connection."""
return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

@property
def job_history_url(self) -> str | None:
"""The URL to the job history for the connection."""
return f"{self.connection_url}/job-history"
return f"{self.connection_url}/timeline"

# Run Sync

Expand Down Expand Up @@ -169,6 +169,13 @@ def run_sync(

return sync_result

def __repr__(self) -> str:
"""String representation of the connection."""
return (
f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
f"destination_id={self.destination_id}, connection_url={self.connection_url})"
)

# Logs

def get_previous_sync_logs(
Expand Down
13 changes: 11 additions & 2 deletions airbyte/cloud/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,17 @@ def __init__(

@property
def connector_url(self) -> str:
"""Get the URL of the source connector."""
return f"{self.workspace.workspace_url}/{self.connector_type}s/{self.connector_id}"
"""Get the web URL of the source connector."""
return f"{self.workspace.workspace_url}/{self.connector_type}/{self.connector_id}"

def __repr__(self) -> str:
"""String representation of the connector."""
return (
f"CloudConnector(type={self.connector_type!s}, "
f"workspace_id={self.workspace.workspace_id}, "
f"connector_id={self.connector_id}, "
f"connector_url={self.connector_url})"
)

def permanently_delete(self) -> None:
"""Permanently delete the connector."""
Expand Down
11 changes: 9 additions & 2 deletions airbyte/cloud/sync_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,15 @@ class SyncResult:

@property
def job_url(self) -> str:
"""Return the URL of the sync job."""
return f"{self.connection.job_history_url}/{self.job_id}"
"""Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL
to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
"""
return f"{self.connection.job_history_url}"

def _get_connection_info(self, *, force_refresh: bool = False) -> ConnectionResponse:
"""Return connection info for the sync job."""
Expand Down
20 changes: 15 additions & 5 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

from airbyte import exceptions as exc
from airbyte._util import api_util, text_util
from airbyte._util.api_util import get_web_url_root
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.destinations.base import Destination
Expand Down Expand Up @@ -72,8 +73,8 @@ def __post_init__(self) -> None:

@property
def workspace_url(self) -> str | None:
"""The URL of the workspace."""
return f"{self.api_root}/workspaces/{self.workspace_id}"
"""The web URL of the workspace."""
return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

# Test connection and creds

Expand Down Expand Up @@ -375,7 +376,10 @@ def list_connections(
*,
name_filter: Callable | None = None,
) -> list[CloudConnection]:
"""List connections by name in the workspace."""
"""List connections by name in the workspace.
TODO: Add pagination support
"""
connections = api_util.list_connections(
api_root=self.api_root,
workspace_id=self.workspace_id,
Expand All @@ -401,7 +405,10 @@ def list_sources(
*,
name_filter: Callable | None = None,
) -> list[CloudSource]:
"""List all sources in the workspace."""
"""List all sources in the workspace.
TODO: Add pagination support
"""
sources = api_util.list_sources(
api_root=self.api_root,
workspace_id=self.workspace_id,
Expand All @@ -425,7 +432,10 @@ def list_destinations(
*,
name_filter: Callable | None = None,
) -> list[CloudDestination]:
"""List all destinations in the workspace."""
"""List all destinations in the workspace.
TODO: Add pagination support
"""
destinations = api_util.list_destinations(
api_root=self.api_root,
workspace_id=self.workspace_id,
Expand Down
30 changes: 30 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,3 +178,33 @@ def _str_to_bool(value: str) -> bool:
For more information, see the `airbyte.secrets` module documentation.
"""

# Cloud Constants

CLOUD_CLIENT_ID_ENV_VAR: str = "AIRBYTE_CLOUD_CLIENT_ID"
"""The environment variable name for the Airbyte Cloud client ID."""

CLOUD_CLIENT_SECRET_ENV_VAR: str = "AIRBYTE_CLOUD_CLIENT_SECRET"
"""The environment variable name for the Airbyte Cloud client secret."""

CLOUD_API_ROOT_ENV_VAR: str = "AIRBYTE_CLOUD_API_URL"
"""The environment variable name for the Airbyte Cloud API URL."""

CLOUD_WORKSPACE_ID_ENV_VAR: str = "AIRBYTE_CLOUD_WORKSPACE_ID"
"""The environment variable name for the Airbyte Cloud workspace ID."""

CLOUD_API_ROOT: str = "https://api.airbyte.com/v1"
"""The Airbyte Cloud API root URL.
This is the root URL for the Airbyte Cloud API. It is used to interact with the Airbyte Cloud API
and is the default API root for the `CloudWorkspace` class.
- https://reference.airbyte.com/reference/getting-started
"""

CLOUD_CONFIG_API_ROOT: str = "https://cloud.airbyte.com/api/v1"
"""Internal-Use API Root, aka Airbyte "Config API".
Documentation:
- https://docs.airbyte.com/api-documentation#configuration-api-deprecated
- https://github.com/airbytehq/airbyte-platform-internal/blob/master/oss/airbyte-api/server-api/src/main/openapi/config.yaml
"""
4 changes: 2 additions & 2 deletions airbyte/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ class AirbyteConnectionError(AirbyteError):

@property
def connection_url(self) -> str | None:
"""The URL to the connection where the error occurred."""
"""The web URL to the connection where the error occurred."""
if self.workspace_url and self.connection_id:
return f"{self.workspace_url}/connections/{self.connection_id}"

Expand All @@ -472,7 +472,7 @@ def connection_url(self) -> str | None:
def job_history_url(self) -> str | None:
"""The URL to the job history where the error occurred."""
if self.connection_url:
return f"{self.connection_url}/job-history"
return f"{self.connection_url}/timeline"

return None

Expand Down
Loading
Loading