
Conversation


@aaronsteers aaronsteers commented Sep 11, 2025

feat: add pagination support to CloudWorkspace list methods

Summary

Adds pagination support to the CloudWorkspace list_connections, list_sources, and list_destinations methods by introducing limit, offset, and include_deleted parameters. These parameters are passed through to the underlying API utility functions and then to the respective API request objects.

Key Changes:

  • Added pagination parameters to api_util.list_connections, api_util.list_sources, and api_util.list_destinations functions
  • Updated CloudWorkspace methods to accept and pass through pagination parameters
  • Added comprehensive docstrings documenting the new parameters
  • Maintained backward compatibility with optional parameters and sensible defaults
  • Added # noqa: PLR0913 comments to suppress linting warnings for necessary parameter count

All parameters are optional with defaults: limit=20, offset=0, include_deleted=False.
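
For illustration, a minimal usage sketch of the new parameters (the workspace constructor arguments here are placeholders, not part of this change):

# Hypothetical usage sketch -- workspace setup values are placeholders.
from airbyte.cloud import CloudWorkspace

workspace = CloudWorkspace(
    workspace_id="...",    # placeholder
    client_id="...",       # placeholder
    client_secret="...",   # placeholder
)

# Fetch the second page of 20 connections, including soft-deleted entries:
connections = workspace.list_connections(limit=20, offset=20, include_deleted=True)
sources = workspace.list_sources()  # existing call sites keep the default behavior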

Review & Testing Checklist for Human

  • Test pagination functionality end-to-end - Create a workspace with multiple connections/sources/destinations and verify that the limit and offset parameters behave as expected (see the sketch after this list)
  • Verify API request compatibility - Confirm that ListConnectionsRequest, ListSourcesRequest, and ListDestinationsRequest actually support the limit, offset, and include_deleted parameters in the current API version
  • Check default value appropriateness - Validate that limit=20 is a reasonable default that matches user expectations and API behavior
  • Confirm backward compatibility - Test that existing code calling these methods without pagination parameters continues to work unchanged
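
A rough sketch of the end-to-end pagination check suggested in the first item (assumes a workspace already populated with more than ten sources; the connector_id attribute follows the factory calls shown later in this thread):

# Hedged sketch: verify offset/limit paging returns disjoint, correctly sized pages.
page_one = workspace.list_sources(limit=10, offset=0)
page_two = workspace.list_sources(limit=10, offset=10)

assert len(page_one) <= 10
ids_one = {source.connector_id for source in page_one}
ids_two = {source.connector_id for source in page_two}
assert ids_one.isdisjoint(ids_two)  # pages should not overlap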

Notes

  • Changes passed all local checks: ruff formatting, ruff linting, and mypy type checking
  • The underlying API request objects were verified via LSP to have these parameters, but runtime testing requires actual credentials
  • This work was requested by AJ Steers (@aaronsteers) for the PyAirbyte cloud module
  • Link to Devin run: https://app.devin.ai/sessions/c8d92220996842419eb2683846027831

Summary by CodeRabbit

  • New Features

    • Listings for connections, workspaces, sources, and destinations now fetch all pages automatically and return aggregated, name-filtered results.
    • Default job log retrieval limit increased from 20 to 100 for more comprehensive logs.
    • Cloud connectors and connections now expose lazy-loaded display names for easier identification.
  • Bug Fixes

    • Fixed incomplete listings on large accounts by ensuring full multi-page results are returned.

Important

Auto-merge enabled.

This PR is set to merge automatically when all requirements are met.

Note

Auto-merge may have been disabled. Please check the PR status to confirm.

- Add limit, offset, and include_deleted parameters to list_connections, list_sources, and list_destinations
- Update underlying api_util functions to accept and pass through pagination parameters
- Maintain backward compatibility with optional parameters and sensible defaults
- Update docstrings to document new pagination parameters
- Add noqa comments to suppress PLR0913 linting warnings for necessary parameter count

Co-Authored-By: AJ Steers <[email protected]>

Original prompt from AJ Steers
@Devin - In PyAirbyte cloud module CloudWorkspace object, the "list_connections/connectors" implementations don't have pagination. Can you add it?
Thread URL: https://airbytehq-team.slack.com/archives/D089P0UPVT4/p1757556538569549?thread_ts=1757556538.569549


🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.



👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1757556687-add-pagination-to-cloud-workspace' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1757556687-add-pagination-to-cloud-workspace'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.



github-actions bot commented Sep 11, 2025

PyTest Results (Fast Tests Only, No Creds)

301 tests  ±0   301 ✅ ±0   4m 28s ⏱️ -1s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit daa496a. ± Comparison against base commit 6009b69.

♻️ This comment has been updated with latest results.


coderabbitai bot commented Sep 11, 2025

📝 Walkthrough

Added offset/limit pagination (page_size=100) to list_connections, list_workspaces, list_sources, and list_destinations with per-page aggregation and name filtering; increased get_job_logs default limit to 100. Cloud connector/workspace classes gained cached factory constructors and lazy name properties that fetch connector info on demand.
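
In code terms, the paging pattern described above reduces to roughly the following (a simplified sketch; fetch_page stands in for the generated API client's list call):

# Simplified paging sketch; `fetch_page` is a stand-in for the API client call.
def list_all_pages(fetch_page, name_filter=lambda _: True, page_size=100):
    """Aggregate all pages, applying the name filter per item."""
    results, offset, has_more = [], 0, True
    while has_more:
        page = fetch_page(offset=offset, limit=page_size)
        results += [item for item in page.data if name_filter(item.name)]
        has_more = page.next is not None  # server signals another page
        offset += page_size
    return results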

Changes

Cohort / File(s) — Summary of Changes

  • API pagination & defaults (airbyte/_util/api_util.py): list_connections, list_workspaces, list_sources, list_destinations now use offset/limit pagination (page_size=100), iterate pages, aggregate items, apply name_filter per item, and raise on per-page errors. get_job_logs default limit increased from 20 to 100. Signatures annotated with # noqa: PLR0913.
  • CloudConnection factory & name (airbyte/cloud/connections.py): Added CloudConnection._from_connection_response(workspace, connection_response) to construct from an API response and cache the response; added a CloudConnection.name property that lazily fetches/returns the display name from the cached response.
  • CloudConnector caching & fetch API (airbyte/cloud/connectors.py): Added a _connector_info cache and name property to CloudConnector; introduced an abstract _fetch_connector_info method; added _from_source_response and _from_destination_response factories; implemented _fetch_connector_info in CloudSource and CloudDestination using api_util.get_source / api_util.get_destination.
  • Workspace factories usage (airbyte/cloud/workspaces.py): Reworked list methods to construct objects via internal factory methods (CloudConnection._from_connection_response, CloudSource._from_source_response, CloudDestination._from_destination_response) instead of direct instantiation; no public signature changes.
  • Integration tests adjusted (tests/integration_tests/cloud/test_cloud_workspaces.py): Updated test deployments to use specific names and added unique=False argument to deploy_source calls.
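
The lazy-name pattern summarized above for CloudConnector looks roughly like this (a sketch under the attribute names listed above, not the exact implementation):

# Sketch of the cached lazy-name pattern described in the summary above.
class _LazyNameSketch:
    def __init__(self, workspace, connector_id):
        self.workspace = workspace
        self.connector_id = connector_id
        self._connector_info = None  # set by _from_*_response factories, or fetched on demand

    @property
    def name(self) -> str:
        if self._connector_info is None:
            self._connector_info = self._fetch_connector_info()  # one API call, then cached
        return self._connector_info.name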

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant C as Caller
  participant U as api_util (list_*)
  participant A as Airbyte API

  C->>U: list_* (workspace_id, creds, name_filter)
  activate U
  U->>U: offset=0, page_size=100, results=[]
  loop pages
    U->>A: ListRequest(offset, limit=100)
    A-->>U: ListResponse(items, next)
    U->>U: filter items (name_filter), append to results
    U->>U: offset += page_size (or stop if no next)
  end
  U-->>C: aggregated results
  deactivate U
sequenceDiagram
  autonumber
  participant W as CloudWorkspace
  participant Obj as CloudConnector/Connection
  participant U as api_util (get_source/get_destination/get_connection)

  W->>Obj: create via _from_*_response (cache response)
  Note right of Obj: _connector_info / _connection_info cached
  Obj->>Obj: access .name property
  alt cached present
    Obj-->>W: return cached display name
  else fetch needed
    Obj->>U: _fetch_connector_info()
    U-->>Obj: API response
    Obj->>Obj: cache response
    Obj-->>W: return display name
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Would you like a suggested unit test matrix for paginated endpoints and lazy name-fetch behavior, wdyt?

Pre-merge checks

✅ Passed checks (3 passed)

  • Description Check — ✅ Passed: Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title Check — ✅ Passed: The title concisely and accurately summarizes the PR's primary changes — adding pagination support and adding a name attribute for CloudConnection, CloudSource, and CloudDestination — and directly maps to the modifications in api_util and cloud modules.
  • Docstring Coverage — ✅ Passed: Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (6)
airbyte/cloud/workspaces.py (3)

377-389: Pagination params threaded correctly; tighten docs, drop redundant filter, and consider stricter typing — wdyt?

  • Docs: “List connections by name” + “Args” read well, but “maximum/offset/include deleted” imply pagination; could we clarify “List up to limit connections…” and note name and name_filter are mutually exclusive?
  • Typing: Would you switch name_filter: Callable | None to Callable[[str], bool] | None to align with api_util?
  • Redundant filter: api_util.list_connections() already applies name/name_filter; you can drop the local if name is None or connection.name == name for cleaner logic.

Proposed diff:

 def list_connections(
     self,
     name: str | None = None,
     *,
-    name_filter: Callable | None = None,
+    name_filter: Callable[[str], bool] | None = None,
     limit: int | None = 20,
     offset: int | None = 0,
     include_deleted: bool | None = False,
 ) -> list[CloudConnection]:
-    """List connections by name in the workspace.
+    """List connections in the workspace.
 
-    Args:
-        name: Optional name filter for connections.
-        name_filter: Optional callable to filter connections by name.
-        limit: Maximum number of connections to return (default: 20).
-        offset: Number of connections to skip (default: 0).
-        include_deleted: Whether to include deleted connections (default: False).
+    Args:
+        name: Optional exact-match name filter (mutually exclusive with `name_filter`).
+        name_filter: Optional callable to filter by name.
+        limit: Page size (max items to return). Defaults to 20.
+        offset: Items to skip (pagination offset). Defaults to 0.
+        include_deleted: Include soft-deleted connections. Defaults to False.
     """
@@
-    return [
-        CloudConnection(
-            workspace=self,
-            connection_id=connection.connection_id,
-            source=None,
-            destination=None,
-        )
-        for connection in connections
-        if name is None or connection.name == name
-    ]
+    return [
+        CloudConnection(
+            workspace=self,
+            connection_id=connection.connection_id,
+            source=None,
+            destination=None,
+        )
+        for connection in connections
+    ]

Also applies to: 395-398, 401-410


417-429: Same nits for sources: clarify doc, precise typing, remove double filter — wdyt?

  • Clarify docstring language for pagination and mutual exclusivity.
  • Use Callable[[str], bool] | None.
  • Drop redundant local name equality filter.
 def list_sources(
     self,
     name: str | None = None,
     *,
-    name_filter: Callable | None = None,
+    name_filter: Callable[[str], bool] | None = None,
     limit: int | None = 20,
     offset: int | None = 0,
     include_deleted: bool | None = False,
 ) -> list[CloudSource]:
-    """List all sources in the workspace.
+    """List sources in the workspace.
 
-    Args:
-        name: Optional name filter for sources.
-        name_filter: Optional callable to filter sources by name.
-        limit: Maximum number of sources to return (default: 20).
-        offset: Number of sources to skip (default: 0).
-        include_deleted: Whether to include deleted sources (default: False).
+    Args:
+        name: Optional exact-match name filter (mutually exclusive with `name_filter`).
+        name_filter: Optional callable to filter by name.
+        limit: Page size (max items). Defaults to 20.
+        offset: Pagination offset. Defaults to 0.
+        include_deleted: Include soft-deleted sources. Defaults to False.
     """
@@
-    return [
-        CloudSource(
-            workspace=self,
-            connector_id=source.source_id,
-        )
-        for source in sources
-        if name is None or source.name == name
-    ]
+    return [
+        CloudSource(workspace=self, connector_id=source.source_id)
+        for source in sources
+    ]

Also applies to: 435-438, 441-448


455-467: Same nits for destinations: doc phrasing, typing, and filter — wdyt?

  • Clarify docstring for pagination/mutual exclusivity.
  • Use Callable[[str], bool] | None.
  • Remove redundant name filter in list comprehension.
 def list_destinations(
     self,
     name: str | None = None,
     *,
-    name_filter: Callable | None = None,
+    name_filter: Callable[[str], bool] | None = None,
     limit: int | None = 20,
     offset: int | None = 0,
     include_deleted: bool | None = False,
 ) -> list[CloudDestination]:
-    """List all destinations in the workspace.
+    """List destinations in the workspace.
 
-    Args:
-        name: Optional name filter for destinations.
-        name_filter: Optional callable to filter destinations by name.
-        limit: Maximum number of destinations to return (default: 20).
-        offset: Number of destinations to skip (default: 0).
-        include_deleted: Whether to include deleted destinations (default: False).
+    Args:
+        name: Optional exact-match name filter (mutually exclusive with `name_filter`).
+        name_filter: Optional callable to filter by name.
+        limit: Page size (max items). Defaults to 20.
+        offset: Pagination offset. Defaults to 0.
+        include_deleted: Include soft-deleted destinations. Defaults to False.
     """
@@
-    return [
-        CloudDestination(
-            workspace=self,
-            connector_id=destination.destination_id,
-        )
-        for destination in destinations
-        if name is None or destination.name == name
-    ]
+    return [
+        CloudDestination(workspace=self, connector_id=destination.destination_id)
+        for destination in destinations
+    ]

Also applies to: 473-476, 479-486

airbyte/_util/api_util.py (3)

703-737: Optional: ensure get_connection_by_name searches beyond the first page — wdyt?

Right now it relies on the default page (limit=20, offset=0). If a matching connection is beyond the first page, it will be missed. Would you page until found or exhausted?

 def get_connection_by_name(
@@
-    connections = list_connections(
-        workspace_id=workspace_id,
-        api_root=api_root,
-        client_id=client_id,
-        client_secret=client_secret,
-    )
-    found: list[models.ConnectionResponse] = [
-        connection for connection in connections if connection.name == connection_name
-    ]
+    offset = 0
+    limit = 200
+    found: list[models.ConnectionResponse] = []
+    while True:
+        page = list_connections(
+            workspace_id=workspace_id,
+            api_root=api_root,
+            client_id=client_id,
+            client_secret=client_secret,
+            limit=limit,
+            offset=offset,
+            name=connection_name,  # server-side filter when possible
+        )
+        found.extend([c for c in page if c.name == connection_name])
+        if found or len(page) < (limit or 0):
+            break
+        offset += (limit or 0)

131-176: Minor docs nit: add param docs for new pagination flags on internal helpers — wdyt?

Since these are internal, it’s fine as-is, but a short note for limit, offset, and include_deleted in the docstrings would help future readers.

Also applies to: 221-309


132-176: Optional: introduce a small pagination iterator to DRY up paging in callers — wdyt?

A generic helper like iterate_pages(fetch_page: Callable[[int,int], list[T]], limit=200) could centralize paging logic for internal use and tests.

Also applies to: 221-309
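
A possible shape for that helper (illustrative only — iterate_pages does not currently exist in the codebase):

# Illustrative sketch of the suggested (hypothetical) iterate_pages helper.
from collections.abc import Callable, Iterator
from typing import TypeVar

T = TypeVar("T")

def iterate_pages(
    fetch_page: Callable[[int, int], list[T]],
    page_size: int = 200,
) -> Iterator[T]:
    """Yield items page by page until a short page signals the end."""
    offset = 0
    while True:
        page = fetch_page(offset, page_size)
        yield from page
        if len(page) < page_size:  # short page: no more results
            return
        offset += page_size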

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between e2f3fd5 and 281c899.

📒 Files selected for processing (2)
  • airbyte/_util/api_util.py (6 hunks)
  • airbyte/cloud/workspaces.py (3 hunks)
🔇 Additional comments (1)
airbyte/_util/api_util.py (1)

316-336: Audit list_* & get_connection_by_name call sites for pagination defaults

Scan found call sites — confirm whether callers expect full result sets or need pagination handling:

  • tests/integration_tests/cloud/test_cloud_api_util.py: list_sources (lines ~63–65), list_destinations (82–84)
  • airbyte/_util/api_util.py: list_connections (def ~132), list_sources (def ~221), list_destinations (def ~265), get_connection_by_name (def ~703), get_connection (calls list_connections ~712)
  • airbyte/cloud/workspaces.py: callers of api_util.list_* at ~lines 165–169, 220–224, 372–392, 412–432, 450–470

Shall I run a targeted grep for callers that assume single-page/full results (e.g., direct indexing, len checks, single-loop usage) or prepare a small pagination-safe helper/patch? wdyt?


github-actions bot commented Sep 11, 2025

PyTest Results (Full)

364 tests  ±0   348 ✅ ±0   21m 14s ⏱️ - 4m 36s
  1 suites ±0    16 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit daa496a. ± Comparison against base commit 6009b69.

♻️ This comment has been updated with latest results.


aaronsteers commented Sep 11, 2025

/fix-pr

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.
(This job requires that the PR author has "Allow edits from maintainers" enabled.)

PR auto-fix job started... Check job output.

✅ Changes applied successfully.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte/_util/api_util.py (1)

132-181: Expose pagination params, propagate them, and fix error handling (action required)

  • Add limit: int|None = 20, offset: int|None = 0, include_deleted: bool = False to airbyte/_util/api_util.py::list_connections (snippet ~lines 132–181), validate non-negative, use them for page_size/curr_offset (remove hard-coded 100) and drop the assert — wdyt?
  • Propagate those params through airbyte/cloud/workspaces.py list_connections/list_sources/list_destinations (defs found at lines ~372, ~398, ~422); the supplied rg output shows no callers passing limit/offset/include_deleted, so update these wrappers to accept and forward them — wdyt?
  • Fix error handling to raise on non-2xx OR missing body (replace the current `and` check with `or response.connections_response is None`) so we don't fall through to an assert — wdyt?
  • Remove or justify # noqa: PLR0913 occurrences (found at api_util.py lines 132/237/289 and create_connection at 683); if unused, drop them to avoid CI (RUF100) noise — wdyt?
♻️ Duplicate comments (3)
airbyte/_util/api_util.py (3)

237-287: Add pagination params for sources, propagate to request, and fix error handling.

To match the PR summary and fix robustness, can we add limit/offset/include_deleted with validation, pass them through to ListSourcesRequest, and switch to OR in the error guard, wdyt?

-def list_sources(  # noqa: PLR0913
+def list_sources(  # noqa: PLR0913
     workspace_id: str,
     *,
     api_root: str,
     client_id: SecretString,
     client_secret: SecretString,
     name: str | None = None,
-    name_filter: Callable[[str], bool] | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+    limit: int | None = 20,
+    offset: int | None = 0,
+    include_deleted: bool | None = False,
 ) -> list[models.SourceResponse]:
@@
-    airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance(
+    airbyte_instance: airbyte_api.AirbyteAPI = get_airbyte_server_instance(
         client_id=client_id,
         client_secret=client_secret,
         api_root=api_root,
     )
-    result: list[models.SourceResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
+    result: list[models.SourceResponse] = []
+    has_more = True
+    curr_offset = 0 if offset is None else offset
+    page_size = 20 if limit is None else limit
+    while has_more:
         response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
             api.ListSourcesRequest(
                 workspace_ids=[workspace_id],
-                offset=offset,
-                limit=page_size,
+                offset=curr_offset,
+                limit=page_size,
+                include_deleted=include_deleted,
             ),
         )
-        has_more = response.next is not None
-        offset += page_size
+        has_more = response.next is not None
+        curr_offset += page_size
@@
-        if not status_ok(response.status_code) and response.sources_response:
+        if not status_ok(response.status_code) or response.sources_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.sources_response is not None
         result += [
             source
             for source in response.sources_response.data
             if name_filter(source.name)
         ]

289-339: Add pagination params for destinations, propagate to request, and fix error handling.

Same issues here: unused noqa, missing limit/offset/include_deleted pass-through, and the “and” vs “or” bug. Mirror the fixes from connections/sources, wdyt?

-def list_destinations(  # noqa: PLR0913
+def list_destinations(  # noqa: PLR0913
     workspace_id: str,
     *,
     api_root: str,
     client_id: SecretString,
     client_secret: SecretString,
     name: str | None = None,
-    name_filter: Callable[[str], bool] | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+    limit: int | None = 20,
+    offset: int | None = 0,
+    include_deleted: bool | None = False,
 ) -> list[models.DestinationResponse]:
@@
-    result: list[models.DestinationResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
+    result: list[models.DestinationResponse] = []
+    has_more = True
+    curr_offset = 0 if offset is None else offset
+    page_size = 20 if limit is None else limit
+    while has_more:
         response = airbyte_instance.destinations.list_destinations(
             api.ListDestinationsRequest(
                 workspace_ids=[workspace_id],
-                offset=offset,
-                limit=page_size,
+                offset=curr_offset,
+                limit=page_size,
+                include_deleted=include_deleted,
             ),
         )
-        has_more = response.next is not None
-        offset += page_size
+        has_more = response.next is not None
+        curr_offset += page_size
@@
-        if not status_ok(response.status_code) and response.destinations_response:
+        if not status_ok(response.status_code) or response.destinations_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.destinations_response is not None
         result += [
             destination
             for destination in response.destinations_response.data
             if name_filter(destination.name)
         ]

205-235: Fix error handling in list_workspaces to avoid assert-path on failures.

Current check raises only when non-2xx AND body exists. It should raise on non-2xx OR missing body to prevent falling through to the assert. Apply the same pattern as above, wdyt?

-        if not status_ok(response.status_code) and response.workspaces_response:
+        if not status_ok(response.status_code) or response.workspaces_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.workspaces_response is not None
🧹 Nitpick comments (1)
airbyte/_util/api_util.py (1)

418-418: Confirm scope: bumping get_job_logs default from 20 → 100.

Is this change intentional and within the scope of “pagination for list methods”? If yes, could we also add a quick non-negative guard on limit for symmetry with the list_* fns, wdyt?

 def get_job_logs(
     workspace_id: str,
     connection_id: str,
-    limit: int = 100,
+    limit: int = 100,
@@
     """Get a job's logs."""
+    if limit < 0:
+        raise PyAirbyteInputError(message="`limit` must be non-negative.")
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 281c899 and 18d25d4.

📒 Files selected for processing (1)
  • airbyte/_util/api_util.py (6 hunks)
🪛 GitHub Actions: Run Linters
airbyte/_util/api_util.py

[error] 132-132: RUF100: Unused noqa directive (PLR0913) detected in airbyte/_util/api_util.py at line 132. Remove unused noqa directive.


[error] 237-237: RUF100: Unused noqa directive (PLR0913) detected in airbyte/_util/api_util.py at line 237. Remove unused noqa directive.


[error] 289-289: RUF100: Unused noqa directive (PLR0913) detected in airbyte/_util/api_util.py at line 289. Remove unused noqa directive.


@aaronsteers aaronsteers enabled auto-merge (squash) September 11, 2025 04:55

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
airbyte/_util/api_util.py (1)

132-141: Expose and validate pagination params (limit, offset, include_deleted) on all list_* functions.

  • The PR claims these params were propagated but current signatures in airbyte/_util/api_util.py (list_connections, list_workspaces, list_sources, list_destinations) and airbyte/cloud/workspaces.py (list_connections, list_sources, list_destinations) only accept name/name_filter — add params: limit: int | None = 20, offset: int | None = 0, include_deleted: bool = False (use # noqa: PLR0913 if needed).
  • Validate inputs up front: if (limit is not None and limit < 0) or (offset is not None and offset < 0): raise PyAirbyteInputError("limit and offset must be non-negative."); if not isinstance(include_deleted, bool): raise PyAirbyteInputError("include_deleted must be a boolean.").
  • Plumb these through to the API requests (api.List*Request: offset, limit — and include_deleted only if the generated airbyte_api models support it; repository search shows no include_deleted usage so confirm API/model support before adding).
  • Propagate the same arguments from CloudWorkspace.list_* so end users can call them.

Shall I prepare a small PR to implement these changes and update callers/tests? wdyt?

♻️ Duplicate comments (3)
airbyte/_util/api_util.py (3)

164-179: Harden error handling, use server-supplied cursor, and fix E501 in list_connections — wdyt?

The current branch only raises on non-2xx when a body exists, computes has_more before validating the response, and has an E501-flagged long line. This also ignores a server-provided next cursor if it’s numeric.

Apply:

-        has_more = (response.connections_response.next is not None) if response.connections_response else False
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.connections_response:
+        if not status_ok(response.status_code) or response.connections_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.connections_response is not None
+        next_token = getattr(response.connections_response, "next", None)
+        has_more = next_token is not None
+        # Prefer server cursor when numeric; otherwise bump by page_size.
+        try:
+            offset = int(next_token) if next_token is not None else offset + page_size
+        except (TypeError, ValueError):
+            offset += page_size
         result += [
             connection
             for connection in response.connections_response.data
             if name_filter(connection.name)
         ]

212-229: Same hardening for list_workspaces + fix E501 — wdyt?

Mirror the error-path and cursor handling to avoid asserts on None and fix the long line.

-        has_more = (response.workspaces_response.next is not None) if response.workspaces_response else False
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.workspaces_response:
+        if not status_ok(response.status_code) or response.workspaces_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-
-        assert response.workspaces_response is not None
+        next_token = getattr(response.workspaces_response, "next", None)
+        has_more = next_token is not None
+        try:
+            offset = int(next_token) if next_token is not None else offset + page_size
+        except (TypeError, ValueError):
+            offset += page_size

313-328: Same hardening for list_destinations + fix E501 — wdyt?

Replicate the robust non-2xx handling and cursor/offset logic here as well.

-        has_more = (response.destinations_response.next is not None) if response.destinations_response else False
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.destinations_response:
+        if not status_ok(response.status_code) or response.destinations_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.destinations_response is not None
+        next_token = getattr(response.destinations_response, "next", None)
+        has_more = next_token is not None
+        try:
+            offset = int(next_token) if next_token is not None else offset + page_size
+        except (TypeError, ValueError):
+            offset += page_size
🧹 Nitpick comments (1)
airbyte/_util/api_util.py (1)

410-410: Confirm the default limit bump for get_job_logs (20 → 100) — intentional?

This changes behavior and bandwidth by default. If intentional, shall we add a brief note in the docstring and validate limit >= 0? Otherwise, consider keeping 20 and letting callers opt into higher limits. Wdyt?

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 4f1a82e and 5836e1a.

📒 Files selected for processing (1)
  • airbyte/_util/api_util.py (5 hunks)
🪛 GitHub Actions: Run Linters
airbyte/_util/api_util.py

[error] 164-164: ruff: E501 Line too long (111 > 100) in airbyte/_util/api_util.py:164


[error] 212-212: ruff: E501 Line too long (109 > 100) in airbyte/_util/api_util.py:212


[error] 265-265: ruff: E501 Line too long (102 > 100) in airbyte/_util/api_util.py:265


[error] 313-313: ruff: E501 Line too long (113 > 100) in airbyte/_util/api_util.py:313

🔇 Additional comments (1)
airbyte/_util/api_util.py (1)

153-156: Align pagination with public limit/offset/include_deleted and honor caller limit — wdyt?

airbyte/_util/api_util.py (around lines 153–156): page_size is hardcoded to 100 and offset set to 0, preventing callers from paging or opting to include deleted items — can we plumb and honor limit, offset, include_deleted and stop early when limit is reached?

  • Add params (limit: Optional[int]=None, offset: int=0, include_deleted: bool=False) to the function signature and mirror across sources/destinations and CloudWorkspace — wdyt?
  • Replace the hardcoded init with caller-aware variables (e.g., current_offset = offset or 0; remaining = limit or None; page_size = min(100, remaining) if remaining is not None else 100) so we respect caller limits — wdyt?
  • When building the API request, use offset=current_offset, limit=page_size, and include_deleted=include_deleted if the SDK supports it — wdyt?
  • After extending result, decrement remaining by the number of items returned and break when remaining <= 0; advance current_offset by the number of items returned to continue paging reliably — wdyt?
  • Mirror these changes and plumbing through CloudWorkspace methods so callers can page and control include_deleted — wdyt?


aaronsteers commented Sep 11, 2025

/fix-pr

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.
(This job requires that the PR author has "Allow edits from maintainers" enabled.)

PR auto-fix job started... Check job output.

✅ Changes applied successfully.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (2)
airbyte/_util/api_util.py (2)

262-288: Same error-path and cursor issues as above; also missing pass-through of pagination flags — mirror the fix, wdyt?

Apply the robust error check first, then compute has_more/offset. If you intend to support limit, offset, and include_deleted, please add them to the signature and forward to ListSourcesRequest.

@@
-        has_more = (
-            (response.sources_response.next is not None) if response.sources_response else False
-        )
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.sources_response:
+        if not status_ok(response.status_code) or response.sources_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.sources_response is not None
+        next_token = getattr(response.sources_response, "next", None)
+        has_more = next_token is not None
+        try:
+            offset = int(next_token) if next_token is not None else offset + page_size
+        except (TypeError, ValueError):
+            offset += page_size
         result += [source for source in response.sources_response.data if name_filter(source.name)]

Would you like me to draft the signature and request updates analogous to list_connections, wdyt?


312-344: Destinations: fix error-check ordering/condition and wire pagination flags, wdyt?

Same pattern fix as the other list functions; raise first, then derive has_more and update offset.

@@
-        has_more = (
-            (response.destinations_response.next is not None)
-            if response.destinations_response
-            else False
-        )
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.destinations_response:
+        if not status_ok(response.status_code) or response.destinations_response is None:
             raise AirbyteError(
                 context={
                     "workspace_id": workspace_id,
                     "response": response,
                 }
             )
-        assert response.destinations_response is not None
+        next_token = getattr(response.destinations_response, "next", None)
+        has_more = next_token is not None
+        try:
+            offset = int(next_token) if next_token is not None else offset + page_size
+        except (TypeError, ValueError):
+            offset += page_size
         result += [
             destination
             for destination in response.destinations_response.data
             if name_filter(destination.name)
         ]

If we add limit, offset, and include_deleted to the signature, shall we forward them to ListDestinationsRequest the same way, wdyt?

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 784f024 and 55c06bf.

📒 Files selected for processing (1)
  • airbyte/_util/api_util.py (5 hunks)
🔇 Additional comments (1)
airbyte/_util/api_util.py (1)

424-424: Confirm scope: bumping get_job_logs default limit from 20 to 100 is out-of-scope for this PR — keep or revert, wdyt?

This may change behavior/perf for consumers unexpectedly. If intentional, could you add a brief note in the PR description and docs? Otherwise, consider reverting to 20 for backward compatibility.

@aaronsteers aaronsteers changed the title feat: add pagination support to CloudWorkspace list methods feat: add pagination support to CloudWorkspace list methods Sep 20, 2025

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
airbyte/cloud/connections.py (1)

120-128: Bug: destination_id incorrectly pulled from source_id.

destination_id assigns self._connection_info.source_id rather than destination_id, which will break any destination accessors and URL rendering. Fix required before merge, wdyt?

Apply this diff:

     @property
     def destination_id(self) -> str:
         """The ID of the destination."""
         if not self._destination_id:
             if not self._connection_info:
                 self._connection_info = self._fetch_connection_info()

-            self._destination_id = self._connection_info.source_id
+            self._destination_id = self._connection_info.destination_id

         return cast("str", self._destination_id)
airbyte/_util/api_util.py (1)

132-141: Expose pagination params (limit, offset, include_deleted) in list_* and CloudWorkspace methods

PR says limit/offset/include_deleted were added, but airbyte/_util/api_util.py (list_connections/sources/destinations) and airbyte/cloud/workspaces.py (list_connections/sources/destinations) still omit them — api.List*Request is being built with offset internally but callers can't pass pagination/include_deleted. Shall we add limit, offset, include_deleted to those signatures and forward them into api.List*Request so the CloudWorkspace API matches the PR description? wdyt?

♻️ Duplicate comments (4)
airbyte/_util/api_util.py (4)

204-229: Reorder checks in list_workspaces and advance cursor from body.next.

Compute/advance pagination only after validating the response; use or to raise when body is missing; derive the next offset from workspaces_response.next. Patch below, wdyt?

-    result: list[models.WorkspaceResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
-        response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
-            api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=offset, limit=page_size),
-        )
-        has_more = bool(response.workspaces_response and response.workspaces_response.next)
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.workspaces_response:
-            raise AirbyteError(
-                context={
-                    "workspace_id": workspace_id,
-                    "response": response,
-                }
-            )
-
-        assert response.workspaces_response is not None
-        result += [
-            workspace
-            for workspace in response.workspaces_response.data
-            if name_filter(workspace.name)
-        ]
-
-    return result
+    result: list[models.WorkspaceResponse] = []
+    page_size = 100
+    current_offset = 0
+    has_more = True
+    while has_more:
+        response: api.ListWorkspacesResponse = airbyte_instance.workspaces.list_workspaces(
+            api.ListWorkspacesRequest(workspace_ids=[workspace_id], offset=current_offset, limit=page_size),
+        )
+        if not status_ok(response.status_code) or response.workspaces_response is None:
+            raise AirbyteError(
+                context={
+                    "workspace_id": workspace_id,
+                    "response": response,
+                }
+            )
+        page_data = response.workspaces_response.data
+        result += [w for w in page_data if name_filter(w.name)]
+        next_token = getattr(response.workspaces_response, "next", None)
+        has_more = next_token is not None
+        try:
+            current_offset = int(next_token) if next_token is not None else current_offset + page_size
+        except (TypeError, ValueError):
+            current_offset += page_size
+
+    return result

301-329: Apply the same pagination/error-handling and new params to list_destinations.

Mirror the fixes for connections/sources: raise early, add limit/offset/include_deleted, and advance from .next. Patch below, wdyt?

-    result: list[models.DestinationResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
-        response = airbyte_instance.destinations.list_destinations(
-            api.ListDestinationsRequest(
-                workspace_ids=[workspace_id],
-                offset=offset,
-                limit=page_size,
-            ),
-        )
-        has_more = bool(response.destinations_response and response.destinations_response.next)
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.destinations_response:
-            raise AirbyteError(
-                context={
-                    "workspace_id": workspace_id,
-                    "response": response,
-                }
-            )
-        assert response.destinations_response is not None
-        result += [
-            destination
-            for destination in response.destinations_response.data
-            if name_filter(destination.name)
-        ]
-
-    return result
+    result: list[models.DestinationResponse] = []
+    page_size = 100
+    current_offset = offset or 0
+    remaining = None if limit is None else int(limit)
+    has_more = True
+    while has_more and (remaining is None or remaining > 0):
+        request_limit = page_size if remaining is None else max(1, min(page_size, remaining))
+        response = airbyte_instance.destinations.list_destinations(
+            api.ListDestinationsRequest(
+                workspace_ids=[workspace_id],
+                offset=current_offset,
+                limit=request_limit,
+                include_deleted=bool(include_deleted),
+            ),
+        )
+        if not status_ok(response.status_code) or response.destinations_response is None:
+            raise AirbyteError(
+                context={
+                    "workspace_id": workspace_id,
+                    "response": response,
+                }
+            )
+        page_data = response.destinations_response.data
+        filtered = [d for d in page_data if name_filter(d.name)]
+        result += filtered
+        if remaining is not None:
+            remaining -= len(filtered)
+        next_token = getattr(response.destinations_response, "next", None)
+        has_more = next_token is not None
+        try:
+            current_offset = int(next_token) if next_token is not None else current_offset + request_limit
+        except (TypeError, ValueError):
+            current_offset += request_limit
+
+    return result

Signature + guards (outside this hunk):

 def list_destinations(
     workspace_id: str,
     *,
     api_root: str,
     client_id: SecretString,
     client_secret: SecretString,
     name: str | None = None,
-    name_filter: Callable[[str], bool] | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+    limit: int | None = 20,
+    offset: int | None = 0,
+    include_deleted: bool | None = False,
 ) -> list[models.DestinationResponse]:
@@
-    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+    if (limit is not None and limit < 0) or (offset is not None and offset < 0):
+        raise PyAirbyteInputError(message="`limit` and `offset` must be non-negative.")

153-180: Fix error handling ordering and add pagination parameters (limit/offset/include_deleted) to match PR goals.

  • Raise on non-2xx or missing body before reading it; current and condition can fall through to assert.
  • Expose limit, offset, include_deleted and pass them to ListConnectionsRequest.
  • Respect limit for total returned items post-filtering; use response .next when advancing the cursor robustly.

Proposed patch for this hunk, wdyt?

-    result: list[models.ConnectionResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
-        response = airbyte_instance.connections.list_connections(
-            api.ListConnectionsRequest(
-                workspace_ids=[workspace_id],
-                offset=offset,
-                limit=page_size,
-            ),
-        )
-        has_more = bool(response.connections_response and response.connections_response.next)
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.connections_response:
-            raise AirbyteError(
-                context={
-                    "workspace_id": workspace_id,
-                    "response": response,
-                }
-            )
-        assert response.connections_response is not None
-        result += [
-            connection
-            for connection in response.connections_response.data
-            if name_filter(connection.name)
-        ]
-    return result
+    result: list[models.ConnectionResponse] = []
+    page_size = 100
+    current_offset = offset or 0
+    remaining = None if limit is None else int(limit)
+    has_more = True
+    while has_more and (remaining is None or remaining > 0):
+        request_limit = page_size if remaining is None else max(1, min(page_size, remaining))
+        response = airbyte_instance.connections.list_connections(
+            api.ListConnectionsRequest(
+                workspace_ids=[workspace_id],
+                offset=current_offset,
+                limit=request_limit,
+                include_deleted=bool(include_deleted),
+            ),
+        )
+        if not status_ok(response.status_code) or response.connections_response is None:
+            raise AirbyteError(
+                context={
+                    "workspace_id": workspace_id,
+                    "response": response,
+                }
+            )
+        # Body guaranteed non-None here
+        page_data = response.connections_response.data
+        filtered = [c for c in page_data if name_filter(c.name)]
+        result += filtered
+        if remaining is not None:
+            remaining -= len(filtered)
+        next_token = getattr(response.connections_response, "next", None)
+        has_more = next_token is not None
+        try:
+            current_offset = int(next_token) if next_token is not None else current_offset + request_limit
+        except (TypeError, ValueError):
+            current_offset += request_limit
+    return result

And update the signature + guards (outside this hunk):

 def list_connections(
     workspace_id: str,
     *,
     api_root: str,
     client_id: SecretString,
     client_secret: SecretString,
     name: str | None = None,
-    name_filter: Callable[[str], bool] | None = None,
+    name_filter: Callable[[str], bool] | None = None,
+    limit: int | None = 20,
+    offset: int | None = 0,
+    include_deleted: bool | None = False,
 ) -> list[models.ConnectionResponse]:
@@
-    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+    name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+    if (limit is not None and limit < 0) or (offset is not None and offset < 0):
+        raise PyAirbyteInputError(message="`limit` and `offset` must be non-negative.")

Would you like me to mirror the same changes for sources/destinations below as well?


253-277: Apply the same pagination/error-handling and new params to list_sources.

  • Raise early on non-2xx or missing body.
  • Add and respect limit, offset, include_deleted.
  • Advance offset from .next.

Patch for this hunk + signature, wdyt?

-    result: list[models.SourceResponse] = []
-    has_more = True
-    offset, page_size = 0, 100
-    while has_more:
-        response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
-            api.ListSourcesRequest(
-                workspace_ids=[workspace_id],
-                offset=offset,
-                limit=page_size,
-            ),
-        )
-        has_more = bool(response.sources_response and response.sources_response.next)
-        offset += page_size
-
-        if not status_ok(response.status_code) and response.sources_response:
-            raise AirbyteError(
-                context={
-                    "workspace_id": workspace_id,
-                    "response": response,
-                }
-            )
-        assert response.sources_response is not None
-        result += [source for source in response.sources_response.data if name_filter(source.name)]
-
-    return result
+    result: list[models.SourceResponse] = []
+    page_size = 100
+    current_offset = offset or 0
+    remaining = None if limit is None else int(limit)
+    has_more = True
+    while has_more and (remaining is None or remaining > 0):
+        request_limit = page_size if remaining is None else max(1, min(page_size, remaining))
+        response: api.ListSourcesResponse = airbyte_instance.sources.list_sources(
+            api.ListSourcesRequest(
+                workspace_ids=[workspace_id],
+                offset=current_offset,
+                limit=request_limit,
+                include_deleted=bool(include_deleted),
+            ),
+        )
+        if not status_ok(response.status_code) or response.sources_response is None:
+            raise AirbyteError(
+                context={
+                    "workspace_id": workspace_id,
+                    "response": response,
+                }
+            )
+        page_data = response.sources_response.data
+        filtered = [s for s in page_data if name_filter(s.name)]
+        result += filtered
+        if remaining is not None:
+            remaining -= len(filtered)
+        next_token = getattr(response.sources_response, "next", None)
+        has_more = next_token is not None
+        try:
+            current_offset = int(next_token) if next_token is not None else current_offset + request_limit
+        except (TypeError, ValueError):
+            current_offset += request_limit
+
+    return result

Signature + guards (outside this hunk):

 def list_sources(
     workspace_id: str,
     *,
     api_root: str,
     client_id: SecretString,
     client_secret: SecretString,
     name: str | None = None,
     name_filter: Callable[[str], bool] | None = None,
+    limit: int | None = 20,
+    offset: int | None = 0,
+    include_deleted: bool | None = False,
 ) -> list[models.SourceResponse]:
@@
     name_filter = (lambda n: n == name) if name else name_filter or (lambda _: True)
+    if (limit is not None and limit < 0) or (offset is not None and offset < 0):
+        raise PyAirbyteInputError(message="`limit` and `offset` must be non-negative.")
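To sanity-check the bounded paging math, here's a standalone simulation (no API client involved; it assumes every returned row passes name_filter, so remaining decrements by a full page each iteration):

page_size = 100
remaining = 250  # e.g., caller passed limit=250
request_sizes = []
while remaining > 0:
    request_limit = max(1, min(page_size, remaining))
    request_sizes.append(request_limit)
    remaining -= request_limit
print(request_sizes)  # [100, 100, 50]

Note that in the real loop remaining decrements by len(filtered), so when the filter rejects rows, additional pages are fetched until the server stops returning a next token.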
🧹 Nitpick comments (1)
airbyte/cloud/connectors.py (1)

104-119: Minor: docstring wording nit.

This is connector info, not “connection” info. Want to tighten the docstring, wdyt?

-        """The connection info object. (Cached.)"""
+        """The connector info object. (Cached.)"""
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 55c06bf and 31c8f15.

📒 Files selected for processing (4)
  • airbyte/_util/api_util.py (5 hunks)
  • airbyte/cloud/connections.py (1 hunks)
  • airbyte/cloud/connectors.py (4 hunks)
  • airbyte/cloud/workspaces.py (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • airbyte/cloud/workspaces.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
PR: airbytehq/PyAirbyte#285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.

Applied to files:

  • airbyte/_util/api_util.py
🧬 Code graph analysis (3)
airbyte/cloud/connections.py (1)
airbyte/cloud/connectors.py (3)
  • source_id (183-188)
  • destination_id (224-229)
  • name (110-118)
airbyte/cloud/connectors.py (2)
airbyte/_util/api_util.py (2)
  • get_source (505-530)
  • get_destination (624-667)
airbyte/cloud/workspaces.py (2)
  • get_source (112-124)
  • get_destination (126-138)
airbyte/_util/api_util.py (2)
airbyte/cloud/workspaces.py (3)
  • list_connections (373-398)
  • list_sources (400-425)
  • list_destinations (427-452)
airbyte/exceptions.py (1)
  • AirbyteError (432-447)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
🔇 Additional comments (8)
airbyte/cloud/connections.py (2)

67-82: Good addition: internal factory caches full response.

The _from_connection_response helper looks correct and the cache assignment avoids an immediate follow-up fetch, nice. LGTM, wdyt?
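Roughly, the shape being praised looks like this (names are illustrative; the real signature is in the diff):

@classmethod
def _from_connection_response(
    cls,
    workspace: CloudWorkspace,
    connection_response: models.ConnectionResponse,
) -> CloudConnection:
    result = cls(
        workspace=workspace,
        connection_id=connection_response.connection_id,
    )
    result._connection_info = connection_response  # prime the cache: skips a follow-up GET
    return result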


85-95: Name property lazy-load pattern is sound.

The lazy fetch + cache aligns with connectors; implementation looks good. LGTM, wdyt?
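For readers skimming the thread, the lazy-load shape under discussion is roughly this (attribute and helper names are illustrative, not necessarily the exact ones in the diff):

@property
def name(self) -> str:
    """Display name, fetched lazily and cached on first access."""
    if self._connection_info is None:  # illustrative cache attribute
        self._connection_info = self._fetch_connection_info()
    return self._connection_info.name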

airbyte/cloud/connectors.py (5)

120-124: Abstract fetch hook is clear and extensible.

The _fetch_connector_info abstraction is well-placed; concrete impls below satisfy it. LGTM, wdyt?


190-198: Source info fetch: LGTM.

Directly delegates to api_util.get_source; consistent with lazy name access. Looks good, wdyt?


199-215: Internal factory for CloudSource: LGTM.

Caching _connector_info from API response avoids extra roundtrips. Looks good to me, wdyt?


231-239: Destination info fetch: LGTM.

Mirrors source pattern and leverages existing workaround in api_util.get_destination. Looks good, wdyt?


240-256: Internal factory for CloudDestination: LGTM.

Symmetric with source factory and caching pattern. All good, wdyt?

airbyte/_util/api_util.py (1)

409-409: Bumping get_job_logs default to 100 — LGTM

Aligns better with recent jobs UX. Do you want to update public docs/examples that reference the old default of 20, wdyt?

I tried to locate callers but ripgrep returned "No files were searched". Can you re-run a full repo search locally to confirm no callers rely on the old default? Example command to run locally:
rg -n --hidden --no-ignore -C2 '\bget_job_logs\s*\('
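
If regex escaping is easy to get wrong here, a fixed-string search does the same job with no escaping (rg's -F treats the pattern literally):

rg -nF --hidden --no-ignore -C2 'get_job_logs('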

@aaronsteers aaronsteers changed the title feat: add pagination support to CloudWorkspace list methods feat: add pagination support and name attribute to CloudWorkspace list methods for CloudConnection, CloudSource, and CloudDestination Sep 20, 2025
@aaronsteers aaronsteers changed the title feat: add pagination support and name attribute to CloudWorkspace list methods for CloudConnection, CloudSource, and CloudDestination feat: add pagination support and name attribute to cloud objects: CloudConnection, CloudSource, and CloudDestination Sep 20, 2025
@aaronsteers aaronsteers changed the title feat: add pagination support and name attribute to cloud objects: CloudConnection, CloudSource, and CloudDestination feat: add pagination support and name attribute for cloud objects: CloudConnection, CloudSource, and CloudDestination Sep 20, 2025
@aaronsteers aaronsteers enabled auto-merge (squash) September 20, 2025 02:29
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
tests/integration_tests/cloud/test_cloud_workspaces.py (2)

41-44: Reduce flakiness from fixed name + unique=False by appending an ephemeral suffix.

Hardcoding the name with unique=False can collide across parallel CI runs or with leftover resources from earlier runs. Can we still exercise the non-unique path while minimizing collisions by adding a short run-scoped suffix, wdyt?

Apply within this call:

-        name="test-faker-source-deleteme",
+        name=f"test-faker-source-deleteme-{os.getenv('GITHUB_RUN_ID', uuid4().hex[:8])}",
         source=source,
         unique=False,

Add once at the top of the file:

import os
from uuid import uuid4

Optional: ensure cleanup on failures and assert the name round-trips:

cloud_source: CloudSource | None = None
try:
    cloud_source = cloud_workspace.deploy_source(
        name=f"test-faker-source-deleteme-{os.getenv('GITHUB_RUN_ID', uuid4().hex[:8])}",
        source=source,
        unique=False,
    )
    assert cloud_source.name.startswith("test-faker-source-deleteme")
finally:
    if cloud_source:
        cloud_workspace.permanently_delete_source(cloud_source)

56-59: Mirror the same collision-avoidance for the dummy source case.

Same concern as above: fixed name + unique=False risks flaky clashes. Append a short, deterministic suffix, wdyt?

Apply within this call:

-        name="test-source-deleteme",
+        name=f"test-source-deleteme-{os.getenv('GITHUB_RUN_ID', uuid4().hex[:8])}",
         source=deployable_dummy_source,
         unique=False,

Optional assertion and try/finally for robust cleanup:

cloud_source: CloudSource | None = None
try:
    cloud_source = cloud_workspace.deploy_source(
        name=f"test-source-deleteme-{os.getenv('GITHUB_RUN_ID', uuid4().hex[:8])}",
        source=deployable_dummy_source,
        unique=False,
    )
    assert cloud_source.name.startswith("test-source-deleteme")
finally:
    if cloud_source:
        cloud_workspace.permanently_delete_source(cloud_source)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 31c8f15 and daa496a.

📒 Files selected for processing (1)
  • tests/integration_tests/cloud/test_cloud_workspaces.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/integration_tests/cloud/test_cloud_workspaces.py (1)
tests/integration_tests/cloud/conftest.py (1)
  • deployable_dummy_source (106-119)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)

@aaronsteers aaronsteers merged commit 02ee440 into main Sep 20, 2025
21 checks passed
@aaronsteers aaronsteers deleted the devin/1757556687-add-pagination-to-cloud-workspace branch September 20, 2025 03:02