Skip to content
Merged
186 changes: 119 additions & 67 deletions airbyte/_util/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,25 +150,39 @@ def list_connections(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.ConnectionResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.connections.list_connections(
api.ListConnectionsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.connections_response is not None
return [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]
has_more = (
(response.connections_response.next is not None)
if response.connections_response
else False
)
offset += page_size

if not status_ok(response.status_code) and response.connections_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.connections_response is not None
result += [
connection
for connection in response.connections_response.data
if name_filter(connection.name)
]

return result


def list_workspaces(
Expand All @@ -192,24 +206,36 @@ def list_workspaces(
client_secret=client_secret,
api_root=api_root,
)
result: list[models.WorkspaceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size),
)
has_more = (
(response.workspaces_response.next is not None)
if response.workspaces_response
else False
)
offset += page_size

response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
api.ListWorkspacesRequest(
workspace_ids=[workspace_id],
),
)
if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)

if not status_ok(response.status_code) and response.workspaces_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.workspaces_response is not None
return [
workspace for workspace in response.workspaces_response.data if name_filter(workspace.name)
]
assert response.workspaces_response is not None
result += [
workspace
for workspace in response.workspaces_response.data
if name_filter(workspace.name)
]

return result


def list_sources(
Expand All @@ -233,21 +259,33 @@ def list_sources(
client_secret=client_secret,
api_root=api_root,
)
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.SourceResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
api.ListSourcesRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
has_more = (
(response.sources_response.next is not None) if response.sources_response else False
)
assert response.sources_response is not None
return [source for source in response.sources_response.data if name_filter(source.name)]
offset += page_size

if not status_ok(response.status_code) and response.sources_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.sources_response is not None
result += [source for source in response.sources_response.data if name_filter(source.name)]

return result


def list_destinations(
Expand All @@ -271,25 +309,39 @@ def list_destinations(
client_secret=client_secret,
api_root=api_root,
)
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
),
)

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
result: list[models.DestinationResponse] = []
has_more = True
offset, page_size = 0, 100
while has_more:
response = airbyte_instance.destinations.list_destinations(
api.ListDestinationsRequest(
workspace_ids=[workspace_id],
offset=offset,
limit=page_size,
),
)
assert response.destinations_response is not None
return [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]
has_more = (
(response.destinations_response.next is not None)
if response.destinations_response
else False
)
offset += page_size

if not status_ok(response.status_code) and response.destinations_response:
raise AirbyteError(
context={
"workspace_id": workspace_id,
"response": response,
}
)
assert response.destinations_response is not None
result += [
destination
for destination in response.destinations_response.data
if name_filter(destination.name)
]

return result


# Get and run connections
Expand Down Expand Up @@ -369,7 +421,7 @@ def run_connection(
def get_job_logs(
workspace_id: str,
connection_id: str,
limit: int = 20,
limit: int = 100,
*,
api_root: str,
client_id: SecretString,
Expand Down
Loading