Skip to content
Merged
2 changes: 1 addition & 1 deletion .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"files": "^.secrets.baseline|package-lock.json|Cargo.lock|scripts/sign_image.sh|scripts/zap|sonar-project.properties|uv.lock|go.sum|mcpgateway/sri_hashes.json|^.secrets.baseline$",
"lines": null
},
"generated_at": "2026-04-16T16:00:33Z",
"generated_at": "2026-04-16T17:23:59Z",
"plugins_used": [
{
"name": "AWSKeyDetector"
Expand Down
132 changes: 89 additions & 43 deletions mcpgateway/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@
from mcpgateway.common.models import ListResourceTemplatesResult, LogLevel, Root
from mcpgateway.common.validators import SecurityValidator
from mcpgateway.config import settings
from mcpgateway.db import A2AAgent as DbA2AAgent, A2APushNotificationConfig, A2ATask as DbA2ATask, refresh_slugs_on_startup, SessionLocal
from mcpgateway.db import A2AAgent as DbA2AAgent
from mcpgateway.db import A2APushNotificationConfig
from mcpgateway.db import A2ATask as DbA2ATask
from mcpgateway.db import refresh_slugs_on_startup, SessionLocal
from mcpgateway.db import Tool as DbTool
from mcpgateway.handlers.sampling import SamplingHandler
from mcpgateway.middleware.compression import SSEAwareCompressMiddleware
Expand Down Expand Up @@ -111,8 +114,8 @@
from mcpgateway.schemas import (
A2AAgentCreate,
A2AAgentRead,
A2APushNotificationConfigCreate,
A2AAgentUpdate,
A2APushNotificationConfigCreate,
CursorPaginatedA2AAgentsResponse,
CursorPaginatedGatewaysResponse,
CursorPaginatedPromptsResponse,
Expand All @@ -123,6 +126,8 @@
GatewayRead,
GatewayRefreshResponse,
GatewayUpdate,
HealthCheckResponse,
HealthStatusItem,
JsonPathModifier,
MetricsResponse,
PromptCreate,
Expand All @@ -143,8 +148,8 @@
ToolRead,
ToolUpdate,
)
from mcpgateway.services.a2a_service import A2AAgentError, A2AAgentNameConflictError, A2AAgentNotFoundError, A2AAgentService
from mcpgateway.services.a2a_server_service import A2AServerService
from mcpgateway.services.a2a_service import A2AAgentError, A2AAgentNameConflictError, A2AAgentNotFoundError, A2AAgentService
from mcpgateway.services.cancellation_service import cancellation_service
from mcpgateway.services.completion_service import CompletionService
from mcpgateway.services.content_security import ContentSizeError, ContentTypeError
Expand Down Expand Up @@ -178,7 +183,7 @@
from mcpgateway.utils.orjson_response import ORJSONResponse
from mcpgateway.utils.passthrough_headers import set_global_passthrough_headers
from mcpgateway.utils.paths import resolve_root_path
from mcpgateway.utils.redis_client import close_redis_client, get_redis_client
from mcpgateway.utils.redis_client import close_redis_client, get_redis_client, is_redis_available
from mcpgateway.utils.redis_isready import wait_for_redis_ready
from mcpgateway.utils.retry_manager import ResilientHttpClient
from mcpgateway.utils.token_scoping import validate_server_access
Expand Down Expand Up @@ -11076,56 +11081,97 @@ def healthcheck(response: Response = None):
db.close()


@app.get("/ready")
async def readiness_check():
def _check_db_ready() -> tuple[bool, str | None]:
    """
    Probe database connectivity with a trivial query on a dedicated session.

    The session is created, used, cleaned up, and closed entirely inside this
    function, so every DB operation happens in whichever thread runs it.

    Returns:
        tuple: ``(True, None)`` when ``SELECT 1`` executes and commits,
            otherwise ``(False, <stringified exception>)``.
    """
    session = SessionLocal()
    try:
        session.execute(text("SELECT 1"))
        # Commit explicitly so PgBouncer (transaction mode) gets its backend connection back.
        session.commit()
    except Exception as exc:
        # Cleanup mirrors get_db: attempt rollback, and only invalidate if rollback itself fails.
        try:
            session.rollback()
        except Exception:
            try:
                session.invalidate()
            except Exception:
                pass  # nosec B110 - Best effort cleanup on connection failure
        return False, str(exc)
    else:
        return True, None
    finally:
        # Runs on both the success and failure paths.
        session.close()


@app.get("/ready", response_model=HealthCheckResponse)
async def readiness_check(response: Response):
"""
Perform a readiness check to verify if the application is ready to receive traffic.
Perform a comprehensive readiness check to verify all dependencies.

Creates and manages its own session inside the worker thread to ensure all DB
operations (create, execute, commit, rollback, close) happen in the same thread.
This avoids cross-thread session issues and double-commit from get_db.
This endpoint checks:
- Database connectivity (via asyncio.to_thread to avoid blocking)
- Cache availability (if enabled)

Returns HTTP 200 when ready, HTTP 503 when not ready.

Args:
response: Response object used to attach runtime-mode headers and status code.

Returns:
JSONResponse with status 200 if ready, 503 if not.
A HealthCheckResponse with detailed component health status.
HTTP 200 if all components are healthy (ready).
HTTP 503 if any component is unhealthy (not ready).
"""
status_items = []

def _check_db() -> str | None:
"""Check database connectivity by executing a simple query.
# Database health check (run in thread to avoid blocking event loop)
db_success, db_error = await asyncio.to_thread(_check_db_ready)

Returns:
None if successful, error message string if failed.
"""
# Create session in this thread - all DB operations stay in the same thread.
db = SessionLocal()
if db_success:
status_items.append(HealthStatusItem(name="Database", status_code=status.HTTP_200_OK, message="Database Connection Successful"))
else:
error_message = f"Database health check failed: {db_error}"
logger.error(error_message)
status_items.append(HealthStatusItem(name="Database", status_code=status.HTTP_503_SERVICE_UNAVAILABLE, message="Cannot connect to Database"))

# Check Redis health only if it's enabled (cache_type is redis and redis_url is configured)
redis_enabled = settings.cache_type == "redis" and settings.redis_url
if redis_enabled:
try:
db.execute(text("SELECT 1"))
# Explicitly commit to release PgBouncer backend connection.
db.commit()
return None # Success
# is_redis_available() checks if Redis is available and responding to ping.
if await is_redis_available():
status_items.append(HealthStatusItem(name="Cache", status_code=status.HTTP_200_OK, message="Cache Connection Successful"))
else:
status_items.append(HealthStatusItem(name="Cache", status_code=status.HTTP_503_SERVICE_UNAVAILABLE, message="Cannot connect to Cache"))
except Exception as e:
# Rollback, then invalidate if rollback fails (mirrors get_db cleanup).
try:
db.rollback()
except Exception:
try:
db.invalidate()
except Exception:
pass # nosec B110 - Best effort cleanup on connection failure
return str(e)
finally:
db.close()
logger.error(f"Redis health check failed: {str(e)}")
status_items.append(HealthStatusItem(name="Cache", status_code=status.HTTP_503_SERVICE_UNAVAILABLE, message="Cannot connect to Cache"))

# Determine overall status:
# - "ready" if Database is healthy (200) AND Redis is healthy when enabled
# - "unready" if Database is unhealthy (503) OR Redis is unhealthy when enabled
database_status = next((item for item in status_items if item.name == "Database"), None)
redis_status = next((item for item in status_items if item.name == "Cache"), None)

# Check database health
database_healthy = database_status and database_status.status_code == 200

# Redis is healthy if: not enabled OR (enabled AND status is 200)
redis_healthy = not redis_enabled or (redis_status and redis_status.status_code == 200)

is_ready = database_healthy and redis_healthy
overall_status = "ready" if is_ready else "unready"

# Set HTTP status code: 200 for ready, 503 for unready
response.status_code = status.HTTP_200_OK if is_ready else status.HTTP_503_SERVICE_UNAVAILABLE

# Run the blocking DB check in a thread to avoid blocking the event loop.
error = await asyncio.to_thread(_check_db)
if error:
error_message = f"Readiness check failed: {error}"
logger.error(error_message)
response = ORJSONResponse(content={"status": "not ready", "error": error_message, "mcp_runtime": _mcp_runtime_status_payload()}, status_code=503)
_apply_runtime_mode_headers(response)
return response
response = ORJSONResponse(content={"status": "ready", "mcp_runtime": _mcp_runtime_status_payload()}, status_code=200)
_apply_runtime_mode_headers(response)
return response
return HealthCheckResponse(status=overall_status, status_items=status_items, mcp_runtime=_mcp_runtime_status_payload())


@app.get("/health/security", tags=["health"])
Expand Down
16 changes: 16 additions & 0 deletions mcpgateway/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -8050,6 +8050,22 @@ class CacheMetricsSchema(BaseModel):
keyspace_misses: int = Field(0, description="Failed key lookups")


class HealthStatusItem(BaseModel):
    """Individual health status item for a service component.

    Each instance reports on one monitored component: its name, an HTTP-style
    status code used as the health indicator, and a descriptive message.
    All three fields are required.
    """

    # Name of the component this entry reports on.
    name: str = Field(..., description="Component name (e.g., 'Database', 'Cache')")
    # HTTP-style status code acting as the per-component health indicator.
    status_code: int = Field(..., description="HTTP status code (200 for healthy, 503 for unhealthy)")
    # Human-readable text describing the component state.
    message: str = Field(..., description="Status message describing the component state")


class HealthCheckResponse(BaseModel):
    """Health check response containing status of all monitored components.

    Attributes:
        status: Overall status string. The readiness endpoint populates it with
            ``"ready"`` when every checked component is healthy and
            ``"unready"`` otherwise.
        status_items: Per-component results, one ``HealthStatusItem`` each.
        mcp_runtime: Runtime diagnostics payload; defaults to an empty dict
            when not supplied.
    """

    # Description now matches the values the readiness endpoint actually emits
    # ("ready"/"unready"); it previously advertised 'healthy'/'unhealthy'.
    status: str = Field(..., description="Overall readiness status: 'ready' if all components are healthy, 'unready' otherwise")
    status_items: List[HealthStatusItem] = Field(..., description="List of component health statuses")
    mcp_runtime: Dict[str, Any] = Field(default_factory=dict, description="MCP runtime diagnostics and configuration")


class GunicornMetricsSchema(BaseModel):
"""Gunicorn server metrics."""

Expand Down
22 changes: 13 additions & 9 deletions tests/unit/mcpgateway/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,10 +482,12 @@ def test_ready_check(self, test_client):
assert response.status_code == 200
assert response.json()["status"] == "ready"

def test_health_check_db_error(self):
@pytest.mark.asyncio
async def test_health_check_db_error(self):
"""Test health check error path with rollback failure."""
# First-Party
from mcpgateway import main as mcpgateway_main
from starlette.responses import Response as FastAPIResponse

class DummySession:
def __init__(self):
Expand All @@ -508,15 +510,18 @@ def close(self):

session = DummySession()
with patch("mcpgateway.main.SessionLocal", return_value=session):
response = mcpgateway_main.healthcheck()
assert response["status"] == "unhealthy"
response_obj = FastAPIResponse()
result = mcpgateway_main.healthcheck(response_obj)
assert result["status"] == "unhealthy"
assert "error" in result
assert session.invalidate_called is True

@pytest.mark.asyncio
async def test_ready_check_db_error(self):
"""Test readiness check error path with rollback failure."""
# First-Party
from mcpgateway import main as mcpgateway_main
from starlette.responses import Response as FastAPIResponse

class DummySession:
def __init__(self):
Expand All @@ -538,12 +543,11 @@ def close(self):
pass

session = DummySession()
with (
patch("mcpgateway.main.SessionLocal", return_value=session),
patch("mcpgateway.main.asyncio.to_thread", side_effect=lambda fn, *args, **kwargs: fn(*args, **kwargs)),
):
response = await mcpgateway_main.readiness_check()
assert response.status_code == 503
with patch("mcpgateway.main.SessionLocal", return_value=session):
response_obj = FastAPIResponse()
result = await mcpgateway_main.readiness_check(response_obj)
assert result.status == "unready"
assert response_obj.status_code == 503
assert session.invalidate_called is True

def test_root_redirect(self, test_client):
Expand Down
Loading
Loading