feat(mcp): add Airbyte Cloud MCP tools: Deploy, Sync, and List #743
- Add deploy_source_to_cloud tool for deploying sources to Airbyte Cloud
- Add deploy_destination_to_cloud tool for deploying destinations to Airbyte Cloud
- Add create_connection_on_cloud tool for creating connections between deployed sources and destinations
- Add run_cloud_sync tool for executing sync jobs on cloud connections
- All tools follow existing MCP patterns and use AIRBYTE_CLIENT_ID/AIRBYTE_CLIENT_SECRET for authentication
- Tools are properly registered in register_cloud_ops_tools function

Co-Authored-By: AJ Steers <[email protected]>
Original prompt from AJ Steers
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

```shell
# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1754506844-add-cloud-mcp-tools' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1754506844-add-cloud-mcp-tools'
```

Helpful Resources

PR Slash Commands: Airbyte Maintainers can execute the following slash commands on your PR.

Community Support: Questions? Join the #pyairbyte channel in our Slack workspace.
Pull Request Overview
This PR adds four new MCP tools to PyAirbyte for comprehensive Airbyte Cloud operations, enabling users to deploy sources/destinations, create connections, and run syncs through the Model Context Protocol interface.
- Adds `deploy_source_to_cloud` and `deploy_destination_to_cloud` tools for deploying connectors
- Adds `create_connection_on_cloud` tool for linking deployed connectors
- Adds `run_cloud_sync` tool for executing sync jobs with optional wait functionality
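Per the PR description, these tools authenticate via environment variables rather than tool arguments. A minimal setup sketch follows; the variable names come from this PR's commit messages, while the values are illustrative placeholders, not real credentials:

```shell
# Variable names per the PR description; values are placeholders.
export AIRBYTE_CLIENT_ID="example-client-id"
export AIRBYTE_CLIENT_SECRET="example-client-secret"
export AIRBYTE_WORKSPACE_ID="example-workspace-id"
# Optional override of the Cloud API root; defaults apply when unset.
export AIRBYTE_API_ROOT="https://api.airbyte.com/v1"

echo "Configured workspace: $AIRBYTE_WORKSPACE_ID"
# prints: Configured workspace: example-workspace-id
```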
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

📝 Walkthrough

Introduces an authenticated Airbyte Cloud workspace layer and replaces secrets-based Cloud ops with environment-driven resolvers; adds new MCP cloud operations (deploy, list, connect, run sync); centralizes cloud auth and config resolution; expands secrets management (dotenv, try_get_secret); and updates web-URL construction and representations.

Sequence Diagram(s)

sequenceDiagram
autonumber
actor User
participant MCP as MCP Tools
participant Auth as Cloud Auth Resolvers
participant WS as CloudWorkspace
participant API as Airbyte Cloud API
rect rgba(227,242,253,0.5)
note right of MCP: Deploy source/destination
User->>MCP: deploy_source_to_cloud(name, connector, config?)
MCP->>Auth: resolve_cloud_workspace_id/client_id/secret/api_url
Auth-->>MCP: workspace creds + api_root
MCP->>WS: build workspace (credentials)
MCP->>WS: prepare connector config (resolve_config, get_source/get_destination)
    WS->>API: POST /sources or /destinations (definition, config)
API-->>WS: Source/Destination {id, url}
WS-->>MCP: CloudSource/CloudDestination
MCP-->>User: Deployed message (IDs / URLs)
end
rect rgba(232,245,233,0.5)
note right of MCP: Create connection & run sync
User->>MCP: create_connection_on_cloud(name, source_id, dest_id, streams)
MCP->>WS: create connection
WS->>API: POST /connections
API-->>WS: Connection{id,url}
MCP->>WS: run_cloud_sync(connection_id)
WS->>API: POST /connections/{id}/sync
API-->>WS: Job{id,status}
alt wait=true
WS->>API: poll job status
API-->>WS: final status
WS-->>MCP: final status + URLs
else wait=false
WS-->>MCP: started job info
end
MCP-->>User: Status/URLs
end
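The wait/poll branch of the diagram above can be sketched as a generic polling loop. The function and status names here are illustrative, not the actual PyAirbyte API:

```python
import time

# Statuses we treat as final (illustrative; PyAirbyte's job-status enum may differ).
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}


def wait_for_sync(get_job_status, wait_timeout: int = 300, poll_interval: float = 5.0) -> str:
    """Poll a job-status callable until a terminal status or timeout (sketch)."""
    deadline = time.monotonic() + wait_timeout
    while True:
        status = get_job_status()
        if status in TERMINAL_STATUSES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Sync still '{status}' after {wait_timeout}s")
        time.sleep(poll_interval)


# Example with a fake status source that finishes on the third poll:
statuses = iter(["pending", "running", "succeeded"])
print(wait_for_sync(lambda: next(statuses), wait_timeout=30, poll_interval=0.01))
# prints: succeeded
```

With `wait=false`, the tool would skip this loop and return the started job's ID immediately, as the `else` branch in the diagram shows.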
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Would you like a focused follow-up diff check on the new get_secret default handling to verify it doesn't short-circuit multi-source resolution unexpectedly, wdyt?

Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Tip: 👮 Agentic pre-merge checks are now available in preview! Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs. Please see the documentation for more information.

Example:

```yaml
reviews:
  pre_merge_checks:
    custom_checks:
      - name: "Undocumented Breaking Changes"
        mode: "warning"
        instructions: |
          Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).
```
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte/mcp/_cloud_ops.py (2)
16-79: Strong implementation of deploy_source_to_cloud function!

The function follows excellent patterns with proper error handling, authentication, and return messaging. I particularly like how it leverages the existing `resolve_config` utility and follows the try/except/else pattern consistently.

One small question about the config resolution: should we consider validating that at least one of `config` or `config_secret_name` is provided, wdyt? Currently it would proceed with `None` config if both are omitted, which might lead to less clear error messages downstream.

```diff
 def deploy_source_to_cloud(
     # ... parameters ...
 ) -> str:
     """Deploy a source connector to Airbyte Cloud."""
+    if config is None and config_secret_name is None:
+        return "Failed to deploy source: Either 'config' or 'config_secret_name' must be provided."
+
     try:
         source = get_source(source_connector_name)
```
133-190: Great job on the create_connection_on_cloud function!

The function signature and implementation look solid. The parameter documentation is clear and the error handling follows the established pattern perfectly.

Quick question about the `api_root` parameter on line 154: I notice it's missing the `*` marker before it, unlike the other functions where keyword-only parameters are clearly delineated. Should this be moved after the `*` to match the pattern, wdyt?

```diff
     selected_streams: Annotated[
         list[str],
         Field(description="The selected stream names to sync within the connection."),
     ],
+    *,
     api_root: Annotated[
         str | None,
         Field(description="Optional Cloud API root URL override."),
     ] = None,
     table_prefix: Annotated[
         str | None,
         Field(description="Optional table prefix to use when syncing to the destination."),
     ] = None,
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/mcp/_cloud_ops.py (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: yohannj
PR: airbytehq/PyAirbyte#716
File: airbyte/logs.py:384-402
Timestamp: 2025-07-11T19:53:44.427Z
Learning: In the PyAirbyte project, when reviewing PRs, maintain clear separation of concerns. Don't suggest changes that are outside the scope of the PR's main objective, even if they would improve consistency or fix other issues. This helps with reviewing changes and potential reverts.
📚 Learning: in pyairbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designe...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:111-160
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In PyAirbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designed to decouple the message text from dynamic values, following a structlog-inspired design. Dynamic values are provided via parameters like `input_value`. This approach helps avoid including PII in the message strings, which may be used in telemetry.
Applied to files:
airbyte/mcp/_cloud_ops.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (4)
airbyte/mcp/_cloud_ops.py (4)
4-4: LGTM on the new imports!

The new imports are well-organized and necessary for the added functionality. The `Path` import supports flexible config handling, and the additional airbyte imports enable the cloud operations functionality.

Also applies to: 10-10, 13-13
82-130: Nice consistency in the deploy_destination_to_cloud function!

The implementation follows the same excellent patterns as the source deployment function. The parameter structure and error handling are consistent and well-designed.

I notice this function takes `destination_config` as a required `dict` parameter, while the source function has more flexible config resolution. Is this intentional difference based on how destinations vs sources are typically configured, wdyt? Just want to make sure the API design aligns with expected usage patterns.
193-238: Excellent implementation of run_cloud_sync function!

The sync execution logic is clean and the conditional return messaging based on the `wait` parameter is well thought out. The timeout handling and job status reporting look great.
289-292: Perfect tool registration additions!

All four new functions are properly registered alongside the existing `get_cloud_sync_status` tool. This will make them discoverable through the MCP interface as intended.
- Add config_secret_name parameter to deploy_destination_to_cloud function
- Make destination_config optional and support Path type like other MCP tools
- Use resolve_config to handle merging config with secrets from secret names
- Prevents secret values from passing through LLM by using secret name references
- Follows same pattern as validate_connector_config and other existing MCP tools

Co-Authored-By: AJ Steers <[email protected]>
…_source_to_cloud

- Add destination_connector_name parameter following same pattern as source deployment
- Use get_destination() and set_config() pattern like deploy_source_to_cloud
- Fix parameter name inconsistency by passing destination object instead of config dict
- Ensure API consistency across all cloud deployment MCP tools
- Addresses Copilot feedback on PR #743

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (1)
airbyte/mcp/_cloud_ops.py (1)
149-206: Should the parameter ordering be consistent with the workspace.deploy_connection method?

Looking at the function signature, I notice that `api_root` comes before `table_prefix` in the parameters, but based on typical Python conventions, optional parameters with defaults usually come after required ones. Also, should `api_root` be a keyword-only parameter like in the other functions for consistency? wdyt?

```diff
     selected_streams: Annotated[
         list[str],
         Field(description="The selected stream names to sync within the connection."),
     ],
+    *,
     api_root: Annotated[
         str | None,
         Field(description="Optional Cloud API root URL override."),
     ] = None,
     table_prefix: Annotated[
         str | None,
         Field(description="Optional table prefix to use when syncing to the destination."),
     ] = None,
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/mcp/_cloud_ops.py (2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: yohannj
PR: airbytehq/PyAirbyte#716
File: airbyte/logs.py:384-402
Timestamp: 2025-07-11T19:53:44.427Z
Learning: In the PyAirbyte project, when reviewing PRs, maintain clear separation of concerns. Don't suggest changes that are outside the scope of the PR's main objective, even if they would improve consistency or fix other issues. This helps with reviewing changes and potential reverts.
📚 Learning: in the `validate` function, when calling `_resolve_source_job` and `_resolve_destination_job`, the `...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#416
File: airbyte/cli.py:337-343
Timestamp: 2024-10-09T23:05:35.714Z
Learning: In the `validate` function, when calling `_resolve_source_job` and `_resolve_destination_job`, the `config` parameter is intentionally set to `None` because configuration is processed in a separate step later.
Applied to files:
airbyte/mcp/_cloud_ops.py
📚 Learning: in pyairbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designe...
Learnt from: aaronsteers
PR: airbytehq/PyAirbyte#411
File: airbyte/cli.py:111-160
Timestamp: 2024-10-08T15:34:31.026Z
Learning: In PyAirbyte, error messages in functions like `_resolve_source_job` in `airbyte/cli.py` are designed to decouple the message text from dynamic values, following a structlog-inspired design. Dynamic values are provided via parameters like `input_value`. This approach helps avoid including PII in the message strings, which may be used in telemetry.
Applied to files:
airbyte/mcp/_cloud_ops.py
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (7)
airbyte/mcp/_cloud_ops.py (7)
4-4: LGTM on the new import!

The `Path` import is correctly added to support the `dict | Path | None` type annotations in the config parameters.
10-10: Good addition of required imports!

The imports for `get_destination`, `get_source`, and `secrets` are appropriately added to support the new functionality.
13-13: Nice utility import!

Adding the `resolve_config` import to handle configuration resolution from different sources (dict, Path, or secrets) is a good abstraction.
16-79: Strong implementation of deploy_source_to_cloud!

The function follows the established pattern well with proper error handling, authentication, and descriptive messages. The parameter signature is consistent and well-documented.
82-146: Excellent consistency in deploy_destination_to_cloud!

The function mirrors the source deployment pattern perfectly, maintaining API consistency. Good use of the same parameter structure and error handling approach.
209-254: Great implementation of run_cloud_sync with flexible waiting!

The function handles both synchronous and asynchronous execution patterns well. The conditional return message based on the `wait` parameter is a nice touch for user feedback.
302-308: Perfect tool registration!

All four new functions are properly registered as MCP tools, maintaining consistency with the existing pattern.
- Remove workspace_id and api_root parameters from all cloud MCP tools
- Configure tools to use AIRBYTE_WORKSPACE_ID and AIRBYTE_API_ROOT env vars
- Add check_airbyte_cloud_workspace tool for connection validation
- Update docstrings to reflect new environment variable requirements
- Addresses GitHub feedback from aaronsteers on PR #743

Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 3
♻️ Duplicate comments (1)
airbyte/mcp/_cloud_ops.py (1)
120-124: Destination parameter now correctly passes a Destination object

This addresses the earlier review about passing destination vs destination_config. Looks good to me. Great to see the consistency with the source deployment pattern, wdyt?
🧹 Nitpick comments (2)
airbyte/mcp/_cloud_ops.py (2)
171-177: Early validation for selected_streams for clearer UX

Workspace.deploy_connection will raise if selected_streams is empty. Would you like to pre-validate and return a concise error before calling the API?

```diff
 ) -> str:
     """Create a connection between a deployed source and destination on Airbyte Cloud.
@@
-    try:
+    if not selected_streams:
+        return f"Failed to create connection '{connection_name}': selected_streams cannot be empty"
+    try:
         workspace = cloud.CloudWorkspace(
```

wdyt?
198-201: Constrain wait_timeout to positive values

To avoid accidental non-positive timeouts, shall we add a lower bound?

```diff
     wait_timeout: Annotated[
         int,
-        Field(description="Maximum time to wait for sync completion (seconds)."),
+        Field(description="Maximum time to wait for sync completion (seconds).", ge=1),
     ] = 300,
```

wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/mcp/_cloud_ops.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/mcp/_cloud_ops.py (5)
- airbyte/cloud/workspaces.py (7): CloudWorkspace (56-444), deploy_source (141-185), deploy_destination (187-240), deploy_connection (290-338), get_connection (97-109), connect (80-93), workspace_url (74-76)
- airbyte/mcp/_util.py (1): resolve_config (46-106)
- airbyte/cloud/connections.py (4): source (81-90), destination (104-113), table_prefix (124-129), run_sync (143-170)
- airbyte/secrets/util.py (1): get_secret (32-109)
- airbyte/cloud/sync_results.py (1): get_job_status (179-181)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 48-48: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
[error] 107-107: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (1)
airbyte/mcp/_cloud_ops.py (1)
292-297: Tool registration looks complete

Registration order and coverage (workspace check + four ops + status) look good. Nice expansion of the MCP surface area, wdyt?
Actionable comments posted: 2
♻️ Duplicate comments (3)
airbyte/mcp/_cloud_ops.py (3)
22-29: Good: single helper to resolve authenticated CloudWorkspace

Using the resolve_cloud_* helpers plus a single _get_cloud_workspace() keeps the tools consistent and matches prior feedback about optional API root. LGTM.
63-68: Fix mypy: coerce Path to str before resolve_config

Linters fail because resolve_config expects dict | str | None but may receive Path. Coercing Path to str here should unblock. Wdyt?

```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=source.config_spec,
     )
```
116-121: Apply the same Path→str coercion for destination config

Mirror the source-side fix to resolve the mypy error here as well, wdyt?

```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=destination.config_spec,
     )
```
🧹 Nitpick comments (6)
airbyte/secrets/util.py (1)
143-145: Consider validating the default value type consistency.

The function now returns `SecretString(default)` when a default is provided and the secret isn't found. Should we add a type check to ensure `default` is either already a `SecretString` or a `str` to avoid unexpected behavior? This would make the API more predictable. Wdyt?

```diff
     if default:
+        if not isinstance(default, (str, SecretString)):
+            raise exc.PyAirbyteInputError(
+                guidance="Default value must be a string or SecretString.",
+                input_value=default,
+            )
         return SecretString(default)
```

airbyte/mcp/_cloud_ops.py (5)
42-45: Broaden tool input typing to accept file path strings

Since resolve_config accepts str paths, would you include str in the MCP tool param type for config to improve UX (without breaking dict inputs), wdyt?

```diff
-    config: Annotated[
-        dict | Path | None,
+    config: Annotated[
+        dict | str | Path | None,
         Field(description="The configuration for the source connector."),
     ] = None,
```

Apply similarly in deploy_destination_to_cloud.

Also applies to: 95-98
153-155: Add basic validation constraints to inputs

Shall we enforce selected_streams non-empty at the schema level and ensure wait_timeout is positive to catch errors earlier, wdyt?

```diff
-    selected_streams: Annotated[
-        list[str],
-        Field(description="The selected stream names to sync within the connection."),
-    ],
+    selected_streams: Annotated[
+        list[str],
+        Field(description="The selected stream names to sync within the connection.", min_items=1),
+    ],
@@
-    wait_timeout: Annotated[
-        int,
-        Field(description="Maximum time to wait for sync completion (seconds)."),
-    ] = 300,
+    wait_timeout: Annotated[
+        int,
+        Field(description="Maximum time to wait for sync completion (seconds).", ge=1),
+    ] = 300,
```

Also applies to: 197-199
77-83: Consider structured returns for MCP clients (ids, status, urls)

Returning plain strings makes chaining harder. Would you return structured JSON with ids and status so clients can use values directly, wdyt?

```diff
-        return (
-            f"Successfully deployed source '{source_name}' with ID: {deployed_source.connector_id}"
-        )
+        return {
+            "op": "deploy_source",
+            "status": "success",
+            "name": source_name,
+            "source_id": deployed_source.connector_id,
+        }
@@
-        return (
-            f"Successfully deployed destination '{destination_name}' "
-            f"with ID: {deployed_destination.connector_id}"
-        )
+        return {
+            "op": "deploy_destination",
+            "status": "success",
+            "name": destination_name,
+            "destination_id": deployed_destination.connector_id,
+        }
@@
-        return (
-            f"Successfully created connection '{connection_name}' "
-            f"with ID: {deployed_connection.connection_id}"
-        )
+        return {
+            "op": "create_connection",
+            "status": "success",
+            "name": connection_name,
+            "connection_id": deployed_connection.connection_id,
+        }
@@
-        return f"Sync completed with status: {status}. Job ID: {sync_result.job_id}"
-    return f"Sync started. Job ID: {sync_result.job_id}"
+        return {"op": "run_sync", "status": str(status), "job_id": sync_result.job_id}
+    return {"op": "run_sync", "status": "started", "job_id": sync_result.job_id}
@@
-    return (
-        f"✅ Successfully connected to Airbyte Cloud workspace.\n"
-        f"Workspace ID: {workspace.workspace_id}\n"
-        f"Workspace URL: {workspace.workspace_url}"
-    )
+    return {
+        "op": "check_workspace",
+        "status": "success",
+        "workspace_id": workspace.workspace_id,
+        "workspace_api_url": workspace.workspace_url,
+    }
```

Similarly, you could return a structured error payload in the except blocks.

Also applies to: 130-136, 177-183, 215-219, 237-241
77-79: Avoid leaking sensitive details in error messages

Exception messages during config/validation might include sensitive fields. Would you return a generic message to the client and log details server-side (or sanitize via masking), wdyt?

```diff
-    except Exception as ex:
-        return f"Failed to deploy source '{source_name}': {ex}"
+    except Exception as ex:
+        # Consider logging `ex` with masking and returning a generic error to the client.
+        return f"Failed to deploy source '{source_name}'. See server logs for details."
```

(Apply similarly to other except blocks.)

Also applies to: 130-131, 177-178, 212-214, 234-235
237-241: Optional: include the UI URL for quicker manual verification

workspace_url points at an API endpoint. Would you also include the UI URL to help humans click through, wdyt?

```diff
-    return (
-        f"✅ Successfully connected to Airbyte Cloud workspace.\n"
-        f"Workspace ID: {workspace.workspace_id}\n"
-        f"Workspace URL: {workspace.workspace_url}"
-    )
+    cloud_ui_url = f"https://cloud.airbyte.com/workspaces/{workspace.workspace_id}"
+    return (
+        f"✅ Successfully connected to Airbyte Cloud workspace.\n"
+        f"Workspace ID: {workspace.workspace_id}\n"
+        f"Workspace API URL: {workspace.workspace_url}\n"
+        f"Workspace UI URL: {cloud_ui_url}"
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- airbyte/_util/api_util.py (1 hunks)
- airbyte/cloud/auth.py (1 hunks)
- airbyte/mcp/_cloud_ops.py (3 hunks)
- airbyte/secrets/util.py (4 hunks)
- tests/integration_tests/cloud/test_cloud_api_util.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (4)
airbyte/cloud/auth.py (2)
- airbyte/secrets/base.py (1): SecretString (34-139)
- airbyte/secrets/util.py (2): get_secret (63-149), try_get_secret (33-60)

airbyte/secrets/util.py (6)
- airbyte/secrets/base.py (4): SecretString (34-139), SecretManager (142-204), SecretSourceEnum (23-31), get_secret (174-180)
- airbyte/exceptions.py (1): PyAirbyteSecretNotFoundError (416-425)
- airbyte/secrets/google_gsm.py (1): get_secret (164-170)
- airbyte/secrets/prompt.py (1): get_secret (17-28)
- airbyte/secrets/google_colab.py (1): get_secret (27-37)
- airbyte/secrets/env_vars.py (2): get_secret (18-23), get_secret (31-43)

tests/integration_tests/cloud/test_cloud_api_util.py (4)
- airbyte/exceptions.py (1): AirbyteError (432-447)
- airbyte/_util/api_util.py (2): check_connector (806-847), get_bearer_token (738-763)
- airbyte/secrets/base.py (1): SecretString (34-139)
- tests/integration_tests/cloud/conftest.py (2): airbyte_cloud_client_id (72-75), airbyte_cloud_client_secret (79-82)

airbyte/mcp/_cloud_ops.py (7)
- airbyte/cloud/workspaces.py (8): get_destination (125-137), get_source (111-123), deploy_source (141-185), deploy_destination (187-240), deploy_connection (290-338), get_connection (97-109), connect (80-93), workspace_url (74-76)
- airbyte/cloud/auth.py (4): resolve_cloud_api_url (53-58), resolve_cloud_client_id (45-50), resolve_cloud_client_secret (37-42), resolve_cloud_workspace_id (61-66)
- airbyte/mcp/_util.py (1): resolve_config (46-106)
- airbyte/cloud/connections.py (4): source (81-90), destination (104-113), table_prefix (124-129), run_sync (143-170)
- airbyte/_connector_base.py (3): config_spec (236-245), set_config (114-130), name (84-86)
- airbyte/sources/base.py (2): config_spec (357-366), set_config (281-301)
- airbyte/cloud/sync_results.py (1): get_job_status (179-181)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 64-64: mypy: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
[error] 117-117: mypy: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Windows)
🔇 Additional comments (6)
airbyte/secrets/util.py (1)
33-61: LGTM! Clean implementation of the try_get_secret helper.

The new `try_get_secret` function provides a nice convenience wrapper that gracefully handles missing secrets. The implementation correctly suppresses `PyAirbyteSecretNotFoundError` and delegates to `get_secret` with `allow_prompt=False`.
airbyte/_util/api_util.py (1)
23-23: Good centralization of cloud constants!

Moving `CLOUD_API_ROOT` and `CLOUD_CONFIG_API_ROOT` to a dedicated `airbyte.cloud.auth` module improves maintainability and creates a single source of truth for cloud-related configuration.

tests/integration_tests/cloud/test_cloud_api_util.py (2)
9-27: Clean import reorganization.

The imports are now better organized with the addition of the model types from `airbyte_api.models`. The centralized import of `CLOUD_API_ROOT` from the new auth module aligns well with the architectural changes.
250-253: Good addition of type hints for test parameters.

Adding `SecretString` type annotations to `airbyte_cloud_client_id` and `airbyte_cloud_client_secret` improves test clarity and ensures type safety. This aligns with the typing used throughout the codebase.

airbyte/cloud/auth.py (1)
8-21: Nice centralization of Cloud API constants

Consolidating API roots and env-var names here simplifies maintenance and avoids drift across modules. LGTM.
Also applies to: 28-34
airbyte/mcp/_cloud_ops.py (1)
271-279: Tool registration looks complete

All new tools are registered and discoverable via FastMCP. LGTM.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
airbyte/cloud/connections.py (1)
93-101: destination_id is assigned from source_id (broken link to destination).

Line 99 sets `self._destination_id = self._connection_info.source_id`; this should be `destination_id`. This breaks destination lookups and URLs, wdyt?

Apply this diff:

```diff
-        self._destination_id = self._connection_info.source_id
+        self._destination_id = self._connection_info.destination_id
```
58-62
: Handle None-valued keys from dotenv safelyIf a key exists but its value is None (e.g., VAR= with no value), returning SecretString(None) will be problematic. Shall we guard for None and return None instead, wdyt?
- if secret_name not in dotenv_vars: - # Secret not found - return None - - return SecretString(dotenv_vars[secret_name]) + value = dotenv_vars.get(secret_name) + if value is None: + # Secret not found or unset + return None + return SecretString(value)airbyte/mcp/_util.py (1)
53-58
: Broaden resolve_config signature to accept str paths (fixes mypy and improves ergonomics)Linters expect dict | str | None; at runtime accepting str is also reasonable. Shall we accept str and coerce to Path internally, wdyt?
-def resolve_config( - config: dict | Path | None = None, +def resolve_config( + config: dict | str | Path | None = None, @@ - if isinstance(config, Path): - config_dict.update(yaml.safe_load(config.read_text())) + if isinstance(config, (str, Path)): + path = Path(config) + config_dict.update(yaml.safe_load(path.read_text()))Also applies to: 68-72
♻️ Duplicate comments (3)
airbyte/mcp/_cloud_ops.py (3)
57-60
: Align docstrings with CLOUD_ resolvers (and legacy fallbacks)*Docs mention AIRBYTE_* but the code resolves CLOUD_* via airbyte.cloud.auth (with legacy fallbacks). Shall we update the docstrings to reflect that to prevent confusion, wdyt?
```diff
- By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
- and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
- Airbyte Cloud API.
+ Credentials are resolved via airbyte.cloud.auth resolvers:
+ AIRBYTE_CLOUD_CLIENT_ID / AIRBYTE_CLOUD_CLIENT_SECRET / AIRBYTE_CLOUD_WORKSPACE_ID /
+ AIRBYTE_CLOUD_API_URL (falling back to legacy AIRBYTE_* vars if present).
```

(Apply to all tool docstrings above.)
Also applies to: 111-114, 164-167, 205-208, 234-237, 269-272
Lines 63-69: Fix mypy Path typing by coercing to str before resolve_config. Linters expect `dict | str | None`. Coercing Path → str here unblocks CI without changing behavior, wdyt?
```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=source.config_spec,
     )
```
Lines 117-123: Apply the same Path→str fix for destination configs. Mirror the source-side change to satisfy mypy, wdyt?
```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=destination.config_spec,
     )
```
🧹 Nitpick comments (7)
airbyte/secrets/config.py (1)
Lines 45-61: Broadened type to SecretManager looks good; avoid duplicates when registering? Accepting any SecretManager is the right move. Do we also want to de-dup by name before insert, to prevent multiple identical managers being registered across sessions, wdyt?
Apply within register_secret_manager:
```diff
 def register_secret_manager(
     secret_manager: SecretManager,
     *,
     as_backup: bool = False,
     replace_existing: bool = False,
 ) -> None:
     """Register a custom secret manager."""
     if replace_existing:
         clear_secret_sources()

+    # Avoid duplicates by name/equality
+    global _SECRETS_SOURCES
+    _SECRETS_SOURCES = [s for s in _SECRETS_SOURCES if s != secret_manager]
+
     if as_backup:
         # Add to end of list
         _SECRETS_SOURCES.append(secret_manager)
     else:
         # Add to beginning of list
         _SECRETS_SOURCES.insert(0, secret_manager)
```

examples/run_bigquery_faker.py (1)
Lines 24-31: Avoid type: ignore and fail fast if the secret is missing (still terse). Since get_secret can return None, an assert keeps the example simple and avoids the mypy ignore and an AttributeError at runtime. Shall we do this, wdyt?
```diff
-bigquery_destination_secret: dict = (
-    GoogleGSMSecretManager(  # type: ignore[union-attr]
+mgr = GoogleGSMSecretManager(
     project=AIRBYTE_INTERNAL_GCP_PROJECT,
     credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
-    )
-    .get_secret(SECRET_NAME)
-    .parse_json()
-)
+)
+_secret = mgr.get_secret(SECRET_NAME)
+assert _secret is not None, f"Secret '{SECRET_NAME}' not found in GSM"
+bigquery_destination_secret: dict = _secret.parse_json()
```

examples/run_bigquery_destination.py (1)
Lines 23-30: Same as above: replace the type ignore with a tiny assert for clarity. Keeps the demo short while being safer at runtime, wdyt?
```diff
-bigquery_destination_secret: dict = (
-    GoogleGSMSecretManager(  # type: ignore[union-attr]
+mgr = GoogleGSMSecretManager(
     project=AIRBYTE_INTERNAL_GCP_PROJECT,
     credentials_json=ab.get_secret("GCP_GSM_CREDENTIALS"),
-    )
-    .get_secret(SECRET_NAME)
-    .parse_json()
-)
+)
+_secret = mgr.get_secret(SECRET_NAME)
+assert _secret is not None, f"Secret '{SECRET_NAME}' not found in GSM"
+bigquery_destination_secret: dict = _secret.parse_json()
```

airbyte/secrets/env_vars.py (1)
Lines 51-53: Optional: avoid re-parsing the .env file on every lookup. `dotenv_values()` is called on every get_secret, which can be I/O-heavy for many lookups. Do you want to cache the parsed dict once per instance (e.g., lazy-init a private cache and invalidate only if needed), wdyt?
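To make the caching idea concrete, here is a minimal self-contained sketch. The names are hypothetical: `_parse_dotenv` stands in for the real `dotenv_values()` call, and the class body is simplified from the actual DotenvSecretManager.

```python
from __future__ import annotations


class DotenvSecretManager:
    """Simplified stand-in showing a lazy per-instance cache for .env lookups."""

    def __init__(self, dotenv_path: str = ".env") -> None:
        self.dotenv_path = dotenv_path
        self._cache: dict[str, str] | None = None  # parsed at most once

    def _parse_dotenv(self) -> dict[str, str]:
        # Placeholder for the real file read + dotenv parse step.
        return {"MY_SECRET": "s3cr3t"}

    def get_secret(self, secret_name: str) -> str | None:
        if self._cache is None:  # first lookup pays the parse cost; later ones don't
            self._cache = self._parse_dotenv()
        return self._cache.get(secret_name)
```

An explicit invalidation hook (resetting `_cache` to None) would cover the rare case where the .env file changes mid-session.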
airbyte/cloud/workspaces.py (1)
Lines 75-78: Tighten type: workspace_url always returns a str. The property never returns None. Shall we narrow the return type to str for clarity, wdyt?
```diff
-    def workspace_url(self) -> str | None:
+    def workspace_url(self) -> str:
         """The web URL of the workspace."""
         return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
```

airbyte/_util/api_util.py (1)
Lines 63-74: Web URL root helper looks good; consider a future override hook. LGTM for Cloud. For self-managed, would you like to add an optional env/config override (e.g., AIRBYTE_WEB_URL_ROOT) as a stopgap until #563 is addressed, wdyt?
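For reference, the stopgap override could look like this minimal sketch. The env var name `AIRBYTE_WEB_URL_ROOT` is the suggestion above, not an existing project setting, and the `CLOUD_API_ROOT` value here is illustrative:

```python
import os

# Assumed constant; mirrors the Cloud API root referenced in this PR.
CLOUD_API_ROOT = "https://api.airbyte.com/v1"


def get_web_url_root(api_root: str) -> str:
    """Derive a web UI root, honoring an explicit AIRBYTE_WEB_URL_ROOT override."""
    override = os.environ.get("AIRBYTE_WEB_URL_ROOT")
    if override:
        return override.rstrip("/")
    if api_root == CLOUD_API_ROOT:
        return "https://cloud.airbyte.com"
    # Fallback: best effort (known to be wrong for self-managed; see issue #563).
    return api_root
```

Self-managed operators could then set the variable once instead of waiting on URL-derivation heuristics.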
airbyte/mcp/_cloud_ops.py (1)
Lines 134-137: Include destination URL in success message for parity with sources. Sources return both ID and URL; destinations only return ID. Shall we add connector_url for consistency, wdyt?
```diff
-    return (
-        f"Successfully deployed destination '{destination_name}' "
-        f"with ID: {deployed_destination.connector_id}"
-    )
+    return (
+        f"Successfully deployed destination '{destination_name}' "
+        f"with ID '{deployed_destination.connector_id}' and "
+        f"URL: {deployed_destination.connector_url}"
+    )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (15)
- airbyte/_util/api_util.py (2 hunks)
- airbyte/cloud/auth.py (1 hunks)
- airbyte/cloud/connections.py (1 hunks)
- airbyte/cloud/connectors.py (1 hunks)
- airbyte/cloud/sync_results.py (1 hunks)
- airbyte/cloud/workspaces.py (2 hunks)
- airbyte/constants.py (1 hunks)
- airbyte/exceptions.py (1 hunks)
- airbyte/mcp/_cloud_ops.py (3 hunks)
- airbyte/mcp/_util.py (2 hunks)
- airbyte/secrets/config.py (1 hunks)
- airbyte/secrets/env_vars.py (2 hunks)
- airbyte/secrets/google_gsm.py (1 hunks)
- examples/run_bigquery_destination.py (1 hunks)
- examples/run_bigquery_faker.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- airbyte/exceptions.py
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/cloud/auth.py
🧰 Additional context used
📓 Path-based instructions (1)
examples/*
⚙️ CodeRabbit configuration file
These scripts are intended for demonstration purposes only. They are not meant to represent production code. For these scripts, you should prefer brevity over robust error handling and handling of edge cases. These are demos - which should be as simple as possible to handle the 'blessed' use cases.
Files:
examples/run_bigquery_destination.py
examples/run_bigquery_faker.py
🧬 Code graph analysis (11)
airbyte/cloud/connections.py (2)
- airbyte/cloud/workspaces.py (1): workspace_url (75-77)
- airbyte/exceptions.py (3): workspace_url (442-447), job_history_url (472-477), connection_url (464-469)

airbyte/cloud/workspaces.py (1)
- airbyte/_util/api_util.py (1): get_web_url_root (63-73)

airbyte/cloud/sync_results.py (2)
- airbyte/cloud/connections.py (1): job_history_url (137-139)
- airbyte/exceptions.py (1): job_history_url (472-477)

airbyte/secrets/google_gsm.py (5)
- airbyte/secrets/env_vars.py (3): get_secret (23-28), get_secret (48-62), name (37-39)
- airbyte/secrets/util.py (1): get_secret (63-149)
- airbyte/secrets/base.py (2): get_secret (174-180), SecretString (34-139)
- airbyte/secrets/google_colab.py (1): get_secret (27-37)
- airbyte/secrets/prompt.py (1): get_secret (17-28)

examples/run_bigquery_destination.py (1)
- airbyte/secrets/google_gsm.py (1): GoogleGSMSecretManager (75-304)

airbyte/secrets/config.py (1)
- airbyte/secrets/base.py (1): SecretManager (142-204)

airbyte/cloud/connectors.py (2)
- airbyte/cloud/workspaces.py (1): workspace_url (75-77)
- airbyte/exceptions.py (1): workspace_url (442-447)

examples/run_bigquery_faker.py (1)
- airbyte/secrets/google_gsm.py (1): GoogleGSMSecretManager (75-304)

airbyte/mcp/_util.py (3)
- airbyte/secrets/env_vars.py (3): DotenvSecretManager (31-62), get_secret (23-28), get_secret (48-62)
- airbyte/secrets/config.py (1): register_secret_manager (45-60)
- airbyte/secrets/util.py (1): get_secret (63-149)

airbyte/secrets/env_vars.py (2)
- airbyte/secrets/base.py (4): SecretManager (142-204), SecretSourceEnum (23-31), SecretString (34-139), get_secret (174-180)
- airbyte/secrets/util.py (1): get_secret (63-149)

airbyte/mcp/_cloud_ops.py (7)
- airbyte/cloud/workspaces.py (9): get_destination (126-138), get_source (112-124), CloudWorkspace (57-445), deploy_source (142-186), deploy_destination (188-241), deploy_connection (291-339), get_connection (98-110), connect (81-94), workspace_url (75-77)
- airbyte/cloud/auth.py (4): resolve_cloud_api_url (25-33), resolve_cloud_client_id (17-22), resolve_cloud_client_secret (9-14), resolve_cloud_workspace_id (36-41)
- airbyte/mcp/_util.py (1): resolve_config (53-113)
- airbyte/cloud/connections.py (7): source (81-90), destination (104-113), source_id (70-78), destination_id (93-101), table_prefix (124-129), connection_url (132-134), run_sync (143-170)
- airbyte/_connector_base.py (3): config_spec (236-245), set_config (114-130), name (84-86)
- airbyte/cloud/connectors.py (3): connector_url (103-105), source_id (151-156), destination_id (166-171)
- airbyte/cloud/sync_results.py (2): get_job_status (186-188), job_url (146-155)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 64-64: mypy: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
[error] 118-118: mypy: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (8)
airbyte/cloud/connectors.py (1)
Lines 102-106: Connector URL uses singular path; keep as-is? airbyte/cloud/connectors.py returns `f"{self.workspace.workspace_url}/{self.connector_type}/{self.connector_id}"` (singular "source"/"destination"); workspace_url is `/workspaces/{workspace_id}`. Docs/API still use "/sources" and "/destinations" for docs/server endpoints. Keep singular, wdyt?
airbyte/cloud/sync_results.py (1)
Lines 145-156: job_url now points to connection timeline; OK, but ensure global consistency. This aligns with CloudConnection.job_history_url=/timeline. Let's also update exceptions.job_history_url to /timeline for consistency (see related comment), wdyt?
airbyte/constants.py (1)
Lines 155-184: Cloud constants look good and clarify configuration surface. LGTM. These names align with the new cloud auth resolvers and web URL handling.
airbyte/secrets/env_vars.py (1)
Lines 14-16: TYPE_CHECKING import looks good. Using TYPE_CHECKING to avoid runtime import overhead is appropriate here.
airbyte/cloud/workspaces.py (1)
Lines 43-43: Import of get_web_url_root is appropriate. This isolates web vs API roots cleanly.
airbyte/_util/api_util.py (1)
Lines 23-23: Good: centralize constants import. Removing local duplicates prevents drift.
airbyte/mcp/_cloud_ops.py (2)
Lines 22-29: Workspace resolver helper is clean. Centralizing to resolve_cloud_* keeps tools consistent.
Lines 281-289: Tool registration set looks complete. All new tools (plus status) are registered and discoverable.
Actionable comments posted: 3
♻️ Duplicate comments (3)
airbyte/mcp/_cloud_ops.py (3)
Lines 61-64: Align env-var docstrings with CLOUD_* resolvers (or mention both). The code authenticates via `resolve_cloud_*` (CLOUD_* envs with legacy fallbacks), but the docstrings only reference `AIRBYTE_*`. Could we update the text to match behavior to avoid confusion, wdyt? Example (apply similarly to each block listed):
```diff
- By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
- and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
- Airbyte Cloud API.
+ By default, credentials are resolved via airbyte.cloud.auth:
+ `AIRBYTE_CLOUD_CLIENT_ID`, `AIRBYTE_CLOUD_CLIENT_SECRET`,
+ `AIRBYTE_CLOUD_WORKSPACE_ID`, and `AIRBYTE_CLOUD_API_ROOT`
+ (falling back to legacy `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`,
+ `AIRBYTE_WORKSPACE_ID`, `AIRBYTE_API_ROOT` if present).
```

Also applies to: 116-119, 170-173, 212-215, 242-245, 270-273, 307-309, 323-326, 335-338, 346-350
Lines 66-73: Fix mypy error: coerce Path -> str before resolve_config. Linters are failing because `resolve_config` expects `dict | str | None`. Shall we coerce `Path` to `str` first, wdyt?

```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=source.config_spec,
     )
```
Lines 121-127: Mirror the Path -> str coercion for destination config. Same issue here; applying the same fix will unblock the second linter error.

```diff
-    config_dict = resolve_config(
-        config=config,
+    config_in = str(config) if isinstance(config, Path) else config
+    config_dict = resolve_config(
+        config=config_in,
         config_secret_name=config_secret_name,
         config_spec_jsonschema=destination.config_spec,
     )
```
🧹 Nitpick comments (3)
airbyte/cloud/connectors.py (1)
Lines 102-106: Docstring wording + Optional[str] guard on workspace_url. This property is used for both sources and destinations, so could we generalize the docstring and also assert `workspace_url` isn't None to avoid Optional hazards and help mypy, wdyt?

```diff
-    def connector_url(self) -> str:
-        """Get the web URL of the source connector."""
-        return f"{self.workspace.workspace_url}/{self.connector_type}/{self.connector_id}"
+    def connector_url(self) -> str:
+        """Get the web URL of the connector."""
+        assert self.workspace.workspace_url, "workspace_url is not set"
+        return f"{self.workspace.workspace_url}/{self.connector_type}/{self.connector_id}"
```
Lines 139-142: Include destination URL for parity with source deploy message. The source deploy path returns both ID and URL; shall we include the URL here too for consistency, wdyt?

```diff
-    return (
-        f"Successfully deployed destination '{destination_name}' "
-        f"with ID: {deployed_destination.connector_id}"
-    )
+    return (
+        f"Successfully deployed destination '{destination_name}' "
+        f"with ID '{deployed_destination.connector_id}' and "
+        f"URL: {deployed_destination.connector_url}"
+    )
```
Lines 227-235: Clarify that job_url points to job history (not a specific job). `SyncResult.job_url` returns the connection's job history URL (per sync_results.py). Shall we label it explicitly to avoid confusion, wdyt?

```diff
-    f"job URL is: {sync_result.job_url}"
+    f"job history URL is: {sync_result.job_url}"
```

(The same one-line change applies in both messages.)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte/cloud/connections.py (2 hunks)
- airbyte/cloud/connectors.py (1 hunks)
- airbyte/cloud/workspaces.py (5 hunks)
- airbyte/mcp/_cloud_ops.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- airbyte/cloud/connections.py
- airbyte/cloud/workspaces.py
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte/cloud/connectors.py (1)
- airbyte/cloud/workspaces.py (1): workspace_url (75-77)

airbyte/mcp/_cloud_ops.py (7)
- airbyte/cloud/workspaces.py (12): get_destination (126-138), get_source (112-124), CloudWorkspace (57-454), deploy_source (142-186), deploy_destination (188-241), deploy_connection (291-339), get_connection (98-110), connect (81-94), workspace_url (75-77), list_sources (402-427), list_destinations (429-454), list_connections (373-400)
- airbyte/cloud/auth.py (4): resolve_cloud_api_url (25-33), resolve_cloud_client_id (17-22), resolve_cloud_client_secret (9-14), resolve_cloud_workspace_id (36-41)
- airbyte/cloud/connections.py (8): CloudConnection (19-252), source (81-90), destination (104-113), source_id (70-78), destination_id (93-101), table_prefix (124-129), run_sync (143-170), get_sync_result (205-230)
- airbyte/cloud/connectors.py (5): CloudDestination (168-180), CloudSource (153-165), connector_url (103-105), source_id (160-165), destination_id (175-180)
- airbyte/destinations/util.py (1): get_noop_destination (91-109)
- airbyte/mcp/_util.py (1): resolve_config (53-113)
- airbyte/cloud/sync_results.py (3): get_job_status (186-188), job_url (146-155), SyncResult (129-360)
🪛 GitHub Actions: Run Linters
airbyte/mcp/_cloud_ops.py
[error] 68-68: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
[error] 123-123: Argument 'config' to 'resolve_config' has incompatible type 'dict[Any, Any] | Path | None'; expected 'dict[Any, Any] | str | None' [arg-type]
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (3)
airbyte/cloud/connectors.py (2)
Lines 107-114: LGTM on repr. Readable and helpful in logs. Nice touch including the URL.
Lines 102-106: Confirm plural vs singular path used for cloud connector URLs. Scan shows plural "sources"/"destinations" used in docs/API, but connector_url uses `self.connector_type` and I couldn't find where that's set; can you verify CloudSource/CloudDestination set connector_type to "sources"/"destinations" (or test a live workspace URL to ensure links don't 404), wdyt?
airbyte/mcp/_cloud_ops.py (1)
Lines 355-366: LGTM on tool registration. Nice, consistent registration and ordering. Tools look discoverable via FastMCP.
Actionable comments posted: 2
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte/_util/api_util.py (3 hunks)
🧰 Additional context used
🪛 GitHub Actions: Run Linters
airbyte/_util/api_util.py
[error] 545-545: Ruff: FIX004 Line contains HACK, consider resolving the issue. Command 'poetry run ruff check .' exited with code 1.
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (1)
airbyte/_util/api_util.py (1)
Lines 23-23: Centralizing API root constants looks good. Importing `CLOUD_API_ROOT` and `CLOUD_CONFIG_API_ROOT` from `airbyte.constants` reduces duplication and keeps these values in one place. 👍
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
- Fix incorrect indentation of _load_dotenv_file() in airbyte/mcp/_util.py - Add missing blank line before __init__ method in airbyte/secrets/env_vars.py - Resolves syntax errors preventing module loading and mypy type checking Co-Authored-By: AJ Steers <[email protected]>
- Fix return type annotation in airbyte/types.py:145 using cast with quoted type - Fix assignment type annotation in airbyte/sources/registry.py:123 to allow None - Replace HACK comment with TODO and add issue link in airbyte/_util/api_util.py:545 Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte/sources/registry.py (2)
Lines 158-163: Add an HTTP timeout to avoid indefinite hangs. Could we add a small timeout to the registry fetch so CLI/UI calls don't block on bad networks, wdyt?

```diff
-    response = requests.get(
+    response = requests.get(
         registry_url,
         headers={"User-Agent": f"PyAirbyte/{get_version()}"},
-    )
+        timeout=15,
+    )
```
Lines 121-125: Warn on misconfigured PyPI entries. If `enabled=True` but `packageName` is missing, we silently exclude it later. Would a warning help surface registry issues, wdyt?

```diff
 pypi_registry: dict = remote_registries.get("pypi", {})
 pypi_package_name: str | None = pypi_registry.get("packageName", None)
 pypi_enabled: bool = pypi_registry.get("enabled", False)
+if pypi_enabled and not pypi_package_name:
+    warnings.warn(
+        message=f"PyPI is enabled but packageName is missing for connector {name}",
+        stacklevel=2,
+    )
```
Lines 145-145: Use the real type in cast (or type the map) to avoid checker quirks. The cast is fine at runtime, but some type checkers (e.g., pyright) can flag string-literal types passed to `cast`. Could we switch to the real type to maximize compatibility, or type the mapping to avoid the cast altogether, wdyt? Apply this minimal tweak:

```diff
-    return cast("sqlalchemy.types.TypeEngine", sql_type)
+    return cast(sqlalchemy.types.TypeEngine, sql_type)
```

Alternatively (outside this hunk), consider typing the conversion map so `cast` is unnecessary:

```python
from typing import Mapping

from sqlalchemy.types import TypeEngine

SQLATypeLike = TypeEngine | type[TypeEngine]


class SQLTypeConverter:
    def __init__(self, conversion_map: Mapping[str, SQLATypeLike] | None = None) -> None:
        self.conversion_map = conversion_map or CONVERSION_MAP
```

Would you like me to scan the repo for other string-literal casts and propose a consistent update, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte/_util/api_util.py (3 hunks)
- airbyte/sources/registry.py (1 hunks)
- airbyte/types.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/_util/api_util.py
🔇 Additional comments (2)
airbyte/sources/registry.py (2)
Lines 123-123: Good correction: make `pypi_package_name` Optional. This aligns the local var with the registry field shape and `ConnectorMetadata.pypi_package_name`. LGTM, wdyt?
Lines 78-89: Confirm default for Java connectors. `default_install_type` falls back to DOCKER (not JAVA) when language is Java. Is this intentional so Java connectors default to Dockerized runs rather than JVM installs, wdyt?
- Use cast() approach for pypi_package_name in registry.py (following main) - Use type: ignore approach for sql_type return in types.py (following main) Co-Authored-By: AJ Steers <[email protected]>
- Updated type annotations from Path to str to match resolve_config signature - Removed unused pathlib.Path import to fix Ruff lint error Co-Authored-By: AJ Steers <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (4)
airbyte/mcp/_cloud_ops.py (4)
Lines 60-63: Docstrings reference AIRBYTE_* but code uses AIRBYTE_CLOUD_* (mention both or switch to CLOUD_*). To reduce confusion, can we update these docstrings to reference AIRBYTE_CLOUD_CLIENT_ID/… with an explicit fallback note to legacy AIRBYTE_* if supported, wdyt?
Suggested edit pattern:
```diff
- By default, the `AIRBYTE_CLIENT_ID`, `AIRBYTE_CLIENT_SECRET`, `AIRBYTE_WORKSPACE_ID`,
- and `AIRBYTE_API_ROOT` environment variables will be used to authenticate with the
+ Credentials are resolved via airbyte.cloud.auth:
+ AIRBYTE_CLOUD_CLIENT_ID / AIRBYTE_CLOUD_CLIENT_SECRET / AIRBYTE_CLOUD_WORKSPACE_ID /
+ AIRBYTE_CLOUD_API_URL (falling back to legacy AIRBYTE_CLIENT_ID / AIRBYTE_CLIENT_SECRET /
+ AIRBYTE_WORKSPACE_ID / AIRBYTE_API_ROOT if present). These will be used to authenticate with the
  Airbyte Cloud API.
```

Also applies to: 115-118, 169-172, 211-214, 241-244, 269-271, 306-308, 322-324, 334-336, 346-348
Lines 318-328: Return JSON-serializable data from MCP tools (sources). FastMCP generally expects JSON-serializable results; returning CloudSource objects may fail. Shall we return minimal dicts, wdyt?
```diff
-def list_deployed_cloud_source_connectors() -> list[CloudSource]:
+def list_deployed_cloud_source_connectors() -> list[dict[str, str]]:
@@
-    workspace: CloudWorkspace = _get_cloud_workspace()
-    return workspace.list_sources()
+    workspace: CloudWorkspace = _get_cloud_workspace()
+    sources = workspace.list_sources()
+    return [
+        {
+            "type": s.connector_type,
+            "connector_id": s.connector_id,
+            "url": s.connector_url,
+            "workspace_id": workspace.workspace_id,
+        }
+        for s in sources
+    ]
```
Lines 330-340: Return JSON-serializable data from MCP tools (destinations). Mirror the sources change to avoid serialization issues.
```diff
-def list_deployed_cloud_destination_connectors() -> list[CloudDestination]:
+def list_deployed_cloud_destination_connectors() -> list[dict[str, str]]:
@@
-    workspace: CloudWorkspace = _get_cloud_workspace()
-    return workspace.list_destinations()
+    workspace: CloudWorkspace = _get_cloud_workspace()
+    dests = workspace.list_destinations()
+    return [
+        {
+            "type": d.connector_type,
+            "connector_id": d.connector_id,
+            "url": d.connector_url,
+            "workspace_id": workspace.workspace_id,
+        }
+        for d in dests
+    ]
```
Lines 342-352: Return JSON-serializable data from MCP tools (connections). Same here; return dicts to keep outputs portable.
```diff
-def list_deployed_cloud_connections() -> list[CloudConnection]:
+def list_deployed_cloud_connections() -> list[dict[str, str]]:
@@
-    workspace: CloudWorkspace = _get_cloud_workspace()
-    return workspace.list_connections()
+    workspace: CloudWorkspace = _get_cloud_workspace()
+    conns = workspace.list_connections()
+    return [
+        {
+            "connection_id": c.connection_id,
+            "url": c.connection_url,
+            "workspace_id": workspace.workspace_id,
+        }
+        for c in conns
+    ]
```
🧹 Nitpick comments (5)
airbyte/constants.py (1)
Lines 204-210: Optional: add CLOUD_WEB_URL_ROOT constant for consistency. Many messages surface workspace/connector URLs. Would adding a `CLOUD_WEB_URL_ROOT = "https://cloud.airbyte.com"` help centralize URL construction and keep modules consistent, wdyt?
airbyte/mcp/_cloud_ops.py (4)
24-31
: Avoid interactive secret prompts in MCP contextsIf the underlying resolvers can prompt, MCP tools may hang. Shall we make the cloud auth resolvers non-interactive (e.g., allow_prompt=False with a clear error if missing) or document that interactive prompts are disabled for MCP, wdyt?
128-134
: Return message parity: include destination URL like source deployFor consistency and better UX, can we include the destination URL in the success message, wdyt?
else: return ( - f"Successfully deployed destination '{destination_name}' " - f"with ID: {deployed_destination.connector_id}" + f"Successfully deployed destination '{destination_name}' " + f"with ID '{deployed_destination.connector_id}' and " + f"URL: {deployed_destination.connector_url}" )
Lines 205-208: Add basic validation on wait_timeout. A small guard helps prevent negative values. Add ge=0?

```diff
-    wait_timeout: Annotated[
-        int,
-        Field(description="Maximum time to wait for sync completion (seconds)."),
-    ] = 300,
+    wait_timeout: Annotated[
+        int,
+        Field(ge=0, description="Maximum time to wait for sync completion (seconds)."),
+    ] = 300,
```
Lines 17-21: Clean up imports if `list_*` signatures change. If we switch `list_*` to return dicts, we can drop the CloudSource/CloudDestination/CloudConnection imports to avoid unused-import lint hits. Shall we remove them, wdyt?

```diff
-from airbyte.cloud.connections import CloudConnection
-from airbyte.cloud.connectors import CloudDestination, CloudSource
 from airbyte.cloud.workspaces import CloudWorkspace
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- airbyte/constants.py (1 hunks)
- airbyte/mcp/_cloud_ops.py (2 hunks)
- airbyte/mcp/_util.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/mcp/_util.py
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/mcp/_cloud_ops.py (6)
- airbyte/cloud/workspaces.py (12): get_destination (126-138), get_source (112-124), CloudWorkspace (57-454), deploy_source (142-186), deploy_destination (188-241), deploy_connection (291-339), get_connection (98-110), connect (81-94), workspace_url (75-77), list_sources (402-427), list_destinations (429-454), list_connections (373-400)
- airbyte/cloud/auth.py (4): resolve_cloud_api_url (25-33), resolve_cloud_client_id (17-22), resolve_cloud_client_secret (9-14), resolve_cloud_workspace_id (36-41)
- airbyte/cloud/connections.py (9): CloudConnection (19-252), source (81-90), destination (104-113), source_id (70-78), destination_id (93-101), table_prefix (124-129), connection_url (132-134), run_sync (143-170), get_sync_result (205-230)
- airbyte/destinations/util.py (1): get_noop_destination (91-109)
- airbyte/mcp/_util.py (1): resolve_config (51-148)
- airbyte/cloud/sync_results.py (3): get_job_status (186-188), job_url (146-155), SyncResult (129-360)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (2)
airbyte/mcp/_cloud_ops.py (2)
Lines 45-53: Nice: config typing now matches resolve_config input. Switching to `dict | str | None` removes the mypy Path mismatch and keeps the surface simple. LGTM.
Also applies to: 100-108
Lines 158-166: Confirm selected_streams semantics. Does deploy_connection accept bare stream names, or does it expect full stream descriptors? If empty list means "all streams," can we document that here, wdyt?
Devin, see if you can resolve the CI failures. Don't break anything in the process. Ask if you need help. |
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte/_util/api_util.py (1)
Lines 63-74: get_web_url_root returns API URL for OSS; normalize path to the actual web base, wdyt? As-is, non-cloud callers will get something like `https://host/api/v1`, which is not a valid web UI root. Could we strip a trailing `/v1` and then an `/api` segment (preserving any prefix), and fall back to scheme+netloc? This matches prior feedback. Apply this diff to the function:
```diff
@@
-def get_web_url_root(api_root: str) -> str:
-    """Get the web URL root from the main API root.
-
-    # TODO: This does not return a valid URL for self-managed instances, due to not knowing the
-    # web URL root. Logged here:
-    # - https://github.com/airbytehq/PyAirbyte/issues/563
-    """
-    if api_root == CLOUD_API_ROOT:
-        return "https://cloud.airbyte.com"
-
-    return api_root
+def get_web_url_root(api_root: str) -> str:
+    """Best-effort derivation of the web UI base from an API root."""
+    if api_root == CLOUD_API_ROOT:
+        return "https://cloud.airbyte.com"
+    s = urlsplit(api_root)
+    parts = [p for p in s.path.rstrip("/").split("/") if p]
+    # Drop trailing version and api segments if present (…/api[/v1])
+    if parts and parts[-1].lower() in {"v1", "v0"}:
+        parts = parts[:-1]
+    if parts and parts[-1].lower() == "api":
+        parts = parts[:-1]
+    new_path = ("/" + "/".join(parts)) if parts else ""
+    return urlunsplit((s.scheme, s.netloc, new_path or "/", "", ""))
```

And add needed imports:
```diff
@@
-from typing import TYPE_CHECKING, Any, Literal
+from typing import TYPE_CHECKING, Any, Literal
+from urllib.parse import urlsplit, urlunsplit
```
🧹 Nitpick comments (1)
airbyte/_util/api_util.py (1)
Lines 565-573: dev-null override: compare case-insensitively and lift the magic UUID into constants, wdyt?
- Use a case-insensitive compare given the proposed canonical return.
- Moving the UUID into `airbyte.constants` avoids scattering a magic value.

Apply this local diff:
```diff
-    definition_id_override: str | None = None
-    if _get_destination_type_str(config) == "dev-null":
-        # TODO: We have to hard-code the definition ID for dev-null destination. (important-comment)
-        # https://github.com/airbytehq/PyAirbyte/issues/743
-        definition_id_override = "a7bcc9d8-13b3-4e49-b80d-d020b90045e3"
+    definition_id_override: str | None = None
+    if _get_destination_type_str(config) == "dev-null":
+        # NOTE: Temporarily hard-code the dev-null definition ID until lookup is available.
+        definition_id_override = DEV_NULL_DESTINATION_DEFINITION_ID
```

And extend the import:
@@ -from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT +from airbyte.constants import CLOUD_API_ROOT, CLOUD_CONFIG_API_ROOT, DEV_NULL_DESTINATION_DEFINITION_IDOptionally, add this constant to
airbyte/constants.py
:# Dev-null destination definition ID (temporary until lookup API is wired) DEV_NULL_DESTINATION_DEFINITION_ID = "a7bcc9d8-13b3-4e49-b80d-d020b90045e3"
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/_util/api_util.py
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/_util/api_util.py (1)
airbyte/exceptions.py (1)
PyAirbyteInputError
(201-210)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (3)
airbyte/_util/api_util.py (3)
23-23
: Importing shared constants is good consolidation

Pulling `CLOUD_API_ROOT` and `CLOUD_CONFIG_API_ROOT` from a single source of truth reduces drift, nice.
783-784
: Defaulting get_bearer_token to CLOUD_API_ROOT looks good

This aligns defaults with Cloud usage and keeps call sites terse.
528-545
: Broaden _get_destination_type_str to accept snake_case and return canonical lowercase — wdyt?

Repo contains both "destinationType" and "destination_type" (e.g., tests/integration_tests/test_state_handling.py:162; airbyte/destinations/_translate_dest_to_cache.py:50). Apply this diff:

```diff
 def _get_destination_type_str(
     destination: DestinationConfiguration | dict[str, Any],
 ) -> str:
-    if isinstance(destination, dict):
-        destination_type = destination.get("destinationType")
-    else:
-        destination_type = getattr(destination, "destinationType", None)
+    if isinstance(destination, dict):
+        destination_type = destination.get("destinationType") or destination.get("destination_type")
+    else:
+        destination_type = (
+            getattr(destination, "destinationType", None)
+            or getattr(destination, "destination_type", None)
+        )
     if not destination_type or not isinstance(destination_type, str):
         raise PyAirbyteInputError(
             message="Could not determine destination type from configuration.",
             context={
                 "destination": destination,
             },
         )
-    return destination_type
+    return destination_type.strip().casefold()
```
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
airbyte/_util/api_util.py (2)
790-800
: Add request timeouts to avoid hanging on auth calls.

External calls without timeouts can hang indefinitely; can we set sane connect/read timeouts, wdyt?

```diff
-    response = requests.post(
+    response = requests.post(
         url=api_root + "/applications/token",
@@
-        json={
-            "client_id": client_id,
-            "client_secret": client_secret,
-        },
+        json={"client_id": client_id, "client_secret": client_secret},
+        timeout=(5, 30),
     )
```
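To make the timeout convention concrete, here is a small runnable sketch of the `(connect, read)` tuple semantics suggested in the diff above; `post_with_timeout` is a hypothetical wrapper, not code from this PR:

```python
import requests

def post_with_timeout(url: str, payload: dict) -> requests.Response:
    """Hypothetical wrapper showing the (connect, read) timeout tuple."""
    # 5s to establish the TCP connection, 30s max between bytes of the response.
    return requests.post(url, json=payload, timeout=(5, 30))

# Both timeout flavors are subclasses of requests.Timeout, so callers can
# catch requests.Timeout to handle either phase uniformly.
assert issubclass(requests.ConnectTimeout, requests.Timeout)
assert issubclass(requests.ReadTimeout, requests.Timeout)
```

Catching `requests.Timeout` at the call site turns an indefinite hang into a clear, reportable auth failure.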
159-166
: Error handling condition inverted in list helpers (can skip raising, then assert).

These branches raise only when not OK and the response object is truthy; when not OK and the response is None, we skip raising and hit an assert. Shall we switch to "or … is None" for correctness across the four helpers, wdyt?

```diff
-    if not status_ok(response.status_code) and response.connections_response:
+    if not status_ok(response.status_code) or response.connections_response is None:
         raise AirbyteError(
@@
-    if not status_ok(response.status_code) and response.workspaces_response:
+    if not status_ok(response.status_code) or response.workspaces_response is None:
         raise AirbyteError(
@@
-    if not status_ok(response.status_code) and response.sources_response:
+    if not status_ok(response.status_code) or response.sources_response is None:
         raise AirbyteError(
@@
-    if not status_ok(response.status_code) and response.destinations_response:
+    if not status_ok(response.status_code) or response.destinations_response is None:
         raise AirbyteError(
```

Also applies to: 202-209, 242-249, 280-287
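The inversion is easiest to see as a pair of predicates with the failure case spelled out (hypothetical helper names, standing in for the `and`/`or` conditions above):

```python
def should_raise_buggy(call_ok: bool, payload) -> bool:
    # Original condition: raises only when the call failed AND a payload is present.
    return (not call_ok) and bool(payload)

def should_raise_fixed(call_ok: bool, payload) -> bool:
    # Proposed condition: raise when the call failed OR the payload is missing.
    return (not call_ok) or payload is None

# Failed call with a None payload: the buggy check skips the raise entirely.
assert should_raise_buggy(False, None) is False
assert should_raise_fixed(False, None) is True
# Successful call with a payload: neither variant raises.
assert should_raise_buggy(True, ["conn"]) is False
assert should_raise_fixed(True, ["conn"]) is False
```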
♻️ Duplicate comments (1)
airbyte/_util/api_util.py (1)
63-74
: get_web_url_root should normalize OSS URLs (strip /api[/v1])

Returning api_root as-is yields invalid web links for self-managed (e.g., https://host/api/v1). Shall we normalize by dropping trailing version and optional api segments while preserving any prefix, wdyt? (Echoing prior suggestion.)

Apply:

```diff
+from urllib.parse import urlsplit, urlunsplit
@@
-def get_web_url_root(api_root: str) -> str:
-    """Get the web URL root from the main API root.
-
-    # TODO: This does not return a valid URL for self-managed instances, due to not knowing the
-    # web URL root. Logged here:
-    # - https://github.com/airbytehq/PyAirbyte/issues/563
-    """
-    if api_root == CLOUD_API_ROOT:
-        return "https://cloud.airbyte.com"
-
-    return api_root
+def get_web_url_root(api_root: str) -> str:
+    """Best-effort derivation of the web UI base from an API root."""
+    if api_root == CLOUD_API_ROOT:
+        return "https://cloud.airbyte.com"
+    s = urlsplit(api_root)
+    parts = [p for p in s.path.rstrip("/").split("/") if p]
+    if parts and parts[-1].lower() in {"v1", "v0"}:
+        parts = parts[:-1]
+    if parts and parts[-1].lower() == "api":
+        parts = parts[:-1]
+    new_path = ("/" + "/".join(parts)) if parts else ""
+    return urlunsplit((s.scheme, s.netloc, new_path or "/", "", ""))
```

Could you add one unit test that covers: (1) https://host/api/v1 -> https://host/ and (2) https://host/prefix/api/v1 -> https://host/prefix/, wdyt?
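For reference, a self-contained sketch of the proposed normalization, exercising the two cases the reviewer asks to cover (the `CLOUD_API_ROOT` value here is assumed for illustration; note the prefix case comes back without a trailing slash under this logic):

```python
from urllib.parse import urlsplit, urlunsplit

CLOUD_API_ROOT = "https://api.airbyte.com/v1"  # assumed value for illustration

def get_web_url_root(api_root: str) -> str:
    """Best-effort derivation of the web UI base from an API root."""
    if api_root == CLOUD_API_ROOT:
        return "https://cloud.airbyte.com"
    s = urlsplit(api_root)
    parts = [p for p in s.path.rstrip("/").split("/") if p]
    # Drop a trailing version segment, then a trailing "api" segment, if present.
    if parts and parts[-1].lower() in {"v1", "v0"}:
        parts = parts[:-1]
    if parts and parts[-1].lower() == "api":
        parts = parts[:-1]
    new_path = ("/" + "/".join(parts)) if parts else ""
    return urlunsplit((s.scheme, s.netloc, new_path or "/", "", ""))

assert get_web_url_root("https://host/api/v1") == "https://host/"
assert get_web_url_root("https://host/prefix/api/v1") == "https://host/prefix"
assert get_web_url_root(CLOUD_API_ROOT) == "https://cloud.airbyte.com"
```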
🧹 Nitpick comments (4)
airbyte/_util/api_util.py (4)
34-45
: Deduplicate TYPE_CHECKING blocks.

Can we collapse these two TYPE_CHECKING sections into one to reduce noise, wdyt?

```diff
-if TYPE_CHECKING:
-    from collections.abc import Callable
-
-if TYPE_CHECKING:
-    from collections.abc import Callable
-    from airbyte_api.models import (
-        DestinationConfiguration,
-    )
+if TYPE_CHECKING:
+    from collections.abc import Callable
+    from airbyte_api.models import DestinationConfiguration
```
528-545
: Make destination type extraction more robust across dict/dataclass shapes.

To avoid AttributeError and casing mismatches across generators, shall we probe both camelCase and snake_case and only then fall back to a class constant, returning a string, wdyt?

```diff
 def _get_destination_type_str(
     destination: DestinationConfiguration | dict[str, Any],
 ) -> str:
-    if isinstance(destination, dict):
-        destination_type = destination.get("destinationType")
-    else:
-        destination_type = getattr(destination, "DESTINATION_TYPE", None)
+    if isinstance(destination, dict):
+        destination_type = destination.get("destinationType") or destination.get("destination_type")
+    else:
+        destination_type = (
+            getattr(destination, "destinationType", None)
+            or getattr(destination, "destination_type", None)
+            or getattr(destination, "DESTINATION_TYPE", None)
+        )
@@
-    return destination_type
+    return str(destination_type)
```

Optionally, would you like to normalize via `.casefold()` here and compare against normalized literals at call sites, wdyt?
565-573
: Dev-null override: case-insensitive match and move magic UUID to constants.

Two tweaks: (1) compare destination type case-insensitively, and (2) avoid a hard-coded UUID by moving it to airbyte.constants (e.g., DEV_NULL_DESTINATION_DEFINITION_ID). Shall we do both, wdyt?

Minimal change:

```diff
-    if _get_destination_type_str(config) == "dev-null":
+    if _get_destination_type_str(config).casefold() == "dev-null":
@@
-        definition_id=definition_id_override,
+        definition_id=definition_id_override,
```

Or, if you prefer the constant:

```diff
+from airbyte.constants import DEV_NULL_DESTINATION_DEFINITION_ID
@@
-    if _get_destination_type_str(config) == "dev-null":
+    if _get_destination_type_str(config).casefold() == "dev-null":
@@
-    definition_id_override = "a7bcc9d8-13b3-4e49-b80d-d020b90045e3"
+    definition_id_override = DEV_NULL_DESTINATION_DEFINITION_ID
```
336-341
: Docstring drift: mentions block/raise_on_failure not present in signature.

Can we update the docstring to reflect current behavior (trigger sync, return job), wdyt?

```diff
 def run_connection(
@@
-    """Get a connection.
-
-    If block is True, this will block until the connection is finished running.
-
-    If raise_on_failure is True, this will raise an exception if the connection fails.
-    """
+    """Trigger a SYNC job for the given connection and return the created JobResponse."""
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/_util/api_util.py
(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/_util/api_util.py (1)
airbyte/exceptions.py (1)
PyAirbyteInputError
(201-210)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (2)
airbyte/_util/api_util.py (2)
23-23
: Good import consolidation (CLOUD constants).Switching to a single source of truth in airbyte.constants looks good to me.
821-833
: Add timeout and unwrap Bearer token before sending the header?
- Add an explicit timeout to the requests.request call (e.g., timeout=(5, 60)).
- Ensure bearer_token is the raw secret value before formatting the Authorization header — if it's a pydantic.SecretStr use bearer_token.get_secret_value(), otherwise use the appropriate accessor (e.g., .value). I can patch the header to unwrap the secret and add the timeout; wdyt?
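If `bearer_token` does arrive as a `pydantic.SecretStr` (an assumption here; it may already be a plain `str`), formatting it straight into the header would send the masked placeholder rather than the token. A minimal demonstration of the unwrap:

```python
from pydantic import SecretStr  # assumed: bearer_token is a pydantic SecretStr

token = SecretStr("abc123")
# Formatting the SecretStr directly masks the value, so the real token never
# reaches the header string:
assert "abc123" not in f"Bearer {token}"
# Unwrap explicitly before building the Authorization header:
headers = {"Authorization": f"Bearer {token.get_secret_value()}"}
assert headers["Authorization"] == "Bearer abc123"
```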
feat(mcp): add four new cloud operation MCP tools
Summary
This PR adds four new MCP (Model Context Protocol) tools to PyAirbyte for Airbyte Cloud operations:

- `deploy_source_to_cloud` - Deploy a source connector to an Airbyte Cloud workspace
- `deploy_destination_to_cloud` - Deploy a destination connector to an Airbyte Cloud workspace
- `create_connection_on_cloud` - Create a connection between a deployed source and destination
- `run_cloud_sync` - Execute a sync job on a cloud connection, with optional wait functionality

These tools leverage existing cloud functionality in PyAirbyte's `cloud.CloudWorkspace` class and follow established MCP tool patterns from the existing `get_cloud_sync_status` tool. All tools use the `AIRBYTE_CLIENT_ID` and `AIRBYTE_CLIENT_SECRET` environment variables for authentication via the secrets manager.

Review & Testing Checklist for Human

- Verify that `workspace.deploy_source()`, `workspace.deploy_destination()`, and `workspace.deploy_connection()` match their expected signatures
- Run `fastmcp inspect airbyte/mcp/server.py:app` and confirm all 4 new tools appear in the output

Recommended Test Plan: Set up a test Airbyte Cloud workspace with credentials, then use an MCP client to call each tool in sequence: deploy source-faker → deploy destination-duckdb → create connection → run sync.
Diagram
Notes
- All tools follow the MCP patterns established by the existing `get_cloud_sync_status` tool
- Keyword-only arguments (`*,`) are used for boolean parameters to satisfy linting requirements
- Tools are registered in the `register_cloud_ops_tools()` function
Requested by: @aaronsteers
Summary by CodeRabbit
New Features
Improvements
Documentation
Tests
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.