Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions mcpgateway/services/mcp_session_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -1669,12 +1669,27 @@ async def _execute_forwarded_request(self, request: Dict[str, Any]) -> Dict[str,
timeout=settings.mcpgateway_pool_rpc_forward_timeout,
)

# Treat non-2xx HTTP responses as errors
# Gate on HTTP status first: non-2xx responses are errors
# even if the body parses as JSON.
if not response.is_success:
try:
response_data = response.json()
except ValueError:
response_data = {}
if not isinstance(response_data, dict):
response_data = {}

# If body is a JSON-RPC error ({"error": {...}}), propagate it
if "error" in response_data and isinstance(response_data["error"], dict):
logger.info(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | Forwarded execution completed with error (HTTP {response.status_code})")
return {"error": response_data["error"]}

# Non-JSON-RPC error body (e.g. {"detail": "..."}): map to JSON-RPC error
detail = response_data.get("detail", response.text[:200] or "Unknown error")
logger.info(f"[AFFINITY] Worker {WORKER_ID} | Session {session_short}... | Method: {method} | Forwarded execution failed with HTTP {response.status_code}")
return {"error": {"code": -32603, "message": f"Internal request failed with HTTP {response.status_code}"}}
return {"error": {"code": -32603, "message": f"Forwarded request failed (HTTP {response.status_code}): {detail}"}}

# Parse response
# Parse successful response
response_data = response.json()

# Extract result or error from JSON-RPC response
Expand Down
69 changes: 69 additions & 0 deletions tests/e2e/test_session_pool_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,75 @@ async def test_execute_forwarded_request_returns_error_when_no_server(self):
finally:
await pool.close_all()

@pytest.mark.asyncio
async def test_execute_forwarded_request_returns_error_on_non_2xx_non_jsonrpc(self):
"""Verify _execute_forwarded_request maps non-2xx non-JSON-RPC responses to error."""
pool = MCPSessionPool()

try:
# Mock httpx to return 401 with non-JSON-RPC body
mock_response = MagicMock()
mock_response.status_code = 401
mock_response.is_success = False
mock_response.json.return_value = {"detail": "Authorization token required"}
mock_response.text = '{"detail": "Authorization token required"}'

mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)

with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=mock_client):
result = await pool._execute_forwarded_request(
{
"method": "tools/call",
"params": {},
"headers": {},
}
)

assert "error" in result
assert result["error"]["code"] == -32603
assert "401" in result["error"]["message"]
finally:
await pool.close_all()

@pytest.mark.asyncio
async def test_execute_forwarded_request_propagates_jsonrpc_error_on_non_2xx(self):
"""Verify _execute_forwarded_request propagates JSON-RPC errors from non-2xx responses."""
pool = MCPSessionPool()

try:
# Mock httpx to return 403 with JSON-RPC error body
mock_response = MagicMock()
mock_response.status_code = 403
mock_response.is_success = False
mock_response.json.return_value = {
"jsonrpc": "2.0",
"error": {"code": -32003, "message": "Token not authorized for server: abc"},
"id": 1,
}

mock_client = AsyncMock()
mock_client.post = AsyncMock(return_value=mock_response)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)

with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=mock_client):
result = await pool._execute_forwarded_request(
{
"method": "tools/call",
"params": {},
"headers": {},
}
)

assert "error" in result
assert result["error"]["code"] == -32003
assert "Token not authorized" in result["error"]["message"]
finally:
await pool.close_all()

@pytest.mark.asyncio
async def test_start_rpc_listener_returns_when_affinity_disabled(self):
"""Verify start_rpc_listener returns immediately when affinity disabled."""
Expand Down
120 changes: 120 additions & 0 deletions tests/unit/mcpgateway/services/test_mcp_session_pool_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -1659,6 +1659,126 @@ async def post(self, *_args, **_kwargs):
assert result["error"]["code"] == -32603
assert result["error"]["message"] == "Internal request timeout"

@pytest.mark.asyncio
async def test_execute_forwarded_request_non_2xx_jsonrpc_error(self):
"""Non-2xx with JSON-RPC error body should propagate the error."""
pool = MCPSessionPool()

class DummyResponse:
def __init__(self):
self.status_code = 403
self.is_success = False
def json(self):
return {"error": {"code": -32003, "message": "Token not authorized"}}

class DummyClient:
async def __aenter__(self):
return self
async def __aexit__(self, *_exc):
return False
async def post(self, *_args, **_kwargs):
return DummyResponse()

with patch("mcpgateway.services.mcp_session_pool.settings") as mock_settings:
mock_settings.port = 4444
mock_settings.mcpgateway_pool_rpc_forward_timeout = 1.0
with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=DummyClient()):
result = await pool._execute_forwarded_request({"method": "tools/call", "params": {}, "headers": {}, "req_id": 1, "mcp_session_id": "sess-123"})

assert result == {"error": {"code": -32003, "message": "Token not authorized"}}

@pytest.mark.asyncio
async def test_execute_forwarded_request_non_2xx_non_jsonrpc(self):
"""Non-2xx with non-JSON-RPC body should map to a JSON-RPC error."""
pool = MCPSessionPool()

class DummyResponse:
def __init__(self):
self.status_code = 401
self.is_success = False
self.text = '{"detail": "Authorization token required"}'
def json(self):
return {"detail": "Authorization token required"}

class DummyClient:
async def __aenter__(self):
return self
async def __aexit__(self, *_exc):
return False
async def post(self, *_args, **_kwargs):
return DummyResponse()

with patch("mcpgateway.services.mcp_session_pool.settings") as mock_settings:
mock_settings.port = 4444
mock_settings.mcpgateway_pool_rpc_forward_timeout = 1.0
with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=DummyClient()):
result = await pool._execute_forwarded_request({"method": "tools/call", "params": {}, "headers": {}, "req_id": 1, "mcp_session_id": "sess-123"})

assert result["error"]["code"] == -32603
assert "401" in result["error"]["message"]
assert "Authorization token required" in result["error"]["message"]

@pytest.mark.asyncio
async def test_execute_forwarded_request_non_2xx_unparseable_body(self):
"""Non-2xx with unparseable JSON body should fall back to response text."""
pool = MCPSessionPool()

class DummyResponse:
def __init__(self):
self.status_code = 502
self.is_success = False
self.text = "Bad Gateway"
def json(self):
raise ValueError("No JSON")

class DummyClient:
async def __aenter__(self):
return self
async def __aexit__(self, *_exc):
return False
async def post(self, *_args, **_kwargs):
return DummyResponse()

with patch("mcpgateway.services.mcp_session_pool.settings") as mock_settings:
mock_settings.port = 4444
mock_settings.mcpgateway_pool_rpc_forward_timeout = 1.0
with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=DummyClient()):
result = await pool._execute_forwarded_request({"method": "tools/call", "params": {}, "headers": {}, "req_id": 1, "mcp_session_id": "sess-123"})

assert result["error"]["code"] == -32603
assert "502" in result["error"]["message"]
assert "Bad Gateway" in result["error"]["message"]

@pytest.mark.asyncio
async def test_execute_forwarded_request_non_2xx_null_json_body(self):
"""Non-2xx with JSON null body should not crash (response_data becomes None)."""
pool = MCPSessionPool()

class DummyResponse:
def __init__(self):
self.status_code = 500
self.is_success = False
self.text = "null"
def json(self):
return None

class DummyClient:
async def __aenter__(self):
return self
async def __aexit__(self, *_exc):
return False
async def post(self, *_args, **_kwargs):
return DummyResponse()

with patch("mcpgateway.services.mcp_session_pool.settings") as mock_settings:
mock_settings.port = 4444
mock_settings.mcpgateway_pool_rpc_forward_timeout = 1.0
with patch("mcpgateway.services.mcp_session_pool.httpx.AsyncClient", return_value=DummyClient()):
result = await pool._execute_forwarded_request({"method": "tools/call", "params": {}, "headers": {}, "req_id": 1, "mcp_session_id": "sess-123"})

assert result["error"]["code"] == -32603
assert "500" in result["error"]["message"]


# ---------------------------------------------------------------------------
# Lines 1583-1664: _execute_forwarded_http_request
Expand Down
Loading