Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions airbyte/_util/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import requests


_MCP_MODE_ENABLED: bool = False
"""Whether we are running in MCP (Model Context Protocol) mode."""

COLAB_SESSION_URL = "http://172.28.0.12:9000/api/sessions"
"""URL to get the current Google Colab session information."""

Expand All @@ -32,6 +35,25 @@ def is_ci() -> bool:
return "CI" in os.environ


def set_mcp_mode() -> None:
"""Set flag indicating we are running in MCP (Model Context Protocol) mode.

This should be called early in MCP server initialization to ensure
proper detection and prevent interactive prompts.
"""
global _MCP_MODE_ENABLED
_MCP_MODE_ENABLED = True


def is_mcp_mode() -> bool:
"""Return True if running in MCP (Model Context Protocol) mode."""
if _MCP_MODE_ENABLED:
return True

script_name = get_python_script_name()
return bool(script_name and "airbyte-mcp" in script_name)


@lru_cache
def is_langchain() -> bool:
"""Return True if running in a Langchain environment.
Expand All @@ -55,15 +77,16 @@ def is_colab() -> bool:
return bool(get_colab_release_version())


@lru_cache
def is_interactive() -> bool:
"""Return True if running in an interactive environment where we can prompt users for input."""
try:
if is_colab() or is_jupyter():
return True

if is_ci():
if is_ci() or is_mcp_mode():
return False

# No special modes detected. Return result based on whether stdin and stdout are ttys.
return bool(
sys.__stdin__ and sys.__stdin__.isatty() and sys.__stdout__ and sys.__stdout__.isatty()
)
Expand Down
1 change: 1 addition & 0 deletions airbyte/_util/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ def get_env_flags() -> dict[str, Any]:
flags: dict[str, bool | str] = {
"CI": meta.is_ci(),
"LANGCHAIN": meta.is_langchain(),
"MCP": meta.is_mcp_mode(),
"NOTEBOOK_RUNTIME": (
"GOOGLE_COLAB"
if meta.is_colab()
Expand Down
26 changes: 24 additions & 2 deletions airbyte/mcp/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@
import dotenv
import yaml

from airbyte.secrets import DotenvSecretManager, GoogleGSMSecretManager, register_secret_manager
from airbyte._util.meta import is_interactive
from airbyte.secrets import (
DotenvSecretManager,
GoogleGSMSecretManager,
SecretSourceEnum,
register_secret_manager,
)
from airbyte.secrets.config import disable_secret_source
from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets
from airbyte.secrets.util import get_secret, is_secret_available

Expand All @@ -28,8 +35,19 @@ def _load_dotenv_file(dotenv_path: Path | str) -> None:


def initialize_secrets() -> None:
"""Initialize dotenv to load environment variables from .env files."""
"""Initialize dotenv to load environment variables from .env files.
Note: Later secret manager registrations have higher priority than earlier ones.
"""
# Load the .env file from the current working directory.
envrc_path = Path.cwd() / ".envrc"
if envrc_path.exists():
envrc_secret_mgr = DotenvSecretManager(envrc_path)
_load_dotenv_file(envrc_path)
register_secret_manager(
envrc_secret_mgr,
)

if AIRBYTE_MCP_DOTENV_PATH_ENVVAR in os.environ:
dotenv_path = Path(os.environ[AIRBYTE_MCP_DOTENV_PATH_ENVVAR]).absolute()
custom_dotenv_secret_mgr = DotenvSecretManager(dotenv_path)
Expand All @@ -47,6 +65,10 @@ def initialize_secrets() -> None:
)
)

# Make sure we disable the prompt source in non-interactive environments.
if not is_interactive():
disable_secret_source(SecretSourceEnum.PROMPT)


def resolve_config( # noqa: PLR0912
config: dict | str | None = None,
Expand Down
5 changes: 3 additions & 2 deletions airbyte/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@

from fastmcp import FastMCP

from airbyte._util.meta import set_mcp_mode
from airbyte.mcp._cloud_ops import register_cloud_ops_tools
from airbyte.mcp._connector_registry import register_connector_registry_tools
from airbyte.mcp._local_ops import register_local_ops_tools
from airbyte.mcp._util import initialize_secrets


initialize_secrets()

app: FastMCP = FastMCP("airbyte-mcp")
register_connector_registry_tools(app)
register_local_ops_tools(app)
Expand All @@ -23,6 +22,8 @@
def main() -> None:
"""Main entry point for the MCP server."""
print("Starting Airbyte MCP server.", file=sys.stderr)
set_mcp_mode()
initialize_secrets()
try:
asyncio.run(app.run_stdio_async())
except KeyboardInterrupt:
Expand Down
65 changes: 46 additions & 19 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,24 +114,40 @@
return s.connect_ex(("localhost", port)) == 0


def find_free_port():
"""Find a free port to use for PostgreSQL testing."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
s.listen(1)
port = s.getsockname()[1]
return port


@pytest.fixture(scope="session", autouse=True)
def remove_postgres_container():
if is_port_in_use(PYTEST_POSTGRES_PORT):
"""Remove any existing PostgreSQL container before tests start."""
try:
client = docker.from_env()
try:
client = docker.from_env()
container = client.containers.get(
PYTEST_POSTGRES_CONTAINER,
)
container.stop()
container.remove()
container = client.containers.get(PYTEST_POSTGRES_CONTAINER)
if container.status in ["running", "paused"]:
container.stop(timeout=10)
container.remove(force=True)
except docker.errors.NotFound:
pass # Container not found, nothing to do.
except docker.errors.DockerException:
pass # Docker not running, nothing to do.

for container in client.containers.list(all=True):
if container.name == PYTEST_POSTGRES_CONTAINER:
if container.status in ["running", "paused"]:
container.stop(timeout=10)
container.remove(force=True)

def test_pg_connection(host) -> bool:
pg_url = f"postgresql://postgres:postgres@{host}:{PYTEST_POSTGRES_PORT}/postgres"
except docker.errors.DockerException:
pass # Docker not running, nothing to do.


def test_pg_connection(host, port) -> bool:
pg_url = f"postgresql://postgres:postgres@{host}:{port}/postgres"

max_attempts = 120
for attempt in range(max_attempts):
Expand Down Expand Up @@ -177,10 +193,19 @@

try:
previous_container = client.containers.get(PYTEST_POSTGRES_CONTAINER)
previous_container.remove()
if previous_container.status in ["running", "paused"]:
previous_container.stop(timeout=10)
previous_container.remove(force=True)
except docker.errors.NotFound:
pass

postgres_port = PYTEST_POSTGRES_PORT
if is_port_in_use(postgres_port):
postgres_port = find_free_port()
logger.info(
f"Port {PYTEST_POSTGRES_PORT} is in use, using port {postgres_port} instead"
)

postgres_is_running = False
postgres = client.containers.run(
image=PYTEST_POSTGRES_IMAGE,
Expand All @@ -190,8 +215,9 @@
"POSTGRES_PASSWORD": "postgres",
"POSTGRES_DB": "postgres",
},
ports={"5432/tcp": PYTEST_POSTGRES_PORT},
ports={"5432/tcp": postgres_port},
detach=True,
remove=False,
)

attempts = 10
Expand All @@ -209,33 +235,34 @@

final_host = None
if host := os.environ.get("DOCKER_HOST_NAME"):
final_host = host if test_pg_connection(host) else None
final_host = host if test_pg_connection(host, postgres_port) else None
else:
# Try to connect to the database using localhost and the docker host IP
for host in ["127.0.0.1", "localhost", "host.docker.internal", "172.17.0.1"]:
if test_pg_connection(host):
if test_pg_connection(host, postgres_port):
final_host = host
break

if final_host is None:
raise Exception(f"Failed to connect to the PostgreSQL database on host {host}.")

yield final_host
yield (final_host, postgres_port)

# Stop and remove the container after the tests are done
postgres.stop()
postgres.remove()


@pytest.fixture(scope="function")
def new_postgres_cache(new_postgres_db: str):
def new_postgres_cache(new_postgres_db):
"""Fixture to return a fresh Postgres cache.
Each test that uses this fixture will get a unique table prefix.
"""
host, port = new_postgres_db
config = PostgresCache(
host=new_postgres_db,
port=PYTEST_POSTGRES_PORT,
host=host,
port=port,
username="postgres",
password="postgres",
database="postgres",
Expand Down
Loading