Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 23 additions & 71 deletions mcpgateway/services/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@

# Third-Party
import httpx
from mcp import ClientSession, types
from mcp import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamablehttp_client
from mcp.types import ReadResourceRequest, ReadResourceRequestParams
import parse
from pydantic import ValidationError
from sqlalchemy import and_, delete, desc, not_, or_, select
Expand All @@ -48,7 +47,6 @@
# First-Party
from mcpgateway.common.models import ResourceContent, ResourceContents, ResourceTemplate, TextContent
from mcpgateway.common.validators import SecurityValidator
from mcpgateway.common.validators import validate_meta_data as _validate_meta_data
from mcpgateway.config import settings
from mcpgateway.db import EmailTeam
from mcpgateway.db import EmailTeamMember as DbEmailTeamMember
Expand Down Expand Up @@ -113,53 +111,6 @@ def _get_registry_cache():
metrics_buffer = get_metrics_buffer_service()


def _build_read_resource_request(uri: Any, meta_data: Dict[str, Any]) -> "types.ClientRequest":
"""Build a ReadResource ClientRequest that carries _meta (CWE-20, CWE-284).

Using ``by_alias=True`` ensures the Pydantic alias ``_meta`` is the only
key written into the dict so the subsequent ``model_validate`` call
resolves it correctly regardless of ``populate_by_name`` settings.

``send_request`` is used instead of ``session.read_resource()`` because the
MCP SDK helper does not expose a ``_meta`` parameter; this wrapper must be
updated if the SDK later adds that capability.

Args:
uri: The resource URI.
meta_data: Validated metadata dict to inject as ``_meta``.

Returns:
A :class:`types.ClientRequest` ready to be passed to ``session.send_request``.
"""
_rp_dict = ReadResourceRequestParams(uri=uri).model_dump(by_alias=True)
_rp_dict["_meta"] = meta_data
return types.ClientRequest(ReadResourceRequest(params=ReadResourceRequestParams.model_validate(_rp_dict)))


async def _read_resource_with_meta(session: "ClientSession", uri: Any, meta_data: Optional[Dict[str, Any]]) -> Any:
"""Dispatch a read_resource call, injecting ``_meta`` when meta_data is provided.

Eliminates the repeated ``if meta_data: send_request … else: read_resource``
pattern across every transport/pool branch in this module.

Args:
session: An active MCP :class:`ClientSession`.
uri: The resource URI to read.
meta_data: Optional validated metadata dict. When ``None`` the standard
SDK helper is used; when non-empty the low-level ``send_request``
path is taken to carry ``_meta``.

Returns:
The raw MCP result object (caller extracts ``.contents``).
"""
if meta_data:
return await session.send_request(
_build_read_resource_request(uri, meta_data),
types.ReadResourceResult,
)
return await session.read_resource(uri=uri)


class ResourceError(Exception):
"""Base class for resource-related errors."""

Expand Down Expand Up @@ -1570,7 +1521,7 @@ async def invoke_resource( # pylint: disable=unused-argument
resource_uri: str,
resource_template_uri: Optional[str] = None,
user_identity: Optional[Union[str, Dict[str, Any]]] = None,
meta_data: Optional[Dict[str, Any]] = None, # Forwarded as _meta in upstream MCP requests
meta_data: Optional[Dict[str, Any]] = None, # Reserved for future MCP SDK support
resource_obj: Optional[Any] = None,
gateway_obj: Optional[Any] = None,
server_id: Optional[str] = None,
Expand Down Expand Up @@ -1684,10 +1635,6 @@ async def invoke_resource( # pylint: disable=unused-argument
'using template: /template'

"""
# CWE-400: Validate meta_data limits before any further processing; invoke_resource is
# a separate entry point that must enforce the same guards as read_resource.
_validate_meta_data(meta_data)

uri = None
if resource_uri and resource_template_uri:
uri = resource_template_uri
Expand Down Expand Up @@ -1752,7 +1699,7 @@ async def invoke_resource( # pylint: disable=unused-argument
try:
db_span_id = observability_service.start_span(
trace_id=trace_id,
name="invoke.resource",
name="resource.read",
attributes={
"resource.name": resource_name if resource_name else "unknown",
"resource.id": str(resource_id) if resource_id else "unknown",
Expand All @@ -1773,10 +1720,10 @@ async def invoke_resource( # pylint: disable=unused-argument
"gateway.transport": getattr(gateway, "transport") or "uknown",
"gateway.url": getattr(gateway, "url") or "unknown",
}
if is_input_capture_enabled("invoke.resource"):
if is_input_capture_enabled("resource.read"):
span_attributes["langfuse.observation.input"] = serialize_trace_payload({"uri": str(uri) if uri else "unknown"})

with create_span("invoke.resource", span_attributes) as span:
with create_span("resource.read", span_attributes) as span:
valid = False
if gateway.ca_certificate:
if settings.enable_ed25519_signing:
Expand Down Expand Up @@ -1922,8 +1869,8 @@ async def connect_to_sse_session(server_url: str, uri: str, authentication: Opti
``None`` instead of raising.

Note:
When meta_data is provided, the request is built using send_request
with _meta injected into ReadResourceRequestParams.
MCP SDK 1.25.0 read_resource() does not support meta parameter.
When the SDK adds support, meta_data can be added back here.

Args:
server_url (str):
Expand Down Expand Up @@ -1970,7 +1917,8 @@ async def connect_to_sse_session(server_url: str, uri: str, authentication: Opti
user_identity=pool_user_identity,
gateway_id=gateway_id,
) as pooled:
resource_response = await _read_resource_with_meta(pooled.session, uri, meta_data)
# Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
resource_response = await pooled.session.read_resource(uri=uri)
return getattr(getattr(resource_response, "contents")[0], "text")
else:
# Fallback to per-call sessions when pool disabled or not initialized
Expand All @@ -1980,7 +1928,8 @@ async def connect_to_sse_session(server_url: str, uri: str, authentication: Opti
):
async with ClientSession(read_stream, write_stream) as session:
_ = await session.initialize()
resource_response = await _read_resource_with_meta(session, uri, meta_data)
# Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
resource_response = await session.read_resource(uri=uri)
return getattr(getattr(resource_response, "contents")[0], "text")
except Exception as e:
# Sanitize error message to prevent URL secrets from leaking in logs
Expand All @@ -2002,8 +1951,8 @@ async def connect_to_streamablehttp_server(server_url: str, uri: str, authentica
of propagating the exception.

Note:
When meta_data is provided, the request is built using send_request
with _meta injected into ReadResourceRequestParams.
MCP SDK 1.25.0 read_resource() does not support meta parameter.
When the SDK adds support, meta_data can be added back here.

Args:
server_url (str):
Expand Down Expand Up @@ -2049,7 +1998,8 @@ async def connect_to_streamablehttp_server(server_url: str, uri: str, authentica
user_identity=pool_user_identity,
gateway_id=gateway_id,
) as pooled:
resource_response = await _read_resource_with_meta(pooled.session, uri, meta_data)
# Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
resource_response = await pooled.session.read_resource(uri=uri)
return getattr(getattr(resource_response, "contents")[0], "text")
else:
# Fallback to per-call sessions when pool disabled or not initialized
Expand All @@ -2060,7 +2010,8 @@ async def connect_to_streamablehttp_server(server_url: str, uri: str, authentica
):
async with ClientSession(read_stream, write_stream) as session:
_ = await session.initialize()
resource_response = await _read_resource_with_meta(session, uri, meta_data)
# Note: MCP SDK 1.25.0 read_resource() does not support meta parameter
resource_response = await session.read_resource(uri=uri)
return getattr(getattr(resource_response, "contents")[0], "text")
except Exception as e:
# Sanitize error message to prevent URL secrets from leaking in logs
Expand All @@ -2074,10 +2025,12 @@ async def connect_to_streamablehttp_server(server_url: str, uri: str, authentica

resource_text = ""
if (gateway_transport).lower() == "sse":
# Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
resource_text = await connect_to_sse_session(server_url=gateway_url, authentication=headers, uri=uri)
else:
# Note: meta_data not passed - MCP SDK 1.25.0 read_resource() doesn't support it
resource_text = await connect_to_streamablehttp_server(server_url=gateway_url, authentication=headers, uri=uri)
if span and resource_text is not None and is_output_capture_enabled("invoke.resource"):
if span and resource_text is not None and is_output_capture_enabled("resource.read"):
set_span_attribute(span, "langfuse.observation.output", serialize_trace_payload({"content": resource_text}))
success = True # Mark as successful before returning
return resource_text
Expand All @@ -2099,7 +2052,7 @@ async def connect_to_streamablehttp_server(server_url: str, uri: str, authentica
status_message=error_message if error_message else None,
)
db_span_ended = True
logger.debug(f"βœ“ Ended invoke.resource span: {db_span_id}")
logger.debug(f"βœ“ Ended resource.read span: {db_span_id}")
except Exception as e:
logger.warning(f"Failed to end observability span for invoking resource: {e}")

Expand Down Expand Up @@ -2187,8 +2140,6 @@ async def read_resource(
resource_db = None
server_scoped = False
resource_db_gateway = None # Only set when eager-loaded via Q2's joinedload
# CWE-400: Validate meta_data limits before any further processing
_validate_meta_data(meta_data)
content = None
uri = resource_uri or "unknown"
if resource_id:
Expand Down Expand Up @@ -2363,7 +2314,8 @@ async def read_resource(
async with ClientSession(read_stream, write_stream) as session:
await session.initialize()

result = await _read_resource_with_meta(session, uri, meta_data)
# Note: MCP SDK read_resource() only accepts uri; _meta is not supported
result = await session.read_resource(uri=uri)

# Convert MCP result to MCP-compliant content models
# result.contents is a list of TextResourceContents or BlobResourceContents
Expand Down