156 changes: 80 additions & 76 deletions airbyte/mcp/__init__.py
Expand Up @@ -2,26 +2,72 @@

r"""***PyAirbyte MCP Server - Model Context Protocol Integration***

> **NOTE:**
> This MCP server implementation is experimental and may change without notice between minor
> versions of PyAirbyte. The API may be modified or entirely refactored in future versions.

The PyAirbyte MCP (Model Context Protocol) server provides a standardized interface for
managing Airbyte connectors through MCP-compatible clients. This experimental feature
allows you to list connectors, validate configurations, and run sync operations using
the MCP protocol.

The Model Context Protocol (MCP) is an open standard that enables AI assistants and
other tools to securely connect to data sources, tools, and services. PyAirbyte's MCP
server implementation allows you to build and interact with Airbyte connectors through
this standardized protocol.
## Getting Started with PyAirbyte MCP

Create a JSON configuration file to register the PyAirbyte MCP server with your MCP
client. Create a file named `server_config.json`:
To get started with the PyAirbyte MCP server, follow these steps:

> **NOTE:**
> This MCP server implementation is experimental and may change without notice between minor
> versions of PyAirbyte. The API may be modified or entirely refactored in future versions.
1. Create a Dotenv secrets file.
2. Register the MCP server with your MCP client.
3. Test the MCP server connection using your MCP client.

### Step 1: Generate a Dotenv Secrets File

To get started with the PyAirbyte MCP server, you will need to create a dotenv
file containing your Airbyte Cloud credentials, as well as credentials for any
third-party services you wish to connect to via Airbyte.

Create a file named `~/.mcp/airbyte_mcp.env` with the following content:

```ini
# Airbyte Project Artifacts Directory
AIRBYTE_PROJECT_DIR=/path/to/any/writeable/project-dir

# MCP Server Configuration
# Airbyte Cloud Credentials (Required for Airbyte Cloud Operations)
AIRBYTE_CLOUD_CLIENT_ID=your_api_key
AIRBYTE_CLOUD_CLIENT_SECRET=your_api_secret
AIRBYTE_CLOUD_WORKSPACE_ID=your_workspace_id

Assuming `uv` is installed, you can use the following configuration:
# API-Specific Credentials (Optional, depending on your connectors)

# For example, for a PostgreSQL source connector:
# POSTGRES_HOST=your_postgres_host
# POSTGRES_PORT=5432
# POSTGRES_DB=your_database_name
# POSTGRES_USER=your_database_user
# POSTGRES_PASSWORD=your_database_password

# For example, for a Stripe source connector:
# STRIPE_API_KEY=your_stripe_api_key
# STRIPE_API_SECRET=your_stripe_api_secret
# STRIPE_WEBHOOK_SECRET=your_stripe_webhook_secret
```

Note:
1. You can add more environment variables to this file as needed for different connectors. To start,
you only need to create the file and pass it to the MCP server.
2. Ensure that this file is kept secure, as it contains sensitive information. Your LLM
*should never* be given direct access to this file or its contents.
3. The MCP tools give your LLM the ability to view *which* variables are available, but they
do not give it access to their values.
4. The `AIRBYTE_PROJECT_DIR` variable specifies a directory where the MCP server can
store temporary project files. Ensure this directory is writable by the user running
the MCP server.
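
As an illustration of point 3, the following minimal sketch shows one way a tool could report
the *names* of the variables defined in a dotenv file without exposing their values. The
function name `list_dotenv_variable_names` is hypothetical and is not part of PyAirbyte; the
actual MCP tools may work differently.

```python
from pathlib import Path


def list_dotenv_variable_names(env_file: Path) -> list[str]:
    # Hypothetical helper (not a PyAirbyte API): return the variable names
    # defined in a dotenv file, never the values.
    names: list[str] = []
    for raw_line in env_file.read_text(encoding="utf-8").splitlines():
        line = raw_line.strip()
        # Skip blank lines and comments, including commented-out examples.
        if not line or line.startswith("#"):
            continue
        # Tolerate the optional shell-style "export " prefix.
        if line.startswith("export "):
            line = line[len("export "):].lstrip()
        name, separator, _value = line.partition("=")
        # Keep only well-formed NAME=value lines; the value is discarded.
        if separator and name.strip():
            names.append(name.strip())
    return names
```

Calling `list_dotenv_variable_names(Path("~/.mcp/airbyte_mcp.env").expanduser())` would return
only names such as `AIRBYTE_PROJECT_DIR`, which is the kind of information that is safe to
share with an LLM.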

### Step 2: Registering the MCP Server

First, install `uv` (for example, with Homebrew: `brew install uv`).

Then, create a file named `server_config.json` (or the file name required by your MCP client)
with the following content:

```json
{
Expand All @@ -30,7 +76,7 @@
"command": "uvx",
"args": ["--from=airbyte", "airbyte-mcp"],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
"AIRBYTE_MCP_ENV_FILE": "/path/to/my/.mcp/airbyte_mcp.env"
}
}
}
Expand All @@ -47,95 +93,53 @@
"command": "airbyte-mcp",
"args": [],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
"AIRBYTE_MCP_ENV_FILE": "/path/to/my/.mcp/airbyte_mcp.env"
}
}
}
}
```

## Testing the MCP Server


The easiest way to test PyAirbyte MCP tools during development is using the built-in Poe tasks.
These tasks automatically inherit environment variables from your shell session:

```bash
poe mcp-tool-test <tool_name> '<json_args>'

poe mcp-tool-test list_connectors '{}'
poe mcp-tool-test get_config_spec '{"connector_name": "source-pokeapi"}'
poe mcp-tool-test validate_config \
'{"connector_name": "source-pokeapi", "config": {"pokemon_name": "pikachu"}}'
poe mcp-tool-test run_sync \
'{"connector_name": "source-pokeapi", "config": {"pokemon_name": "pikachu"}}'

poe mcp-tool-test check_airbyte_cloud_workspace '{}'
poe mcp-tool-test list_deployed_cloud_connections '{}'
```


```bash
poe mcp-serve-local # STDIO transport (default)
poe mcp-serve-http # HTTP transport on localhost:8000
poe mcp-serve-sse # Server-Sent Events transport on localhost:8000

poe mcp-inspect # Show all available MCP tools and their schemas
```
Note:
- Replace `/path/to/my/.mcp/airbyte_mcp.env` with the absolute path to the dotenv file you
created in Step 1.
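
If you prefer to generate the configuration rather than edit it by hand, the following minimal
sketch resolves the dotenv path to an absolute path and writes it into a config file. The
helper names `build_server_config` and `write_server_config` are hypothetical, and the
`mcpServers` / `airbyte` key layout is an assumption; check your MCP client's documentation
for the structure it expects.

```python
import json
from pathlib import Path


def build_server_config(env_file: str) -> dict:
    # Hypothetical helper: expand "~" and make the path absolute, since the
    # config should contain the absolute path to the dotenv file.
    absolute_env_file = Path(env_file).expanduser().resolve()
    return {
        # Assumed layout; your MCP client may expect different top-level keys.
        "mcpServers": {
            "airbyte": {
                "command": "uvx",
                "args": ["--from=airbyte", "airbyte-mcp"],
                "env": {"AIRBYTE_MCP_ENV_FILE": str(absolute_env_file)},
            }
        }
    }


def write_server_config(env_file: str, output_path: str = "server_config.json") -> Path:
    # Serialize the config as indented JSON and return the written path.
    target = Path(output_path)
    config_text = json.dumps(build_server_config(env_file), indent=2) + "\n"
    target.write_text(config_text, encoding="utf-8")
    return target
```

For example, `write_server_config("~/.mcp/airbyte_mcp.env")` writes `server_config.json` to
the current directory with the `~` already expanded.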

### Step 3: Testing the MCP Server Connection

You can test the MCP server connection using your MCP client.

## Contributing to PyAirbyte and the Airbyte MCP Server
Helpful prompts to try:

The Airbyte MCP server is part of the PyAirbyte project. Contributions are welcome!
1. "Use your MCP tools to list all available Airbyte connectors."
2. "Use your MCP tools to get information about the Airbyte Stripe connector."
3. "Use your MCP tools to list all variables you have access to in the dotenv secrets
file."
4. "Use your MCP tools to check your connection to your Airbyte Cloud workspace."
5. "Use your MCP tools to list all available destinations in my Airbyte Cloud workspace."

You can contribute to the MCP server by adding new tools, improving existing functionality, or
fixing bugs. The server is built using the FastMCP framework, which provides a flexible
interface for defining tools and handling requests.
## Contributing to the Airbyte MCP Server

As a starting point, you can clone the repo and inspect the server definition using the `fastmcp`
CLI tool:

```bash
poetry install --all-extras
poetry run fastmcp inspect airbyte/mcp/server.py:app
```

In your MCP config, you can test your development updates using `poetry` as the entrypoint:

```json
{
"mcpServers": {
"airbyte": {
"command": "poetry",
"args": [
"--directory=~/repos/PyAirbyte",
"run",
"airbyte-mcp"
],
"env": {
"AIRBYTE_MCP_ENV_FILE": "~/.mcp/airbyte_mcp.env"
}
}
}
}
```
- [PyAirbyte Contributing Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)

### Additional resources

- [Model Context Protocol Documentation](https://modelcontextprotocol.io/)
- [MCP Python SDK](https://github.com/modelcontextprotocol/python-sdk)

For issues and questions:
- [PyAirbyte Contributing Guide](https://github.com/airbytehq/PyAirbyte/blob/main/docs/CONTRIBUTING.md)
- [PyAirbyte GitHub Issues](https://github.com/airbytehq/pyairbyte/issues)
- [PyAirbyte Discussions](https://github.com/airbytehq/pyairbyte/discussions)

""" # noqa: D415

from airbyte.mcp import server
from airbyte.mcp import cloud_ops, connector_registry, local_ops, server


__all__: list[str] = ["server"]
__all__: list[str] = [
"cloud_ops",
"connector_registry",
"local_ops",
"server",
]

__docformat__ = "google"
5 changes: 4 additions & 1 deletion airbyte/mcp/_cloud_ops.py → airbyte/mcp/cloud_ops.py
Expand Up @@ -502,7 +502,10 @@ def list_deployed_cloud_connections() -> list[CloudConnection]:


def register_cloud_ops_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.
This is an internal function and should not be called directly.
"""
app.tool(check_airbyte_cloud_workspace)
app.tool(deploy_source_to_cloud)
app.tool(deploy_destination_to_cloud)
Expand Down
Expand Up @@ -101,7 +101,7 @@ def list_connectors(


class ConnectorInfo(BaseModel):
"""Class to hold connector information."""
"""@private Class to hold connector information."""

connector_name: str
connector_metadata: ConnectorMetadata | None = None
Expand All @@ -110,6 +110,7 @@ class ConnectorInfo(BaseModel):
manifest_url: str | None = None


# @app.tool() # << deferred
def get_connector_info(
connector_name: Annotated[
str,
Expand Down Expand Up @@ -151,6 +152,9 @@ def get_connector_info(


def register_connector_registry_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.
This is an internal function and should not be called directly.
"""
app.tool(list_connectors)
app.tool(get_connector_info)
12 changes: 9 additions & 3 deletions airbyte/mcp/_local_ops.py → airbyte/mcp/local_ops.py
Expand Up @@ -25,7 +25,7 @@
from airbyte.caches.duckdb import DuckDBCache


CONFIG_HELP = """
_CONFIG_HELP = """
You can provide `config` as JSON or a Path to a YAML/JSON file.
If a `dict` is provided, it must not contain hardcoded secrets.
Instead, secrets should be provided using environment variables,
Expand Down Expand Up @@ -643,6 +643,7 @@ class CachedDatasetInfo(BaseModel):
schema_name: str | None = None


# @app.tool() # << deferred
def list_cached_streams() -> list[CachedDatasetInfo]:
"""List all streams available in the default DuckDB cache."""
cache: DuckDBCache = get_default_cache()
Expand All @@ -658,6 +659,7 @@ def list_cached_streams() -> list[CachedDatasetInfo]:
return result


# @app.tool() # << deferred
def describe_default_cache() -> dict[str, Any]:
"""Describe the currently configured default cache."""
cache = get_default_cache()
Expand Down Expand Up @@ -704,6 +706,7 @@ def _is_safe_sql(sql_query: str) -> bool:
return any(normalized_query.startswith(prefix) for prefix in allowed_prefixes)


# @app.tool() # << deferred
def run_sql_query(
sql_query: Annotated[
str,
Expand Down Expand Up @@ -758,7 +761,10 @@ def run_sql_query(


def register_local_ops_tools(app: FastMCP) -> None:
"""Register tools with the FastMCP app."""
"""@private Register tools with the FastMCP app.

This is an internal function and should not be called directly.
"""
app.tool(list_connector_config_secrets)
for tool in (
describe_default_cache,
Expand All @@ -775,5 +781,5 @@ def register_local_ops_tools(app: FastMCP) -> None:
# Register each tool with the FastMCP app.
app.tool(
tool,
description=(tool.__doc__ or "").rstrip() + "\n" + CONFIG_HELP,
description=(tool.__doc__ or "").rstrip() + "\n" + _CONFIG_HELP,
)
16 changes: 12 additions & 4 deletions airbyte/mcp/server.py
Expand Up @@ -7,23 +7,31 @@
from fastmcp import FastMCP

from airbyte._util.meta import set_mcp_mode
from airbyte.mcp._cloud_ops import register_cloud_ops_tools
from airbyte.mcp._connector_registry import register_connector_registry_tools
from airbyte.mcp._local_ops import register_local_ops_tools
from airbyte.mcp._util import initialize_secrets
from airbyte.mcp.cloud_ops import register_cloud_ops_tools
from airbyte.mcp.connector_registry import register_connector_registry_tools
from airbyte.mcp.local_ops import register_local_ops_tools


set_mcp_mode()
initialize_secrets()

app: FastMCP = FastMCP("airbyte-mcp")
"""The Airbyte MCP Server application instance."""

register_connector_registry_tools(app)
register_local_ops_tools(app)
register_cloud_ops_tools(app)


def main() -> None:
"""Main entry point for the MCP server."""
"""@private Main entry point for the MCP server.

This function starts the FastMCP server to handle MCP requests.

It should not be called directly; instead, consult the MCP client documentation
for instructions on how to connect to the server.
"""
print("Starting Airbyte MCP server.", file=sys.stderr)
try:
asyncio.run(app.run_stdio_async())
Expand Down