Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 78 additions & 70 deletions airbyte/mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

r"""***PyAirbyte MCP Server - Model Context Protocol Integration***
> **NOTE:**
> This MCP server implementation is experimental and may change without notice between minor
> versions of PyAirbyte. The API may be modified or entirely refactored in future versions.
The PyAirbyte MCP (Model Context Protocol) server provides a standardized interface for
managing Airbyte connectors through MCP-compatible clients. This experimental feature
allows you to list connectors, validate configurations, and run sync operations using
Expand All @@ -15,13 +19,60 @@
Create a JSON configuration file to register the PyAirbyte MCP server with your MCP
client. Create a file named `server_config.json`:
> **NOTE:**
> This MCP server implementation is experimental and may change without notice between minor
> versions of PyAirbyte. The API may be modified or entirely refactored in future versions.
## Getting Started with PyAirbyte MCP
To get started with the PyAirbyte MCP server, follow these steps:
1. Create a Dotenv secrets file.
2. Register the MCP server with your MCP client.
3. Test the MCP server connection using your MCP client.
### Step 1: Generate a Dotenv Secrets File
To get started with the PyAirbyte MCP server, you will need to create a dotenv
file containing your Airbyte Cloud credentials, as well as credentials for any
third-party services you wish to connect to via Airbyte.
Create a file named `~/.mcp/airbyte_mcp.env` with the following content:
# MCP Server Configuration
```ini
# Airbyte Project Artifacts Directory
AIRBYTE_PROJECT_DIR=/path/to/any/writeable/project-dir
Assuming `uv` is installed, you can use the following configuration:
# Airbyte Cloud Credentials (Required for Airbyte Cloud Operations)
AIRBYTE_CLOUD_CLIENT_ID=your_api_key
AIRBYTE_CLOUD_CLIENT_SECRET=your_api_secret
AIRBYTE_CLOUD_WORKSPACE_ID=your_workspace_id
# API-Specific Credentials (Optional, depending on your connectors)
# For example, for a PostgreSQL source connector:
# POSTGRES_HOST=your_postgres_host
# POSTGRES_PORT=5432
# POSTGRES_DB=your_database_name
# POSTGRES_USER=your_database_user
# POSTGRES_PASSWORD=your_database_password
# For example, for a Stripe source connector:
# STRIPE_API_KEY=your_stripe_api_key
# STRIPE_API_SECRET=your_stripe_api_secret
# STRIPE_WEBHOOK_SECRET=your_stripe_webhook_secret
```
Note:
1. You can add more environment variables to this file as needed for different connectors. To start,
you only need to create the file and pass it to the MCP server.
2. Ensure that this file is kept secure, as it contains sensitive information. Your LLM _should
never_ be given direct access to this file or its contents.
3. The MCP tools will give your LLM the ability to view _which_ variables are available, but it
does not give access to their values.
4. The `AIRBYTE_PROJECT_DIR` variable specifies a directory where the MCP server can
store temporary project files. Ensure this directory is writable by the user running
the MCP server.
### Step 2: Registering the MCP Server
First install `uv` (`brew install uv`), and then you can use the following configuration:
```json
{
Expand All @@ -30,7 +81,7 @@
"command": "uvx",
"args": ["--from=airbyte", "airbyte-mcp"],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
"AIRBYTE_MCP_ENV_FILE": "/path/to/.mcp/airbyte_mcp.env"
}
}
}
Expand All @@ -47,95 +98,52 @@
"command": "airbyte-mcp",
"args": [],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
"AIRBYTE_MCP_ENV_FILE": "/path/to/.mcp/airbyte_mcp.env"
}
}
}
}
```
## Testing the MCP Server
Note:
- Replace `/path/to/.mcp/airbyte_mcp.env` with the absolute path to your dotenv file created in
Step 1.
The easiest way to test PyAirbyte MCP tools during development is using the built-in Poe tasks.
These tasks automatically inherit environment variables from your shell session:
### Step 3: Testing the MCP Server Connection
```bash
poe mcp-tool-test <tool_name> '<json_args>'
You can test the MCP server connection using your MCP client.
poe mcp-tool-test list_connectors '{}'
poe mcp-tool-test get_config_spec '{"connector_name": "source-pokeapi"}'
poe mcp-tool-test validate_config \
'{"connector_name": "source-pokeapi", "config": {"pokemon_name": "pikachu"}}'
poe mcp-tool-test run_sync \
'{"connector_name": "source-pokeapi", "config": {"pokemon_name": "pikachu"}}'
Helpful prompts to try:
poe mcp-tool-test check_airbyte_cloud_workspace '{}'
poe mcp-tool-test list_deployed_cloud_connections '{}'
```
1. "Use your MCP tools to list all available Airbyte connectors."
2. "Use your MCP tools to get information about the Airbyte "stripe" connector."
3. "Use your MCP tools to list all variables you have access to in the dotenv secrets
file."
4. "User your MCP tools to check your connection to your Airbyte Cloud workspace."
## Contributing to the Airbyte MCP Server
```bash
poe mcp-serve-local # STDIO transport (default)
poe mcp-serve-http # HTTP transport on localhost:8000
poe mcp-serve-sse # Server-Sent Events transport on localhost:8000
poe mcp-inspect # Show all available MCP tools and their schemas
```
## Contributing to PyAirbyte and the Airbyte MCP Server
The Airbyte MCP server is part of the PyAirbyte project. Contributions are welcome!
You can contribute to the MCP server by adding new tools, improving existing functionality, or
fixing bugs. The server is built using the FastMCP framework, which provides a flexible
interface for defining tools and handling requests.
As a starting point, you can clone the repo and inspect the server definition using the `fastmcp`
CLI tool:
```bash
poetry install --all-extras
poetry run fastmcp inspect airbyte/mcp/server.py:app
```
In your MCP config, you can test your development updates using `poetry` as the entrypoint:
```json
{
"mcpServers": {
"airbyte": {
"command": "poetry",
"args": [
"--directory=~/repos/PyAirbyte",
"run",
"airbyte-mcp"
],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
}
}
}
}
```
- [PyAirbyte Contributing Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)
### Additional resources
- [Model Context Protocol Documentation](https://modelcontextprotocol.io/)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)
For issues and questions:
- [PyAirbyte Contributing Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)
- [PyAirbyte GitHub Issues](https://github.com/airbytehq/pyairbyte/issues)
- [PyAirbyte Discussions](https://github.com/airbytehq/pyairbyte/discussions)
""" # noqa: D415

from airbyte.mcp import server
from airbyte.mcp import cloud_ops, connector_registry, local_ops, server


__all__: list[str] = ["server"]
__all__: list[str] = [
"cloud_ops",
"connector_registry",
"local_ops",
"server",
]

__docformat__ = "google"
5 changes: 4 additions & 1 deletion airbyte/mcp/_cloud_ops.py → airbyte/mcp/cloud_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,10 @@ def list_deployed_cloud_connections() -> list[CloudConnection]:


def register_cloud_ops_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.
This is an internal function and should not be called directly.
"""
app.tool(check_airbyte_cloud_workspace)
app.tool(deploy_source_to_cloud)
app.tool(deploy_destination_to_cloud)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def list_connectors(


class ConnectorInfo(BaseModel):
"""Class to hold connector information."""
"""@private Class to hold connector information."""

connector_name: str
connector_metadata: ConnectorMetadata | None = None
Expand All @@ -110,6 +110,7 @@ class ConnectorInfo(BaseModel):
manifest_url: str | None = None


# @app.tool() # << deferred
def get_connector_info(
connector_name: Annotated[
str,
Expand Down Expand Up @@ -151,6 +152,9 @@ def get_connector_info(


def register_connector_registry_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.
This is an internal function and should not be called directly.
"""
app.tool(list_connectors)
app.tool(get_connector_info)
12 changes: 9 additions & 3 deletions airbyte/mcp/_local_ops.py → airbyte/mcp/local_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from airbyte.caches.duckdb import DuckDBCache


CONFIG_HELP = """
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
Expand Down Expand Up @@ -643,6 +643,7 @@ class CachedDatasetInfo(BaseModel):
schema_name: str | None = None


# @app.tool() # << deferred
def list_cached_streams() -> list[CachedDatasetInfo]:
"""List all streams available in the default DuckDB cache."""
cache: DuckDBCache = get_default_cache()
Expand All @@ -658,6 +659,7 @@ def list_cached_streams() -> list[CachedDatasetInfo]:
return result


# @app.tool() # << deferred
def describe_default_cache() -> dict[str, Any]:
"""Describe the currently configured default cache."""
cache = get_default_cache()
Expand Down Expand Up @@ -704,6 +706,7 @@ def _is_safe_sql(sql_query: str) -> bool:
return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


# @app.tool() # << deferred
def run_sql_query(
sql_query: Annotated[
str,
Expand Down Expand Up @@ -758,7 +761,10 @@ def run_sql_query(


def register_local_ops_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.

This is an internal function and should not be called directly.
"""
app.tool(list_connector_config_secrets)
for tool in (
describe_default_cache,
Expand All @@ -775,5 +781,5 @@ def register_local_ops_tools(app: FastMCP) -> None:
# Register each tool with the FastMCP app.
app.tool(
tool,
description=(tool.__doc__ or "").rstrip() + "\n" + CONFIG_HELP,
description=(tool.__doc__ or "").rstrip() + "\n" + _CONFIG_HELP,
)
16 changes: 12 additions & 4 deletions airbyte/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,31 @@
from fastmcp import FastMCP

from airbyte._util.meta import set_mcp_mode
from airbyte.mcp._cloud_ops import register_cloud_ops_tools
from airbyte.mcp._connector_registry import register_connector_registry_tools
from airbyte.mcp._local_ops import register_local_ops_tools
from airbyte.mcp._util import initialize_secrets
from airbyte.mcp.cloud_ops import register_cloud_ops_tools
from airbyte.mcp.connector_registry import register_connector_registry_tools
from airbyte.mcp.local_ops import register_local_ops_tools


set_mcp_mode()
initialize_secrets()

app: FastMCP = FastMCP("airbyte-mcp")
"""The Airbyte MCP Server application instance."""

register_connector_registry_tools(app)
register_local_ops_tools(app)
register_cloud_ops_tools(app)


def main() -> None:
"""Main entry point for the MCP server."""
"""@private Main entry point for the MCP server.

This function starts the FastMCP server to handle MCP requests.

It should not be called directly; instead, consult the MCP client documentation
for instructions on how to connect to the server.
"""
print("Starting Airbyte MCP server.", file=sys.stderr)
try:
asyncio.run(app.run_stdio_async())
Expand Down
Loading
Loading