Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 44 additions & 47 deletions mcpgateway/transports/streamablehttp_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
# First-Party
from mcpgateway.cache.global_config_cache import global_config_cache
from mcpgateway.common.models import LogLevel
from mcpgateway.common.validators import validate_meta_data as _validate_meta_data
from mcpgateway.config import settings
from mcpgateway.db import SessionLocal
from mcpgateway.middleware.rbac import _ACCESS_DENIED_MSG
Expand Down Expand Up @@ -1158,22 +1157,6 @@ async def _validate_streamable_session_access(
return False, HTTP_403_FORBIDDEN, "Session owner metadata unavailable"


def _build_paginated_params(meta: Optional[Any]) -> Optional[PaginatedRequestParams]:
"""Build a ``PaginatedRequestParams`` carrying ``_meta`` when provided.

Args:
meta: Request metadata (_meta) from the original MCP request, or ``None``.

Returns:
A ``PaginatedRequestParams`` instance with ``_meta`` set, or ``None`` when *meta* is falsy.
"""
if not meta:
return None
# CWE-532: log only key names, never values which may carry PII/tokens
logger.debug("Forwarding _meta to remote gateway (keys: %s)", sorted(meta.keys()) if isinstance(meta, dict) else type(meta).__name__)
return PaginatedRequestParams(_meta=meta)


async def _proxy_list_tools_to_gateway(gateway: Any, request_headers: dict, user_context: dict, meta: Optional[Any] = None) -> List[types.Tool]: # pylint: disable=unused-argument
"""Proxy tools/list request directly to remote MCP gateway using MCP SDK.

Expand Down Expand Up @@ -1211,8 +1194,14 @@ async def _proxy_list_tools_to_gateway(gateway: Any, request_headers: dict, user
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Prepare params with _meta if provided
params = None
if meta:
params = PaginatedRequestParams(_meta=meta)
logger.debug("Forwarding _meta to remote gateway: %s", meta)

# List tools with _meta forwarded
result = await session.list_tools(params=_build_paginated_params(meta))
result = await session.list_tools(params=params)
return result.tools

except Exception as e:
Expand Down Expand Up @@ -1254,16 +1243,21 @@ async def _proxy_list_resources_to_gateway(gateway: Any, request_headers: dict,

logger.info("Proxying resources/list to gateway %s at %s", gateway.id, gateway.url)
if meta:
# CWE-532: log only key names, never values which may carry PII/tokens
logger.debug("Forwarding _meta to remote gateway (keys: %s)", sorted(meta.keys()) if isinstance(meta, dict) else type(meta).__name__)
logger.debug("Forwarding _meta to remote gateway: %s", meta)

# Use MCP SDK to connect and list resources
async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id):
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

# Prepare params with _meta if provided
params = None
if meta:
params = PaginatedRequestParams(_meta=meta)
logger.debug("Forwarding _meta to remote gateway: %s", meta)

# List resources with _meta forwarded
result = await session.list_resources(params=_build_paginated_params(meta))
result = await session.list_resources(params=params)

logger.info("Received %s resources from gateway %s", len(result.resources), gateway.id)
return result.resources
Expand Down Expand Up @@ -1315,8 +1309,7 @@ async def _proxy_read_resource_to_gateway(gateway: Any, resource_uri: str, user_

logger.info("Proxying resources/read for %s to gateway %s at %s", resource_uri, gateway.id, gateway.url)
if meta:
# CWE-532: log only key names, never values which may carry PII/tokens
logger.debug("Forwarding _meta to remote gateway (keys: %s)", sorted(meta.keys()) if isinstance(meta, dict) else type(meta).__name__)
logger.debug("Forwarding _meta to remote gateway: %s", meta)

# Use MCP SDK to connect and read resource
async with streamablehttp_client(url=gateway.url, headers=headers, timeout=settings.mcpgateway_direct_proxy_timeout) as (read_stream, write_stream, _get_session_id):
Expand All @@ -1326,10 +1319,8 @@ async def _proxy_read_resource_to_gateway(gateway: Any, resource_uri: str, user_
# Prepare request params with _meta if provided
if meta:
# Create params and inject _meta
# by_alias=True ensures the alias "_meta" key is written so
# model_validate resolves it correctly (fixes CWE-20 silent drop)
request_params = ReadResourceRequestParams(uri=resource_uri)
request_params_dict = request_params.model_dump(by_alias=True)
request_params_dict = request_params.model_dump()
request_params_dict["_meta"] = meta

# Send request with _meta
Expand Down Expand Up @@ -2430,20 +2421,23 @@ async def read_resource(resource_uri: str) -> Union[str, bytes]:
return ""

# Direct proxy mode: forward request to remote MCP server
# SECURITY: CWE-532 protection - Log only meta_data key names, NEVER values
# Metadata may contain PII, authentication tokens, or sensitive context that
# MUST NOT be written to logs. This is a critical security control.
logger.debug(
"Using direct_proxy mode for resources/read %s, server %s, gateway %s (from %s header), forwarding _meta keys: %s",
resource_uri,
server_id,
gateway.id,
GATEWAY_ID_HEADER,
sorted(meta_data.keys()) if meta_data else None,
)
# CWE-400: validate _meta limits before network I/O (bypassed in direct-proxy branch)
_validate_meta_data(meta_data)
contents = await _proxy_read_resource_to_gateway(gateway, str(resource_uri), user_context, meta_data)
# Get _meta from request context if available
meta = None
try:
request_ctx = mcp_app.request_context
meta = request_ctx.meta
logger.info(
"Using direct_proxy mode for resources/read %s, server %s, gateway %s (from %s header), forwarding _meta: %s",
resource_uri,
server_id,
gateway.id,
GATEWAY_ID_HEADER,
meta,
)
except (LookupError, AttributeError) as e:
logger.debug("No request context available for _meta extraction: %s", e)

contents = await _proxy_read_resource_to_gateway(gateway, str(resource_uri), user_context, meta)
if contents:
# Return first content (text or blob)
first_content = contents[0]
Expand Down Expand Up @@ -3564,15 +3558,14 @@ async def _auth_no_token(self, *, path: str, bearer_header_supplied: bool) -> bo
Returns:
True if the request is allowed with public-only access, False if rejected.
"""
# If client supplied a Bearer header but with empty credentials, fail closed
if bearer_header_supplied:
return await self._send_error(detail="Invalid authentication credentials", headers={"WWW-Authenticate": "Bearer"})

# Build the WWW-Authenticate header, enriching it with RFC 9728
# resource_metadata when the target server has OAuth enabled.
# Per-server OAuth enforcement MUST run before the global auth check so that
# oauth_enabled servers always return 401 with resource_metadata URL (RFC 9728).
# Without this, strict mode (mcp_require_auth=True) returns a generic
# WWW-Authenticate: Bearer with no resource_metadata, and MCP clients cannot
# discover the OAuth server to authenticate. (Fixes #3752)
# discover the OAuth server to authenticate.
www_auth = "Bearer"
match = _SERVER_ID_RE.search(path)
if match:
per_server_id = match.group("server_id")
Expand All @@ -3586,9 +3579,13 @@ async def _auth_no_token(self, *, path: str, bearer_header_supplied: bool) -> bo
logger.exception("OAuth enforcement check failed for server %s", per_server_id)
return await self._send_error(detail="Service unavailable β€” unable to verify server authentication requirements", status_code=503)

# Strict mode: require authentication (non-OAuth servers get generic 401)
# If client supplied a Bearer header but with empty credentials, fail closed
if bearer_header_supplied:
return await self._send_error(detail="Invalid authentication credentials", headers={"WWW-Authenticate": www_auth})

# Strict mode: require authentication
if settings.mcp_require_auth:
return await self._send_error(detail="Authentication required for MCP endpoints", headers={"WWW-Authenticate": "Bearer"})
return await self._send_error(detail="Authentication required for MCP endpoints", headers={"WWW-Authenticate": www_auth})

# Permissive mode: allow unauthenticated access with public-only scope
# Set context indicating unauthenticated user with public-only access (teams=[])
Expand Down