MCP Events: High-level server API with @mcp.event() and emit_event()#1
elijahr wants to merge 24 commits into
Conversation
…gistry Provides declare_event(), @mcp.event() decorator, and emit_event() on both FastMCP instances and Context objects. Includes session registry for broadcast delivery, subscription/unsubscribe/list handlers via _receive_loop interception, retained value store, parameterized topic pattern matching, and topic depth enforcement (max 8 segments). Also fixes pre-existing test timeout flakes in SSE and stdio client tests.
Code Review
This pull request introduces an application-level event system to FastMCP, supporting MQTT-style wildcard subscriptions and retained events. Key changes include a new event registry, publishing capabilities in the Context and FastMCP classes, and JSON-RPC interception for event methods. Review feedback identifies a bug in the _receive_loop override that could cause runtime errors and suggests refactoring to avoid duplicating SDK logic. Further improvements are proposed to optimize session lookups using a dictionary and to deduplicate retained events during subscription.
| async for message in self._read_stream: | ||
| if isinstance(message, Exception): | ||
| await self._handle_incoming(message) | ||
| elif isinstance(message.message.root, JSONRPCRequest): | ||
| method = message.message.root.method | ||
| if method in self._EVENT_METHODS: | ||
| # Only handle events/* if the server has events capability | ||
| if self.fastmcp._event_topics: | ||
| await self._handle_event_request( | ||
| method=method, | ||
| params=message.message.root.params or {}, | ||
| request_id=message.message.root.id, | ||
| ) | ||
| else: | ||
| # No events capability: return -32601 Method not found | ||
| error_response = JSONRPCError( | ||
| jsonrpc="2.0", | ||
| id=message.message.root.id, | ||
| error=ErrorData( | ||
| code=-32601, | ||
| message=f"Method not found: {method}", | ||
| ), | ||
| ) | ||
| session_message = SessionMessage( | ||
| message=JSONRPCMessage(error_response) | ||
| ) | ||
| await self._write_stream.send(session_message) | ||
| else: | ||
| # Delegate to parent for standard SDK validation | ||
| await self._handle_sdk_request(message) | ||
| elif isinstance(message.message.root, mcp.types.JSONRPCNotification): | ||
| await self._handle_sdk_notification(message) | ||
| else: | ||
| await self._handle_response(message) |
The _receive_loop override duplicates a significant amount of the SDK's internal message dispatch logic, which is fragile and has led to bugs in the duplicated code. Specifically, _handle_sdk_request and _handle_sdk_notification incorrectly call _handle_incoming with validated models or responders instead of the expected SessionMessage or Exception types, which will cause runtime errors.
Instead of duplicating the logic, you should intercept only the event-related methods and delegate all other messages to the base class's _handle_incoming method. This also makes the redundant _handle_sdk_request and _handle_sdk_notification methods unnecessary.
async for message in self._read_stream:
    if isinstance(message, Exception):
        await self._handle_incoming(message)
        continue
    root = message.message.root
    if isinstance(root, JSONRPCRequest) and root.method in self._EVENT_METHODS:
        # Only handle events/* if the server has events capability
        if self.fastmcp._event_topics:
            await self._handle_event_request(
                method=root.method,
                params=root.params or {},
                request_id=root.id,
            )
        else:
            # No events capability: return -32601 Method not found
            error_response = JSONRPCError(
                jsonrpc="2.0",
                id=root.id,
                error=ErrorData(
                    code=-32601,
                    message=f"Method not found: {root.method}",
                ),
            )
            session_message = SessionMessage(
                message=JSONRPCMessage(error_response)
            )
            await self._write_stream.send(session_message)
    else:
        # Delegate to parent for standard SDK validation
        await self._handle_incoming(message)
| self._event_topics: dict[str, EventTopicDescriptor] = {} | ||
| self._subscription_registry: SubscriptionRegistry = SubscriptionRegistry() | ||
| self._retained_store: RetainedValueStore = RetainedValueStore() | ||
| self._active_sessions: set[MiddlewareServerSession] = set() |
Consider changing _active_sessions to a dictionary mapping session IDs to session objects. This allows for efficient lookup of a session by its ID.
| self._active_sessions: set[MiddlewareServerSession] = set() | |
| self._active_sessions: dict[str, MiddlewareServerSession] = {} |
|
|
||
| session_id = str(uuid4()) | ||
| session._fastmcp_event_session_id = session_id # type: ignore[attr-defined] | ||
| self.fastmcp._active_sessions.add(session) |
| ) | ||
| finally: | ||
| # Cleanup: remove session and its subscriptions | ||
| self.fastmcp._active_sessions.discard(session) |
| for session in list(self._active_sessions): | ||
| sid = getattr(session, "_fastmcp_event_session_id", None) | ||
| if sid is None or sid not in matching_session_ids: | ||
| continue |
Broadcasting events by iterating over all active sessions and checking their IDs is inefficient ($O(N_{sessions})$). Since matching_session_ids already contains the IDs of sessions that should receive the event, it is much faster to look them up directly in the suggested _active_sessions dictionary ($O(N_{matches})$).
| for session in list(self._active_sessions): | |
| sid = getattr(session, "_fastmcp_event_session_id", None) | |
| if sid is None or sid not in matching_session_ids: | |
| continue | |
| # Broadcast to matching active sessions | |
| for sid in matching_session_ids: | |
| session = self._active_sessions.get(sid) | |
| if session: | |
| try: | |
| await session.send_notification(notification) | |
| except Exception: | |
| logger.warning( | |
| f"Failed to deliver event to session {sid}", | |
| exc_info=True, | |
| ) |
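The lookup difference described in this comment can be sketched in isolation. This is a minimal illustration, assuming sessions live in a plain dict keyed by session ID; `FakeSession` and `select_recipients` are hypothetical stand-ins and not part of FastMCP.

```python
from dataclasses import dataclass, field


@dataclass
class FakeSession:
    """Hypothetical stand-in for a server session; records what it was sent."""

    sent: list[str] = field(default_factory=list)


def select_recipients(
    active_sessions: dict[str, FakeSession],
    matching_session_ids: set[str],
) -> list[FakeSession]:
    """Look up each matching ID directly instead of scanning every session."""
    recipients = []
    for sid in matching_session_ids:
        session = active_sessions.get(sid)  # average-case O(1) lookup
        if session is not None:  # the session may already have disconnected
            recipients.append(session)
    return recipients


active = {"a": FakeSession(), "b": FakeSession(), "c": FakeSession()}
recipients = select_recipients(active, {"a", "c", "gone"})
for session in recipients:
    session.sent.append("event")
```

The work done is proportional to the number of matching IDs, so a server with many idle sessions pays nothing for them on each emit.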
| retained_events = [] | ||
|
|
||
| for pattern in topics: | ||
| # Validate topic depth (max 8 segments) | ||
| segments = pattern.split("/") | ||
| if len(segments) > server._MAX_TOPIC_DEPTH: | ||
| raise McpError( | ||
| mcp.types.ErrorData( | ||
| code=-32602, | ||
| message=( | ||
| f"Subscription pattern has {len(segments)} segments, " | ||
| f"maximum depth is {server._MAX_TOPIC_DEPTH}: {pattern!r}" | ||
| ), | ||
| ) | ||
| ) | ||
|
|
||
| # Check if the pattern matches any declared topic | ||
| if not server._match_declared_topic(pattern): | ||
| rejected.append(RejectedTopic(pattern=pattern, reason="unknown_topic")) | ||
| continue | ||
|
|
||
| await server._subscription_registry.add(session_id, pattern) | ||
| subscribed.append(SubscribedTopic(pattern=pattern)) | ||
|
|
||
| # Deliver retained values for this pattern | ||
| matching = await server._retained_store.get_matching(pattern) | ||
| retained_events.extend(matching) | ||
|
|
||
| result = EventSubscribeResult( | ||
| subscribed=subscribed, | ||
| rejected=rejected, | ||
| retained=retained_events, | ||
| ) | ||
| return result.model_dump(exclude_none=True) |
The retained_events list can contain duplicate events if a client subscribes to multiple overlapping patterns (e.g., myapp/status and myapp/#) in a single request. It is recommended to deduplicate these events by their unique event_id before returning them to the client.
retained_events: dict[str, RetainedEvent] = {}
for pattern in topics:
    # Validate topic depth (max 8 segments)
    segments = pattern.split("/")
    if len(segments) > server._MAX_TOPIC_DEPTH:
        raise McpError(
            mcp.types.ErrorData(
                code=-32602,
                message=(
                    f"Subscription pattern has {len(segments)} segments, "
                    f"maximum depth is {server._MAX_TOPIC_DEPTH}: {pattern!r}"
                ),
            )
        )
    # Check if the pattern matches any declared topic
    if not server._match_declared_topic(pattern):
        rejected.append(RejectedTopic(pattern=pattern, reason="unknown_topic"))
        continue
    await server._subscription_registry.add(session_id, pattern)
    subscribed.append(SubscribedTopic(pattern=pattern))
    # Deliver retained values for this pattern
    matching = await server._retained_store.get_matching(pattern)
    for event in matching:
        retained_events[event.event_id] = event
result = EventSubscribeResult(
    subscribed=subscribed,
    rejected=rejected,
    retained=list(retained_events.values()),
)
return result.model_dump(exclude_none=True)
Imports all event types from axiomantic/python-sdk fork instead of self-containing duplicates. Removes the fragile _receive_loop override and registers event handlers through the standard SDK mechanism. Changes _active_sessions from set to dict for O(1) broadcast lookup. Adds retained event deduplication during subscription.
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces a comprehensive event system to FastMCP, allowing servers to declare topics, publish events, and manage client subscriptions with MQTT-style wildcards. Key additions include a SubscriptionRegistry for session management, a RetainedValueStore for event persistence, and integration into the Context and FastMCP classes for easy event emission. Review feedback highlighted several areas for improvement: the mcp dependency should be updated to a stable release before production, type safety should be restored by replacing Any with specific types in protocol handlers and the responder parameter, and error handling should be enhanced by logging warnings instead of silently swallowing exceptions during expiry checks and JSON schema generation.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
The mcp dependency is pinned to a specific git branch. While this is acceptable for development, it's not suitable for production. Before this change is merged into a main or release branch, this should be updated to point to a released version of the mcp package on PyPI to ensure stable and reproducible builds.
| except (ValueError, TypeError): | ||
| return False |
Catching ValueError and TypeError and returning False means that an invalid expires_at format is silently treated as "never expires". This can hide bugs. It would be better to log a warning when an invalid format is encountered so that developers are aware of the issue. You'll need to import a logger to do this.
| async def _received_request( | ||
| self, | ||
| responder: RequestResponder[mcp.types.ClientRequest, mcp.types.ServerResult], | ||
| responder, |
The type hint for the responder parameter has been removed, which reduces type safety and code clarity. Please restore the type hint. The RequestResponder type might have moved in the new SDK version, so you may need to adjust the import.
| responder, | |
| responder: "RequestResponder[mcp.types.ClientRequest, mcp.types.ServerResult]", |
| self, req: Any | ||
| ) -> Any: |
The type hints for req and the return value are Any. Using specific types from fastmcp.server.events like EventSubscribeRequest and EventSubscribeResult would improve type safety and readability. This also applies to _handle_unsubscribe_events and _handle_list_events.
| self, req: Any | |
| ) -> Any: | |
| self, req: "EventSubscribeRequest" | |
| ) -> "EventSubscribeResult": |
| except Exception: | ||
| pass |
The broad except Exception: pass can silently swallow important errors during JSON schema generation, making debugging difficult. At a minimum, this should log a warning when an exception occurs, so developers are aware that schema generation failed.
except Exception as e:
    logger.warning(
        f"Failed to generate JSON schema for event topic '{pattern}': {e}",
        exc_info=True,
    )
@gemini-code-assist please re-review |
Code Review
This pull request introduces a comprehensive event system for FastMCP, enabling servers to declare, emit, and manage subscriptions for events with support for MQTT-style wildcards and retained values. Key components include a new SubscriptionRegistry for session-to-topic mapping, a RetainedValueStore for replaying the latest events to new subscribers, and protocol-level handlers for events/subscribe, events/unsubscribe, and events/list. Feedback suggests improving the stability of the project by updating the git-pinned mcp dependency to a released version and enhancing the robustness of the event expiration logic by logging warnings for malformed timestamps instead of failing silently.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
| def _is_expired(self, topic: str) -> bool: | ||
| expires_at = self._expires.get(topic) | ||
| if expires_at is None: | ||
| return False | ||
| try: | ||
| expiry = datetime.fromisoformat(expires_at) | ||
| if expiry.tzinfo is None: | ||
| expiry = expiry.replace(tzinfo=timezone.utc) | ||
| return datetime.now(timezone.utc) >= expiry | ||
| except (ValueError, TypeError): | ||
| return False |
The _is_expired method silently ignores invalid expires_at strings by returning False, effectively treating them as non-expiring. This could hide configuration errors or malformed data. It would be more robust to log a warning when the date string cannot be parsed.
To do this, you'll need to set up a logger at the module level, similar to other files in the project:
import logging
logger = logging.getLogger(__name__)
Then, you can modify the method to log the warning.
| def _is_expired(self, topic: str) -> bool: | |
| expires_at = self._expires.get(topic) | |
| if expires_at is None: | |
| return False | |
| try: | |
| expiry = datetime.fromisoformat(expires_at) | |
| if expiry.tzinfo is None: | |
| expiry = expiry.replace(tzinfo=timezone.utc) | |
| return datetime.now(timezone.utc) >= expiry | |
| except (ValueError, TypeError): | |
| return False | |
| def _is_expired(self, topic: str) -> bool: | |
| expires_at = self._expires.get(topic) | |
| if expires_at is None: | |
| return False | |
| try: | |
| expiry = datetime.fromisoformat(expires_at) | |
| if expiry.tzinfo is None: | |
| expiry = expiry.replace(tzinfo=timezone.utc) | |
| return datetime.now(timezone.utc) >= expiry | |
| except (ValueError, TypeError): | |
| logger.warning("Invalid expires_at value %r for topic %r", expires_at, topic) | |
| return False |
|
@gemini-code-assist please re-review |
Code Review
This pull request implements a new event subscription and publishing system for FastMCP, introducing SubscriptionRegistry for MQTT-style topic matching and RetainedValueStore for retained events. The FastMCP server now supports event declaration, emission via ctx.emit_event, and handles event protocol requests (subscribe, unsubscribe, list). The mcp dependency is updated to a specific git branch, and python-ulid is added. Extensive tests for the new event features and server shutdown reliability are included. Feedback highlights the need to transition the mcp dependency to a stable PyPI release, address attribute naming inconsistencies in RetainedEvent and EventParams, improve datetime.fromisoformat compatibility, refine wildcard matching logic, optimize SubscriptionRegistry.match for efficiency, and implement concurrent event broadcasting to prevent head-of-line blocking.
| continue | ||
| if regex.match(topic): | ||
| # Deduplicate: overlapping patterns can match same retained value | ||
| eid = event.eventId |
The RetainedEvent model from the MCP SDK uses snake_case for its attributes. Accessing event.eventId will likely raise an AttributeError. It should be event.event_id, which is consistent with the usage in the test suite (e.g., test_events.py line 138).
| eid = event.eventId | |
| eid = event.event_id |
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method can be optimized to avoid redundant regex matching and dictionary lookups. Currently, it iterates over all sessions and all their patterns, performing a dict lookup and regex match for each. A more efficient approach is to find all unique patterns that match the topic first, and then find sessions subscribed to those patterns.
| async def match(self, topic: str) -> set[str]: | |
| """Return session IDs whose subscriptions match *topic*. | |
| Each session appears at most once (at-most-once delivery guarantee). | |
| """ | |
| async with self._lock: | |
| result: set[str] = set() | |
| for session_id, patterns in self._subscriptions.items(): | |
| for pattern in patterns: | |
| regex = self._compile(pattern) | |
| if regex.match(topic): | |
| result.add(session_id) | |
| break # at-most-once per session | |
| return result | |
| async def match(self, topic: str) -> set[str]: | |
| """Return session IDs whose subscriptions match *topic*. | |
| Each session appears at most once (at-most-once delivery guarantee). | |
| """ | |
| async with self._lock: | |
| # Find all unique patterns that match the topic | |
| matching_patterns = { | |
| pattern for pattern, regex in self._compiled.items() | |
| if regex.match(topic) | |
| } | |
| # Find sessions subscribed to any of those patterns | |
| result: set[str] = set() | |
| for session_id, patterns in self._subscriptions.items(): | |
| if not patterns.isdisjoint(matching_patterns): | |
| result.add(session_id) | |
| return result |
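A self-contained sketch of the pattern-first approach suggested above may make the trade-off easier to see. It assumes a `_compiled` cache keyed by pattern, as the suggestion does; `pattern_to_regex` and `PatternFirstRegistry` are hypothetical names, and the wildcard translation here is one simplified reading of MQTT semantics, not the PR's implementation.

```python
import re


def pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Translate an MQTT-style filter into a regex (simplified, hypothetical)."""
    segments = pattern.split("/")
    multi_level = segments[-1] == "#"
    if multi_level:
        segments = segments[:-1]
    if "#" in segments:
        raise ValueError(f"'#' is only valid as the last segment: {pattern!r}")
    body = "/".join("[^/]+" if seg == "+" else re.escape(seg) for seg in segments)
    if multi_level:
        # "a/#" matches "a" itself and anything below it; a bare "#" matches all.
        body = f"{body}(?:/.*)?" if segments else ".*"
    return re.compile(f"^{body}$")


class PatternFirstRegistry:
    """Matches each unique pattern once, then maps patterns back to sessions."""

    def __init__(self) -> None:
        self._subscriptions: dict[str, set[str]] = {}
        self._compiled: dict[str, re.Pattern[str]] = {}

    def add(self, session_id: str, pattern: str) -> None:
        if pattern not in self._compiled:
            self._compiled[pattern] = pattern_to_regex(pattern)
        self._subscriptions.setdefault(session_id, set()).add(pattern)

    def match(self, topic: str) -> set[str]:
        matching = {p for p, rx in self._compiled.items() if rx.match(topic)}
        return {
            sid
            for sid, patterns in self._subscriptions.items()
            if not patterns.isdisjoint(matching)
        }


registry = PatternFirstRegistry()
registry.add("s1", "myapp/+")
registry.add("s2", "myapp/#")
registry.add("s3", "other/x")
```

The gain only appears when many sessions share the same patterns, since each shared pattern is tested once per topic. A real registry would also need to prune `_compiled` when the last subscriber to a pattern leaves.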
| if expires_at is None: | ||
| return False | ||
| try: | ||
| expiry = datetime.fromisoformat(expires_at) |
datetime.fromisoformat() does not support the Z suffix for UTC in Python versions prior to 3.11. Since the MCP SDK supports Python 3.10, this will cause a ValueError when parsing ISO 8601 strings from clients that use the Z notation. Consider replacing Z with +00:00 for broader compatibility.
| expiry = datetime.fromisoformat(expires_at) | |
| expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) |
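A standalone sketch of the compatibility fix follows. `parse_expires_at` is a hypothetical helper name, and it rewrites only a trailing `Z`, which is slightly stricter than the blanket `replace` in the suggestion.

```python
from datetime import datetime, timezone


def parse_expires_at(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' on Python 3.10."""
    if value.endswith("Z"):
        # fromisoformat() before Python 3.11 rejects "Z", so spell out the offset.
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # Treat naive timestamps as UTC, matching the _is_expired logic above.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


with_z = parse_expires_at("2030-01-01T00:00:00Z")
naive = parse_expires_at("2030-01-01T00:00:00")
offset = parse_expires_at("2030-01-01T02:00:00+02:00")
```

All three inputs describe the same instant, so comparing any of them against `datetime.now(timezone.utc)` gives the same expiry decision.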
|
|
||
| # Replace MQTT wildcards with a synthetic literal segment for | ||
| # testing against the declared pattern regex. | ||
| test_pattern = _re.sub(r"[+#]", "x", pattern) |
The substitution of + and # with x is too aggressive. In MQTT, these are only wildcards if they occupy an entire segment of the topic. Replacing them anywhere in the string can lead to incorrect matches (e.g., a literal + in a segment being treated as a wildcard match). The substitution should be segment-aware.
| test_pattern = _re.sub(r"[+#]", "x", pattern) | |
| test_parts = [ | |
| "x" if seg in ("+", "#") else seg | |
| for seg in pattern.split("/") | |
| ] | |
| test_pattern = "/".join(test_parts) |
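The difference between the two substitutions shows up on a pattern with a literal `+` inside a segment. This is a small sketch; `concretize_wildcards` is a hypothetical helper name.

```python
import re


def concretize_wildcards(pattern: str, placeholder: str = "x") -> str:
    """Replace only whole-segment MQTT wildcards with a literal placeholder."""
    return "/".join(
        placeholder if segment in ("+", "#") else segment
        for segment in pattern.split("/")
    )


pattern = "math/a+b/#"
naive = re.sub(r"[+#]", "x", pattern)        # rewrites the literal "+" too
segment_aware = concretize_wildcards(pattern)  # leaves "a+b" intact
```

Here `naive` becomes `math/axb/x`, which no longer refers to the `a+b` segment, while `segment_aware` becomes `math/a+b/x`.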
| if retained: | ||
| retained_event = RetainedEvent( | ||
| topic=topic, | ||
| eventId=event_id, |
| eventId=event_id, | ||
| payload=payload, | ||
| retained=retained, | ||
| source=source, | ||
| correlationId=correlation_id, | ||
| requestedEffects=requested_effects, | ||
| expiresAt=expires_at, |
It is recommended to use snake_case attribute names when initializing EventParams to ensure consistency with the rest of the codebase and the SDK's model definitions.
| eventId=event_id, | |
| payload=payload, | |
| retained=retained, | |
| source=source, | |
| correlationId=correlation_id, | |
| requestedEffects=requested_effects, | |
| expiresAt=expires_at, | |
| event_id=event_id, | |
| payload=payload, | |
| retained=retained, | |
| source=source, | |
| correlation_id=correlation_id, | |
| requested_effects=requested_effects, | |
| expires_at=expires_at, |
| for sid in matching_session_ids: | ||
| session = self._active_sessions.get(sid) | ||
| if session is None: | ||
| continue | ||
| try: | ||
| await session.send_notification( | ||
| cast(ServerNotification, notification) | ||
| ) | ||
| except Exception: | ||
| logger.warning( | ||
| f"Failed to deliver event to session {sid}", | ||
| exc_info=True, | ||
| ) |
Broadcasting events sequentially can lead to head-of-line blocking. If one session's transport is slow or backpressured, it will delay delivery to all subsequent sessions. Using a TaskGroup to send notifications concurrently would improve performance and reliability of the broadcast.
# Broadcast to matching active sessions concurrently
import anyio
async with anyio.create_task_group() as tg:
    for sid in matching_session_ids:
        session = self._active_sessions.get(sid)
        if session is None:
            continue
        async def send_safe(s=session, id=sid):
            try:
                await s.send_notification(
                    cast(ServerNotification, notification)
                )
            except Exception:
                logger.warning(
                    f"Failed to deliver event to session {id}",
                    exc_info=True,
                )
        tg.start_soon(send_safe)
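The same idea can be sketched with the standard library alone. Note that this version swaps the anyio task group for `asyncio.gather`, and that `FakeSession` and `broadcast` are hypothetical names used only to make the example runnable.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeSession:
    """Hypothetical session with a configurable send delay and failure mode."""

    def __init__(self, delay: float, fail: bool = False) -> None:
        self.delay = delay
        self.fail = fail
        self.received: list[str] = []

    async def send_notification(self, notification: str) -> None:
        await asyncio.sleep(self.delay)
        if self.fail:
            raise ConnectionError("transport closed")
        self.received.append(notification)


async def broadcast(
    sessions: dict[str, FakeSession],
    session_ids: list[str],
    notification: str,
) -> int:
    """Send to all matching sessions concurrently; return the delivered count."""

    async def send_safe(sid: str, session: FakeSession) -> bool:
        try:
            await session.send_notification(notification)
            return True
        except Exception:
            # One failing session must not abort delivery to the others.
            logger.warning("Failed to deliver event to session %s", sid, exc_info=True)
            return False

    tasks = [send_safe(sid, sessions[sid]) for sid in session_ids if sid in sessions]
    results = await asyncio.gather(*tasks)
    return sum(results)


sessions = {
    "slow": FakeSession(delay=0.05),
    "fast": FakeSession(delay=0.0),
    "broken": FakeSession(delay=0.0, fail=True),
}
delivered = asyncio.run(
    broadcast(sessions, ["slow", "fast", "broken", "missing"], "event")
)
```

Because every send runs as its own coroutine, the slow session delays only itself, and the total broadcast time is bounded by the slowest recipient rather than the sum of all of them.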
@gemini-code-assist please re-review |
Code Review
This pull request introduces application-level event support for FastMCP, enabling servers to declare topics, publish events, and manage client subscriptions using MQTT-style wildcards. Key additions include a SubscriptionRegistry for session-to-topic mapping, a RetainedValueStore for replaying events to new subscribers, and integration with the Context class for emitting events from tools. The implementation also includes protocol-level handlers for subscribing, unsubscribing, and listing topics. Review feedback identifies missing imports for 'cast' and 'logger' in several modules, suggests removing redundant asyncio.Lock instances and deduplication logic to optimize performance, and recommends adding timestamps to retained events to provide better context for clients.
| from mcp.server.lowlevel.server import request_ctx | ||
|
|
||
| server = cast("FastMCP", self) | ||
| logger.debug(f"[{server.name}] Handler called: events/subscribe") |
| continue | ||
| try: | ||
| await session.send_notification( | ||
| cast(ServerNotification, notification) |
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| self._lock = asyncio.Lock() |
The asyncio.Lock is redundant here because none of the methods in SubscriptionRegistry contain await points within their critical sections (except for the lock acquisition itself). In a single-threaded asyncio environment, tasks only yield at await points, so these operations are already atomic relative to other tasks. Removing the lock will reduce overhead and simplify the code.
| self._lock = asyncio.Lock() | |
| # self._lock = asyncio.Lock() # Redundant in single-threaded asyncio without await points |
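The atomicity claim can be checked with a small experiment. This is an illustrative sketch only; `Counter` and `run` are hypothetical names unrelated to the PR's classes.

```python
import asyncio


class Counter:
    def __init__(self) -> None:
        self.value = 0

    async def increment_atomic(self) -> None:
        # No await between the read and the write, so no other task
        # can run in between on a single-threaded event loop.
        current = self.value
        self.value = current + 1

    async def increment_racy(self) -> None:
        current = self.value
        await asyncio.sleep(0)  # yield point: other tasks run here
        self.value = current + 1  # may overwrite their updates


async def run(method_name: str, n: int = 100) -> int:
    counter = Counter()
    await asyncio.gather(*(getattr(counter, method_name)() for _ in range(n)))
    return counter.value


atomic_total = asyncio.run(run("increment_atomic"))
racy_total = asyncio.run(run("increment_racy"))
```

The version without an await inside the critical section counts every increment, while the version with a yield point loses updates. The practical consequence is that dropping the lock is safe only as long as the registry methods stay free of awaits in their critical sections.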
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method performs an O(N*M) scan (sessions * patterns) for every event emission. While acceptable for small numbers of sessions, this will become a performance bottleneck as the server scales. Consider using a more efficient data structure for MQTT wildcard matching, such as a Topic Trie, or at least pre-filtering sessions by literal topic prefixes.
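A topic trie along the lines mentioned here could look like the following. This is a minimal sketch under assumed MQTT-style semantics (`+` matches one level, a trailing `#` matches the parent and everything below it); `TrieNode` and `TopicTrie` are hypothetical names, and unsubscribe is omitted for brevity.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class TrieNode:
    children: dict[str, TrieNode] = field(default_factory=dict)
    session_ids: set[str] = field(default_factory=set)


class TopicTrie:
    """Stores subscription patterns by segment for wildcard-aware lookup."""

    def __init__(self) -> None:
        self._root = TrieNode()

    def add(self, session_id: str, pattern: str) -> None:
        node = self._root
        for segment in pattern.split("/"):
            node = node.children.setdefault(segment, TrieNode())
        node.session_ids.add(session_id)

    def match(self, topic: str) -> set[str]:
        result: set[str] = set()
        self._collect(self._root, topic.split("/"), 0, result)
        return result

    def _collect(
        self, node: TrieNode, segments: list[str], index: int, result: set[str]
    ) -> None:
        # A "#" child matches all remaining levels, including none.
        multi = node.children.get("#")
        if multi is not None:
            result |= multi.session_ids
        if index == len(segments):
            result |= node.session_ids
            return
        # Follow the literal segment and the single-level wildcard.
        for key in (segments[index], "+"):
            child = node.children.get(key)
            if child is not None:
                self._collect(child, segments, index + 1, result)


trie = TopicTrie()
trie.add("s1", "myapp/+")
trie.add("s2", "myapp/#")
trie.add("s3", "myapp/status/detail")
```

Each lookup visits at most two children per level (the literal segment and `+`), so the cost depends on topic depth and wildcard branching rather than on the total number of sessions and patterns.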
| """ | ||
|
|
||
| def __init__(self) -> None: | ||
| self._lock = asyncio.Lock() |
Similar to SubscriptionRegistry, the asyncio.Lock in RetainedValueStore is redundant as there are no await points inside the methods that modify or read the store. Removing it will improve performance slightly and clarify that these operations are atomic in the current event loop.
| self._lock = asyncio.Lock() | |
| # self._lock = asyncio.Lock() # Redundant |
| seen_event_ids: set[str] = set() | ||
| expired_topics: list[str] = [] | ||
| for topic, event in self._store.items(): | ||
| if self._is_expired(topic): | ||
| expired_topics.append(topic) | ||
| continue | ||
| if regex.match(topic): | ||
| # Deduplicate: overlapping patterns can match same retained value | ||
| eid = event.eventId | ||
| if eid not in seen_event_ids: | ||
| seen_event_ids.add(eid) | ||
| result.append(event) |
The deduplication logic using seen_event_ids inside get_matching is redundant. Since self._store is keyed by unique concrete topics, each iteration over self._store.items() will naturally yield unique events. Deduplication is only necessary when multiple patterns are processed in a single request, which is already handled in the protocol layer (mcp_operations.py).
| seen_event_ids: set[str] = set() | |
| expired_topics: list[str] = [] | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| # Deduplicate: overlapping patterns can match same retained value | |
| eid = event.eventId | |
| if eid not in seen_event_ids: | |
| seen_event_ids.add(eid) | |
| result.append(event) | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| result.append(event) |
| retained_event = RetainedEvent( | ||
| topic=topic, | ||
| eventId=event_id, | ||
| timestamp=None, |
|
@gemini-code-assist please re-review |
Code Review
This pull request implements a comprehensive Events system for FastMCP, enabling servers to push real-time notifications to clients via topic-based streams. It introduces mechanisms for declaring event topics using decorators or direct methods, emitting events from tools or background tasks, and supporting retained events for late subscribers. The implementation includes a subscription registry with MQTT-style wildcard support and protocol handlers for subscribing, unsubscribing, and listing events. Feedback suggests adding python-ulid as a direct dependency, removing redundant deduplication in the retained value store, and optimizing the topic matching logic to improve efficiency.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
The code in src/fastmcp/server/server.py now directly imports and uses ULID from the python-ulid package. This package should be added as a direct dependency in pyproject.toml to ensure it is available regardless of transitive dependency changes in the mcp SDK.
"mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events",
"python-ulid>=3.1.0",
| seen_event_ids: set[str] = set() | ||
| expired_topics: list[str] = [] | ||
| for topic, event in self._store.items(): | ||
| if self._is_expired(topic): | ||
| expired_topics.append(topic) | ||
| continue | ||
| if regex.match(topic): | ||
| # Deduplicate: overlapping patterns can match same retained value | ||
| eid = event.eventId | ||
| if eid not in seen_event_ids: | ||
| seen_event_ids.add(eid) | ||
| result.append(event) |
The deduplication logic using seen_event_ids inside get_matching is redundant. Since self._store is a dictionary keyed by topic, iterating over its items already guarantees that each unique topic (and its associated event) is visited exactly once. Deduplication is correctly handled at a higher level in _handle_subscribe_events when processing multiple subscription patterns.
| seen_event_ids: set[str] = set() | |
| expired_topics: list[str] = [] | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| # Deduplicate: overlapping patterns can match same retained value | |
| eid = event.eventId | |
| if eid not in seen_event_ids: | |
| seen_event_ids.add(eid) | |
| result.append(event) | |
| result: list[RetainedEvent] = [] | |
| expired_topics: list[str] = [] | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| result.append(event) |
| for declared_pattern in self._event_topics: | ||
| # Forward: build regex from declared pattern's {param} placeholders | ||
| # and test whether the subscription pattern (with wildcards replaced | ||
| # by a synthetic single-segment value) matches. | ||
| declared_regex_parts = [] | ||
| for segment in declared_pattern.split("/"): | ||
| if segment.startswith("{") and segment.endswith("}"): | ||
| declared_regex_parts.append("[^/]+") | ||
| else: | ||
| declared_regex_parts.append(_re.escape(segment)) | ||
| declared_regex = _re.compile( | ||
| "^" + "/".join(declared_regex_parts) + "$" | ||
| ) | ||
|
|
||
| # Replace MQTT wildcards with a synthetic literal segment for | ||
| # testing against the declared pattern regex. | ||
| test_pattern = _re.sub(r"[+#]", "x", pattern) | ||
| if declared_regex.match(test_pattern): | ||
| return True | ||
|
|
||
| # Reverse: does the declared pattern (with {param} replaced by a | ||
| # synthetic literal) match the subscription pattern's MQTT regex? | ||
| concrete_declared = _re.sub(r"\{[^}]+\}", "x", declared_pattern) | ||
| try: | ||
| sub_regex = _pattern_to_regex(pattern) | ||
| if sub_regex.match(concrete_declared): | ||
| return True | ||
| except ValueError: | ||
| continue | ||
|
|
The _match_declared_topic method is inefficient as it performs several redundant operations inside the loop over self._event_topics. Specifically, sub_regex and test_pattern should be computed once outside the loop. Additionally, the declared_regex and concrete_declared strings could be pre-calculated or cached to avoid repeated regex compilation and string manipulation during every subscription request.
try:
    sub_regex = _pattern_to_regex(pattern)
except ValueError:
    return False
test_pattern = _re.sub(r"[+#]", "x", pattern)
for declared_pattern in self._event_topics:
    # Forward: build regex from declared pattern's {param} placeholders
    declared_regex_parts = [
        "[^/]+" if s.startswith("{") and s.endswith("}") else _re.escape(s)
        for s in declared_pattern.split("/")
    ]
    declared_regex = _re.compile(
        "^" + "/".join(declared_regex_parts) + "$"
    )
    if declared_regex.match(test_pattern):
        return True
    # Reverse: does the declared pattern match the subscription pattern's MQTT regex?
    concrete_declared = _re.sub(r"\{[^}]+\}", "x", declared_pattern)
    if sub_regex.match(concrete_declared):
        return True
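One way to realize the caching mentioned in this comment is to memoize the declared-pattern regex. This is a sketch, assuming declared patterns are plain strings with `{param}` segments; `declared_pattern_to_regex` is a hypothetical helper and the cache size is arbitrary.

```python
import re
from functools import lru_cache


@lru_cache(maxsize=256)
def declared_pattern_to_regex(declared_pattern: str) -> re.Pattern[str]:
    """Compile a declared topic pattern once; {param} matches one segment."""
    parts = [
        "[^/]+" if seg.startswith("{") and seg.endswith("}") else re.escape(seg)
        for seg in declared_pattern.split("/")
    ]
    return re.compile("^" + "/".join(parts) + "$")


first = declared_pattern_to_regex("users/{user_id}/status")
second = declared_pattern_to_regex("users/{user_id}/status")  # served from cache
```

Since the set of declared topics changes rarely compared with how often clients subscribe, a cache keyed by the pattern string removes the repeated compilation from the subscription path. Storing the compiled regex on the topic descriptor at declaration time would be an alternative with the same effect.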
@gemini-code-assist please re-review |
CI Failures Note
The failing tests in
Event tests ( |
Code Review
This pull request implements a comprehensive topic-based event system for FastMCP, enabling servers to broadcast real-time notifications to subscribed clients. Key features include a SubscriptionRegistry supporting MQTT-style wildcards, a RetainedValueStore for replaying the latest events to new subscribers, and new API methods in the FastMCP and Context classes for declaring and emitting events. The PR also adds protocol-level handlers, detailed documentation, and extensive test coverage. Review feedback suggests refactoring ISO timestamp parsing to remove unnecessary workarounds and moving local imports to the module level for better code organization.
| try: | ||
| # Python 3.10 fromisoformat() doesn't support "Z" suffix; | ||
| # replace with "+00:00" for compatibility. | ||
| expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) |
| requested_effects: Optional list of advisory effect hints. | ||
| expires_at: Optional ISO 8601 expiry timestamp for retained values. | ||
| """ | ||
| from ulid import ULID |
|
@gemini-code-assist please re-review |
Code Review
This pull request implements support for MCP events, enabling servers to broadcast real-time notifications to subscribed clients via topic-based streams with MQTT-style wildcard support. Key additions include a subscription registry, a retained value store for late subscribers, and new API methods for declaring and emitting events via the FastMCP and Context classes. The feedback identifies several performance optimization opportunities, specifically regarding the caching of compiled regex patterns for topic matching and the use of concurrent delivery when broadcasting events to multiple active sessions.
| # --------------------------------------------------------------------------- | ||
|
|
||
|
|
||
| def _pattern_to_regex(pattern: str) -> re.Pattern[str]: |
The _pattern_to_regex function is called frequently during subscription matching and retained value lookups. Since topic patterns are often reused, adding an LRU cache will significantly improve performance by avoiding redundant string splitting and regex compilation.
import functools

@functools.lru_cache(maxsize=128)
def _pattern_to_regex(pattern: str) -> re.Pattern[str]:
| declared_regex_parts = [] | ||
| for segment in declared_pattern.split("/"): | ||
| if segment.startswith("{") and segment.endswith("}"): | ||
| declared_regex_parts.append("[^/]+") | ||
| else: | ||
| declared_regex_parts.append(_re.escape(segment)) | ||
| declared_regex = _re.compile( | ||
| "^" + "/".join(declared_regex_parts) + "$" | ||
| ) |
Regex compilation for declared patterns is happening inside a loop for every subscription request. This is inefficient, especially since FastMCP already provides a _declared_topic_regex_cache intended for this purpose. You should leverage the cache to avoid re-compiling the same patterns repeatedly.
| declared_regex_parts = [] | |
| for segment in declared_pattern.split("/"): | |
| if segment.startswith("{") and segment.endswith("}"): | |
| declared_regex_parts.append("[^/]+") | |
| else: | |
| declared_regex_parts.append(_re.escape(segment)) | |
| declared_regex = _re.compile( | |
| "^" + "/".join(declared_regex_parts) + "$" | |
| ) | |
| if declared_pattern not in self._declared_topic_regex_cache: | |
| declared_regex_parts = [] | |
| for segment in declared_pattern.split("/"): | |
| if segment.startswith("{") and segment.endswith("}"): | |
| declared_regex_parts.append("[^/]+") | |
| else: | |
| declared_regex_parts.append(_re.escape(segment)) | |
| self._declared_topic_regex_cache[declared_pattern] = _re.compile( | |
| "^" + "/".join(declared_regex_parts) + "$" | |
| ) | |
| declared_regex = self._declared_topic_regex_cache[declared_pattern] |
| for sid in matching_session_ids: | ||
| session = self._active_sessions.get(sid) | ||
| if session is None: | ||
| continue | ||
| try: | ||
| await session.send_notification( | ||
| cast(ServerNotification, notification) | ||
| ) | ||
| except Exception: | ||
| logger.warning( | ||
| f"Failed to deliver event to session {sid}", | ||
| exc_info=True, | ||
| ) |
Broadcasting events to multiple sessions is currently sequential. If one session's transport is slow or experiences latency, it will delay delivery to all subsequent sessions and block the emit_event caller. Using asyncio.gather with error handling for each task would allow concurrent delivery and improve overall system responsiveness.
async def _send_to_session(sid: str):
    session = self._active_sessions.get(sid)
    if session is None:
        return
    try:
        await session.send_notification(
            cast(ServerNotification, notification)
        )
    except Exception:
        logger.warning(
            f"Failed to deliver event to session {sid}",
            exc_info=True,
        )

await asyncio.gather(*[_send_to_session(sid) for sid in matching_session_ids])
@gemini-code-assist please re-review |
…eted emission
Implement ergonomic authorization for session-scoped event topics:
- {session_id} magic placeholder: subscribers must use their own UUID
- authorize callback escape hatch on declare_event for custom auth
- target_session_ids param on emit_event for defense-in-depth filtering
- Expose session_id via InitializeResult._meta for client discovery
- 19 new tests covering enforcement, callbacks, targeted emit, smuggling
When Context is created for a call_tool invocation, the tool name is now threaded through as _tool_name. Context.emit_event uses this to auto-set source="tool/<name>" when no explicit source is provided, giving event consumers automatic provenance information.
- Add _tool_name kwarg to Context.__init__ and tool_name property
- Pass _tool_name=name in server.py call_tool Context creation
- Auto-set source in Context.emit_event when source is None and tool_name is set
- Tests for tool_name property, auto-source, explicit override, and server-level emit
…elivery
test_auto_source_set_from_tool_name and test_explicit_source_overrides_auto_source previously only verified the source kwarg passed to emit_event, not the actual value carried in the delivered ServerNotification. Each test now subscribes the active session, captures delivered notifications, and asserts notif.params.source matches the expected value end-to-end.
|
@gemini-code-assist please re-review |
Local ruff (0.15.8) and pre-commit ruff (0.14.10) disagree on line wrapping for a few multi-argument calls. Format with the pinned version so CI static_analysis passes.
Code Review
This pull request introduces a comprehensive Events system to FastMCP, enabling servers to publish real-time notifications to subscribed clients via topic-based streams with MQTT-style wildcards. Key features include the @mcp.event decorator and declare_event method for topic registration, emit_event for broadcasting (accessible via FastMCP or Context), and a RetainedValueStore to support late subscribers. The implementation also includes a session-based authorization system and automatic event source tracking for tool-initiated events. Feedback highlights potential performance bottlenecks in topic matching and descriptor lookups, suggesting the use of more efficient data structures like tries or caching for high-frequency event streams.
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method performs a linear scan over all sessions and all their subscription patterns for every event emission. In a server with many active sessions or complex subscription sets, this
| def _find_topic_descriptor(self, topic: str) -> EventTopicDescriptor | None: | ||
| """Find the EventTopicDescriptor for a concrete topic. | ||
|
|
||
| Tries a direct lookup first, then falls back to segment-by-segment | ||
| matching against declared parameterized patterns. | ||
| """ | ||
| # Direct match (fast path) | ||
| descriptor = self._event_topics.get(topic) | ||
| if descriptor is not None: | ||
| return descriptor | ||
| # Fall back to parameterized pattern matching | ||
| for pattern, desc in self._event_topics.items(): | ||
| if self._topic_matches_pattern(topic, pattern): | ||
| return desc | ||
| return None |
The _find_topic_descriptor method performs a linear scan over all declared topics for every event emission when a direct match is not found. For servers with many parameterized topics or high-frequency event streams, this can impact performance. Consider implementing a cache for concrete topic strings to their corresponding descriptors.
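One way to read the caching suggestion above is a small memo keyed by concrete topic that is dropped whenever a new declaration arrives. The sketch below is illustrative only: `DescriptorIndex`, `topic_matches_pattern`, and the `max_cached` bound are hypothetical names, descriptors are plain objects rather than `EventTopicDescriptor`, and the matching rule (a `{param}` segment matches exactly one non-empty segment) is an assumption about how `_topic_matches_pattern` behaves.

```python
from __future__ import annotations


def topic_matches_pattern(topic: str, pattern: str) -> bool:
    """Segment-by-segment match; a {param} segment accepts any one non-empty segment."""
    topic_parts = topic.split("/")
    pattern_parts = pattern.split("/")
    if len(topic_parts) != len(pattern_parts):
        return False
    for t, p in zip(topic_parts, pattern_parts):
        if not t:
            return False  # empty topic segments never match
        if p.startswith("{") and p.endswith("}"):
            continue  # placeholder: any non-empty segment is fine
        if t != p:
            return False
    return True


class DescriptorIndex:
    """Hypothetical stand-in for the declared-topic lookup with a result cache."""

    def __init__(self, declared: dict[str, object], max_cached: int = 1024) -> None:
        self._declared = dict(declared)
        self._cache: dict[str, object | None] = {}
        self._max_cached = max_cached
        self.scans = 0  # counts full scans, only to make the effect observable

    def declare(self, pattern: str, descriptor: object) -> None:
        self._declared[pattern] = descriptor
        self._cache.clear()  # cached misses may now be wrong

    def find(self, topic: str) -> object | None:
        if topic in self._declared:  # direct match (fast path)
            return self._declared[topic]
        if topic in self._cache:  # previously resolved concrete topic
            return self._cache[topic]
        self.scans += 1
        found = None
        for pattern, descriptor in self._declared.items():
            if topic_matches_pattern(topic, pattern):
                found = descriptor
                break
        if len(self._cache) >= self._max_cached:
            self._cache.clear()  # crude bound so the cache cannot grow forever
        self._cache[topic] = found
        return found
```

Clearing on overflow is the simplest bound; an LRU policy would keep hot topics longer at the cost of a little more bookkeeping.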
|
@gemini-code-assist please re-review |
Code Review
This pull request implements application-level MCP events, enabling servers to broadcast real-time notifications via topic-based streams. Key additions include a subscription registry with MQTT wildcard support, a retained event store for late subscribers, and new API methods on the FastMCP and Context classes. The implementation also features a flexible authorization system for session-scoped or custom-validated topics. Review feedback suggests returning protocol-compliant rejection lists for over-deep topic patterns instead of raising errors, refining topic matching logic to avoid false positives from synthetic literals, and ensuring exceptions during event broadcasting are properly logged.
| if len(segments) > server._MAX_TOPIC_DEPTH: | ||
| raise McpError( | ||
| mcp.types.ErrorData( | ||
| code=-32602, | ||
| message=( | ||
| f"Subscription pattern has {len(segments)} segments, " | ||
| f"maximum depth is {server._MAX_TOPIC_DEPTH}: {pattern!r}" | ||
| ), | ||
| ) | ||
| ) |
Raising an McpError immediately when a topic depth exceeds the limit will abort the entire subscription request. According to the MCP specification for events/subscribe, the server should return a result containing both subscribed and rejected lists. It would be more consistent with the protocol to add the over-deep pattern to the rejected list with an appropriate reason (e.g., "invalid_topic") rather than failing the whole batch.
if len(segments) > server._MAX_TOPIC_DEPTH:
    rejected.append(
        RejectedTopic(
            pattern=pattern,
            reason="invalid_topic",
        )
    )
    continue
|
||
| # Replace MQTT wildcards with a synthetic literal segment for | ||
| # testing against the declared pattern regex. | ||
| test_pattern = _re.sub(r"[+#]", "x", pattern) |
The use of a synthetic literal 'x' to test subscription patterns against declared topic regexes might lead to false positives if 'x' is a meaningful part of a literal segment in a declared topic. A more robust approach would be to use a sentinel character that is guaranteed not to appear in valid topic segments, or to perform segment-by-segment matching directly.
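The segment-by-segment alternative mentioned above could look roughly like the following. This is a sketch under stated assumptions, not the PR's code: `patterns_overlap` is a hypothetical helper, `+` is taken to match exactly one segment, `#` is taken to be valid only as the last segment and to match any remaining segments (including none, as in MQTT), and a declared `{param}` segment matches any single segment.

```python
def _is_param(segment: str) -> bool:
    """True for a declared placeholder such as {session_id}."""
    return segment.startswith("{") and segment.endswith("}")


def patterns_overlap(subscription: str, declared: str) -> bool:
    """Return True if some concrete topic could match both patterns.

    No synthetic literal is substituted, so the result does not depend on
    which characters happen to appear in declared topic segments.
    """
    sub = subscription.split("/")
    dec = declared.split("/")
    for i, s in enumerate(sub):
        if s == "#":
            # Multi-level wildcard: only legal in the last position, where it
            # covers whatever is left of the declared pattern.
            return i == len(sub) - 1
        if i >= len(dec):
            return False  # subscription is deeper than the declaration
        d = dec[i]
        if s == "+" or _is_param(d):
            continue  # either side accepts any single segment here
        if s != d:
            return False  # two different literals can never overlap
    # No '#' was seen, so both patterns must have the same depth.
    return len(sub) == len(dec)
```

For example, `patterns_overlap("jobs/+/status", "jobs/{id}/status")` holds, while `patterns_overlap("jobs/+", "jobs/{id}/status")` does not because the depths differ.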
| await asyncio.gather( | ||
| *[_deliver(sid) for sid in matching_session_ids], | ||
| return_exceptions=True, | ||
| ) |
Using asyncio.gather with return_exceptions=True is good for ensuring that a failure in one session delivery doesn't stop others. However, since the results are ignored, any exceptions that occur during delivery (other than those caught and logged in _deliver) will be silently swallowed. Consider logging the results of gather if they contain exceptions.
results = await asyncio.gather(
    *[_deliver(sid) for sid in matching_session_ids],
    return_exceptions=True,
)
for res in results:
    if isinstance(res, Exception):
        logger.error(f"Unexpected error during event broadcast: {res}", exc_info=res)
Remove the early-return short-circuit in _find_matching_declared_topics that skipped checking all overlapping declarations when one matched exactly, allowing authorization bypass. Also wrap subscription registry add() in try/except ValueError to gracefully reject malformed MQTT patterns (e.g. non-terminal #) instead of crashing the request.
|
@gemini-code-assist please re-review |
Code Review
This pull request implements a topic-based event publishing and subscription system for FastMCP, including MQTT-style wildcard support and retained event persistence. Key components include a SubscriptionRegistry, a RetainedValueStore, and updated FastMCP and Context classes to handle event declaration and emission. Feedback identifies several performance optimization opportunities, specifically suggesting the use of Tries or regex caches to replace linear scans during topic matching and descriptor lookups. Additionally, it is recommended to use non-blocking background tasks for event delivery to prevent slow clients from delaying the broadcast process.
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method performs a linear scan over all active sessions and all of their subscription patterns for every event emission. In a server with many sessions or many subscriptions, this
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | ||
| """Return all non-expired retained events whose topic matches *pattern*.""" | ||
| async with self._lock: | ||
| if pattern not in self._regex_cache: | ||
| self._regex_cache[pattern] = _pattern_to_regex(pattern) | ||
| regex = self._regex_cache[pattern] | ||
| result: list[RetainedEvent] = [] | ||
| expired_topics: list[str] = [] | ||
| for topic, event in self._store.items(): | ||
| if self._is_expired(topic): | ||
| expired_topics.append(topic) | ||
| continue | ||
| if regex.match(topic): | ||
| # Each topic has exactly one retained event in the store, | ||
| # so no per-topic deduplication is needed here. The caller | ||
| # is responsible for deduplicating across multiple pattern | ||
| # matches (e.g. when processing a subscribe request with | ||
| # overlapping patterns). | ||
| result.append(event) | ||
| for topic in expired_topics: | ||
| del self._store[topic] | ||
| self._expires.pop(topic, None) | ||
| return result |
| def _find_topic_descriptor(self, topic: str) -> EventTopicDescriptor | None: | ||
| """Find the EventTopicDescriptor for a concrete topic. | ||
|
|
||
| Tries a direct lookup first, then falls back to segment-by-segment | ||
| matching against declared parameterized patterns. | ||
| """ | ||
| # Direct match (fast path) | ||
| descriptor = self._event_topics.get(topic) | ||
| if descriptor is not None: | ||
| return descriptor | ||
| # Fall back to parameterized pattern matching | ||
| for pattern, desc in self._event_topics.items(): | ||
| if self._topic_matches_pattern(topic, pattern): | ||
| return desc | ||
| return None |
The _find_topic_descriptor method uses a manual segment-by-segment string comparison in a loop over all declared topics. This is inefficient and inconsistent with the regex-based matching used in _find_matching_declared_topics (in mcp_operations.py). This method should be refactored to leverage the _declared_topic_regex_cache for better performance and consistency.
| await asyncio.gather( | ||
| *[_deliver(sid) for sid in matching_session_ids], | ||
| return_exceptions=True, | ||
| ) |
Using asyncio.gather with return_exceptions=True will wait for all delivery tasks to complete before returning. If one or more clients are slow or experiencing network backpressure, emit_event will block for the duration of the slowest delivery. This can be problematic if emit_event is called from a critical path or a background loop that needs to remain responsive. Consider using asyncio.create_task for a "fire-and-forget" broadcast or a task group that doesn't block the immediate caller.
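A fire-and-forget variant along the lines suggested above might be sketched as follows. The `Broadcaster` class and its `drain()` helper are hypothetical and not part of FastMCP; the point is only that tasks created with `asyncio.create_task` need a strong reference until they finish, and that failures should still be logged somewhere.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Iterable

logger = logging.getLogger(__name__)


class Broadcaster:
    """Schedules deliveries in the background instead of awaiting them."""

    def __init__(self) -> None:
        # The event loop keeps only weak references to tasks, so hold them
        # here until they complete.
        self._tasks: set[asyncio.Task[None]] = set()

    def broadcast(
        self,
        deliver: Callable[[str], Awaitable[None]],
        session_ids: Iterable[str],
    ) -> None:
        """Schedule one delivery per session and return immediately."""
        for sid in session_ids:
            task = asyncio.create_task(deliver(sid))
            self._tasks.add(task)
            task.add_done_callback(self._on_done)

    def _on_done(self, task: "asyncio.Task[None]") -> None:
        self._tasks.discard(task)
        if not task.cancelled() and task.exception() is not None:
            logger.warning("Event delivery failed", exc_info=task.exception())

    async def drain(self) -> None:
        """Wait for in-flight deliveries, e.g. during shutdown or in tests."""
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
```

The trade-off is that the caller no longer knows when, or whether, a given session received the event, and nothing here limits how many deliveries can pile up behind a stalled client.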
Line wrapping for RejectedTopic constructor call to satisfy ruff-format line length.
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces a comprehensive Events feature to FastMCP, enabling servers to publish real-time notifications via topic-based streams with MQTT-style wildcard support. Key components include a SubscriptionRegistry for managing session-to-topic mappings, a RetainedValueStore for replaying the latest events to new subscribers, and integrated authorization mechanisms. The implementation also includes extensive documentation and tests. Review feedback identifies critical naming inconsistencies between camelCase and snake_case for event identifiers (e.g., eventId vs event_id) in the code and tests, which could lead to runtime errors. Additionally, the use of a git-based dependency for the mcp SDK is flagged for replacement with a versioned release. Performance and concurrency improvements are suggested for the subscription matching logic and locking strategy, and topic pattern validation should be enhanced to reject empty segments during declaration.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
| if evt.eventId not in seen_event_ids: | ||
| seen_event_ids.add(evt.eventId) |
There is a naming inconsistency for the event ID field. Here it uses evt.eventId (camelCase), but the tests in tests/server/test_events.py (e.g., line 145) use e.event_id (snake_case). Additionally, RetainedEvent in server.py is constructed using eventId. This inconsistency between camelCase and snake_case for the same field across the codebase and tests will likely lead to AttributeError or validation failures. Please standardize on snake_case for Python attributes as per PEP 8.
if evt.event_id not in seen_event_ids:
    seen_event_ids.add(evt.event_id)
| params=EventParams( | ||
| topic=topic, | ||
| eventId=event_id, | ||
| payload=payload, | ||
| retained=retained, | ||
| source=source, | ||
| correlationId=correlation_id, | ||
| requestedEffects=requested_effects, | ||
| expiresAt=expires_at, | ||
| ), |
The EventParams constructor is being called with a mix of snake_case and camelCase arguments (e.g., topic vs eventId, correlationId). Standard Pydantic models in Python should use snake_case for all field names. Using camelCase names as keyword arguments will fail if the model does not explicitly define them as aliases with populate_by_name=True.
notification = EventEmitNotification(
    params=EventParams(
        topic=topic,
        event_id=event_id,
        payload=payload,
        retained=retained,
        source=source,
        correlation_id=correlation_id,
        requested_effects=requested_effects,
        expires_at=expires_at,
    ),
)
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method iterates over all active sessions and all of their subscription patterns for every event emitted. This results in
| self._lock = asyncio.Lock() | ||
| self._subscriptions: dict[str, set[str]] = {} | ||
| self._compiled: dict[str, re.Pattern[str]] = {} | ||
|
|
||
| def _compile(self, pattern: str) -> re.Pattern[str]: | ||
| if pattern not in self._compiled: | ||
| self._compiled[pattern] = _pattern_to_regex(pattern) | ||
| return self._compiled[pattern] | ||
|
|
||
| async def add(self, session_id: str, pattern: str) -> None: | ||
| """Register a subscription for *session_id* on *pattern*.""" | ||
| async with self._lock: | ||
| self._subscriptions.setdefault(session_id, set()).add(pattern) | ||
| self._compile(pattern) | ||
|
|
||
| async def remove(self, session_id: str, pattern: str) -> None: | ||
| """Remove a single subscription.""" | ||
| async with self._lock: | ||
| if session_id in self._subscriptions: | ||
| self._subscriptions[session_id].discard(pattern) | ||
| if not self._subscriptions[session_id]: | ||
| del self._subscriptions[session_id] | ||
|
|
||
| async def remove_all(self, session_id: str) -> None: | ||
| """Remove all subscriptions for *session_id* (disconnect cleanup).""" | ||
| async with self._lock: | ||
| self._subscriptions.pop(session_id, None) | ||
|
|
||
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
|
||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result | ||
|
|
||
| async def get_subscriptions(self, session_id: str) -> set[str]: | ||
| """Return the set of patterns a session is subscribed to.""" | ||
| async with self._lock: | ||
| return set(self._subscriptions.get(session_id, set())) | ||
|
|
The SubscriptionRegistry uses a single asyncio.Lock for all operations, including the potentially long-running match iteration. Since match is called on every emit_event, this lock will cause significant contention. Given that match only reads from _subscriptions, consider using a more granular locking strategy or immutable data structures to allow concurrent matching while subscriptions are being added or removed.
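The "immutable data structures" option above can be illustrated with a copy-on-write registry: writers build a new mapping under a lock and swap it in, while `match` reads whichever snapshot is current without locking. Everything here is a hypothetical sketch. `SnapshotRegistry` is not the PR's class, and `mqtt_to_regex` is a simplified stand-in for `_pattern_to_regex`, whose real implementation is not shown in this thread.

```python
from __future__ import annotations

import asyncio
import re


def mqtt_to_regex(pattern: str) -> re.Pattern[str]:
    """Simplified stand-in: '+' is one segment, a trailing '#' is everything below."""
    segments = pattern.split("/")
    parts = []
    for i, seg in enumerate(segments):
        if seg == "#":
            if i != len(segments) - 1:
                raise ValueError("'#' must be the last segment")
            parts.append(".*")
        elif seg == "+":
            parts.append("[^/]+")
        else:
            parts.append(re.escape(seg))
    return re.compile("^" + "/".join(parts) + "$")


class SnapshotRegistry:
    def __init__(self) -> None:
        self._write_lock = asyncio.Lock()
        # session_id -> {pattern: compiled regex}; replaced wholesale, never mutated
        self._snapshot: dict[str, dict[str, re.Pattern[str]]] = {}

    async def add(self, session_id: str, pattern: str) -> None:
        regex = mqtt_to_regex(pattern)  # validate before taking the lock
        async with self._write_lock:
            updated = dict(self._snapshot)
            updated[session_id] = {**updated.get(session_id, {}), pattern: regex}
            self._snapshot = updated  # atomic reference swap

    async def remove_all(self, session_id: str) -> None:
        async with self._write_lock:
            updated = dict(self._snapshot)
            updated.pop(session_id, None)
            self._snapshot = updated

    def match(self, topic: str) -> set[str]:
        """Lock-free read against the current snapshot."""
        snapshot = self._snapshot
        return {
            sid
            for sid, patterns in snapshot.items()
            if any(regex.match(topic) for regex in patterns.values())
        }
```

Writes become O(number of sessions) because of the copy, which is usually acceptable when subscriptions change far less often than events are emitted.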
| segments = pattern.split("/") | ||
| if len(segments) > self._MAX_TOPIC_DEPTH: | ||
| raise ValueError( | ||
| f"Topic pattern has {len(segments)} segments, " | ||
| f"maximum depth is {self._MAX_TOPIC_DEPTH}: {pattern!r}" | ||
| ) |
The declare_event method validates the maximum depth of the topic pattern but does not check for empty segments (e.g., myapp//status). While _topic_matches_pattern correctly ignores such patterns during matching, it is better to reject them early during declaration to provide immediate feedback to the developer.
segments = pattern.split("/")
if any(not s for s in segments):
    raise ValueError(f"Topic pattern contains empty segments: {pattern!r}")
if len(segments) > self._MAX_TOPIC_DEPTH:
    raise ValueError(
        f"Topic pattern has {len(segments)} segments, "
        f"maximum depth is {self._MAX_TOPIC_DEPTH}: {pattern!r}"
    )
The value was bumped to 10 in an earlier mcp-events commit, causing 20+ tests in test_task_mount.py and test_task_protocol.py to exceed their per-test timeout limits. Restoring the original value of 1 (matching main) fixes the timeouts while leaving event tests unaffected.
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces support for MCP events, enabling servers to push real-time notifications to clients. The implementation includes subscription management, a retained value store, and authorization logic for topic patterns. I have reviewed the changes and provided a suggestion to improve the robustness of timestamp parsing in the event expiration logic. The performance of the topic matching lookup is noted as a potential area for future optimization, but no immediate action is required.
| def _is_expired(self, topic: str) -> bool: | ||
| expires_at = self._expires.get(topic) | ||
| if expires_at is None: | ||
| return False | ||
| try: | ||
| # Python 3.10 fromisoformat() doesn't support "Z" suffix; | ||
| # replace with "+00:00" for compatibility. | ||
| expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) | ||
| if expiry.tzinfo is None: | ||
| expiry = expiry.replace(tzinfo=timezone.utc) | ||
| return datetime.now(timezone.utc) >= expiry | ||
| except (ValueError, TypeError): | ||
| return False |
The error handling for malformed expires_at timestamps in _is_expired could lead to unintended behavior. Currently, if datetime.fromisoformat fails, the except block returns False, effectively treating the event as non-expiring. This could cause an event with a malformed timestamp to be retained indefinitely.
It would be safer to fail-closed by treating malformed timestamps as expired. I suggest returning True from the except block and logging a warning.
| def _is_expired(self, topic: str) -> bool: | |
| expires_at = self._expires.get(topic) | |
| if expires_at is None: | |
| return False | |
| try: | |
| # Python 3.10 fromisoformat() doesn't support "Z" suffix; | |
| # replace with "+00:00" for compatibility. | |
| expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) | |
| if expiry.tzinfo is None: | |
| expiry = expiry.replace(tzinfo=timezone.utc) | |
| return datetime.now(timezone.utc) >= expiry | |
| except (ValueError, TypeError): | |
| return False | |
| def _is_expired(self, topic: str) -> bool: | |
| expires_at = self._expires.get(topic) | |
| if expires_at is None: | |
| return False | |
| try: | |
| # Python 3.10 fromisoformat() doesn't support "Z" suffix; | |
| # replace with "+00:00" for compatibility. | |
| expiry = datetime.fromisoformat(expires_at.replace("Z", "+00:00")) | |
| if expiry.tzinfo is None: | |
| expiry = expiry.replace(tzinfo=timezone.utc) | |
| return datetime.now(timezone.utc) >= expiry | |
| except (ValueError, TypeError): | |
| # Consider logging a warning here about the malformed timestamp. | |
| return True |
|
@gemini-code-assist please re-review |
Code Review
This pull request implements support for MCP events in FastMCP, enabling servers to broadcast real-time notifications to subscribed clients. It introduces a subscription registry with MQTT-style wildcard support, a retained value store for late subscribers, and new API methods on the FastMCP and Context classes for declaring and emitting events. Comprehensive documentation and tests are also included. Review feedback identifies a critical attribute access error where camelCase was used instead of snake_case, which would lead to runtime errors, and suggests using idiomatic snake_case for constructor parameters to maintain consistency with the MCP SDK.
| if evt.eventId not in seen_event_ids: | ||
| seen_event_ids.add(evt.eventId) |
The attribute access evt.eventId will raise an AttributeError because Pydantic models in the MCP SDK use snake_case for field names (e.g., event_id). While aliases like eventId can be used in constructors, they are not available as attributes on the instance. The tests in tests/server/test_events.py correctly use event_id, confirming this is the intended attribute name.
| if evt.eventId not in seen_event_ids: | |
| seen_event_ids.add(evt.eventId) | |
| for evt in matching: | |
| if evt.event_id not in seen_event_ids: | |
| seen_event_ids.add(evt.event_id) |
| retained_event = RetainedEvent( | ||
| topic=topic, | ||
| eventId=event_id, | ||
| timestamp=None, | ||
| payload=payload, | ||
| ) |
While Pydantic allows using aliases in constructors if configured, it is more idiomatic in Python to use snake_case field names. Using event_id instead of eventId aligns with the rest of the codebase and the tests.
| retained_event = RetainedEvent( | |
| topic=topic, | |
| eventId=event_id, | |
| timestamp=None, | |
| payload=payload, | |
| ) | |
| if retained: | |
| retained_event = RetainedEvent( | |
| topic=topic, | |
| event_id=event_id, | |
| timestamp=None, | |
| payload=payload, | |
| ) |
| notification = EventEmitNotification( | ||
| params=EventParams( | ||
| topic=topic, | ||
| eventId=event_id, | ||
| payload=payload, | ||
| retained=retained, | ||
| source=source, | ||
| correlationId=correlation_id, | ||
| requestedEffects=requested_effects, | ||
| expiresAt=expires_at, | ||
| ), | ||
| ) |
The EventParams constructor is using camelCase aliases (eventId, correlationId, requestedEffects, expiresAt). To maintain consistency with Python idioms and the snake_case attributes used in tests (e.g., notif.params.event_id), these should be updated to use snake_case field names.
| notification = EventEmitNotification( | |
| params=EventParams( | |
| topic=topic, | |
| eventId=event_id, | |
| payload=payload, | |
| retained=retained, | |
| source=source, | |
| correlationId=correlation_id, | |
| requestedEffects=requested_effects, | |
| expiresAt=expires_at, | |
| ), | |
| ) | |
| notification = EventEmitNotification( | |
| params=EventParams( | |
| topic=topic, | |
| event_id=event_id, | |
| payload=payload, | |
| retained=retained, | |
| source=source, | |
| correlation_id=correlation_id, | |
| requested_effects=requested_effects, | |
| expires_at=expires_at, | |
| ), | |
| ) |
…middleware
When middleware raised McpError during initialization, the capturing_respond wrapper unconditionally accessed response.root, which ErrorData does not have. This caused an AttributeError caught by the mcp SDK's generic error handler, which replaced the original error message with "Invalid request parameters". Guard the session_id injection block with an isinstance(ErrorData) check so error responses pass through unmodified.
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces a comprehensive event publishing system to FastMCP, allowing servers to broadcast real-time notifications to subscribed clients using topic-based streams with MQTT-style wildcards. Key additions include the @mcp.event decorator and ctx.emit_event for tools, a SubscriptionRegistry for managing client interests, and a RetainedValueStore for late-subscriber state recovery. The implementation also features session-scoped authorization and automatic event sourcing from tool contexts. Feedback focuses on improving error clarity for missing session contexts, optimizing broadcast performance for high session counts via bounded concurrency, and ensuring RetainedEvent models include valid timestamps to prevent potential client-side validation issues.
| if session_id is None: | ||
| raise McpError( | ||
| mcp.types.ErrorData( | ||
| code=-32603, | ||
| message="No session context available for subscription", | ||
| ) | ||
| ) |
Raising a generic Internal Error (-32603) when session_id is missing might be confusing for clients. Since this typically happens if the session wasn't properly initialized or registered in the registry, a more descriptive error or ensuring the session lifecycle is strictly managed before allowing subscriptions would be better. However, given the current architecture, this is a reasonable safety check.
| # Broadcast to matching active sessions in parallel so slow sessions | ||
| # don't block delivery to others. | ||
| async def _deliver(sid: str) -> None: | ||
| session = self._active_sessions.get(sid) | ||
| if session is None: | ||
| return | ||
| try: | ||
| await session.send_notification(cast(ServerNotification, notification)) | ||
| except Exception: | ||
| logger.warning( | ||
| f"Failed to deliver event to session {sid}", | ||
| exc_info=True, | ||
| ) | ||
| | ||
| await asyncio.gather( | ||
| *[_deliver(sid) for sid in matching_session_ids], | ||
| return_exceptions=True, | ||
| ) |
Broadcasting events using asyncio.gather with return_exceptions=True is efficient for a small number of sessions. However, if the server scales to hundreds or thousands of active sessions, creating a task for every single delivery might lead to performance spikes. Consider using a task group or a bounded concurrency mechanism if high session counts are expected.
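One way to bound the fan-out is a semaphore around each delivery. The sketch below is only an illustration of that idea: `deliver_bounded`, its `deliver` argument, and the default limit of 64 are hypothetical names and values, not part of this PR.

```python
import asyncio
from collections.abc import Awaitable, Callable, Iterable


async def deliver_bounded(
    session_ids: Iterable[str],
    deliver: Callable[[str], Awaitable[None]],
    max_concurrency: int = 64,  # hypothetical default, tune per deployment
) -> None:
    """Run deliver(sid) for every session, at most max_concurrency at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def _guarded(sid: str) -> None:
        # Only max_concurrency coroutines can be inside deliver() at once.
        async with semaphore:
            await deliver(sid)

    # gather still schedules one task per session, but the semaphore caps
    # how many sends are in flight; exceptions are collected, not raised.
    await asyncio.gather(
        *(_guarded(sid) for sid in session_ids),
        return_exceptions=True,
    )
```

A slow session still holds one slot until its send finishes or fails, so a per-delivery timeout may be worth adding alongside the limit.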
| retained_event = RetainedEvent( | ||
| topic=topic, | ||
| eventId=event_id, | ||
| timestamp=None, | ||
| payload=payload, | ||
| ) |
The RetainedEvent model is initialized with timestamp=None. Depending on the MCP SDK's strictness, a missing timestamp might cause validation issues on the client side if it expects a valid ISO 8601 string for all events. It is recommended to provide the current UTC timestamp if the SDK allows it.
| retained_event = RetainedEvent( | |
| topic=topic, | |
| eventId=event_id, | |
| timestamp=None, | |
| payload=payload, | |
| ) | |
| if retained: | |
| from datetime import datetime, timezone | |
| retained_event = RetainedEvent( | |
| topic=topic, | |
| eventId=event_id, | |
| timestamp=datetime.now(timezone.utc).isoformat(), | |
| payload=payload, | |
| ) |
|
@gemini-code-assist please re-review |
Code Review
This pull request implements a comprehensive event publishing system for FastMCP, enabling servers to broadcast real-time notifications via topic-based streams with MQTT-style wildcards. Key features include event declaration decorators, support for retained events, and a session-aware authorization system. Review feedback identifies several improvement opportunities, specifically regarding potential memory leaks in internal regex caches and the O(N*M) complexity of the subscription matching logic, which could become a bottleneck at scale. Additionally, it is recommended to replace the direct git dependency for the core SDK with a stable release version to ensure build stability.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
Using a direct git URL for a core dependency like mcp is generally discouraged for a library or framework intended for public release. This can lead to non-deterministic builds and issues with dependency resolution for downstream users. If the required event features are not yet available in a stable release of the mcp SDK, consider waiting for a release or using a pre-release version if available on PyPI.
| def __init__(self) -> None: | ||
| self._lock = asyncio.Lock() | ||
| self._subscriptions: dict[str, set[str]] = {} | ||
| self._compiled: dict[str, re.Pattern[str]] = {} |
The _compiled dictionary in SubscriptionRegistry grows without bound as clients subscribe to unique patterns. Since _pattern_to_regex is already decorated with lru_cache, this internal dictionary is redundant and introduces a memory leak. It is safer to call the cached function directly.
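For reference, a bounded cached compiler could look like the sketch below. The PR's actual `_pattern_to_regex` is not shown in this thread, so `pattern_to_regex`, the `maxsize` value, and the simplified wildcard handling (`+` for one segment, a trailing `#` for the remainder) are assumptions made for illustration.

```python
import re
from functools import lru_cache


@lru_cache(maxsize=1024)  # bounded: least recently used patterns are evicted
def pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Compile an MQTT-style pattern ('+' = one segment, trailing '#' = the rest)."""
    segments = pattern.split("/")
    parts = []
    for index, segment in enumerate(segments):
        if segment == "+":
            parts.append("[^/]+")
        elif segment == "#" and index == len(segments) - 1:
            parts.append(".*")
        else:
            parts.append(re.escape(segment))
    return re.compile("^" + "/".join(parts) + "$")
```

Because the cache lives on the function and has a fixed `maxsize`, callers need no per-instance dictionary. Note that this simplified version does not match the bare parent level for `a/#` (that is, `a` itself), which full MQTT semantics would.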
| self._compiled: dict[str, re.Pattern[str]] = {} | |
| # Remove self._compiled from __init__ and _compile method | |
| async def match(self, topic: str) -> set[str]: | |
| """Return session IDs whose subscriptions match *topic*. | |
| Each session appears at most once (at-most-once delivery guarantee). | |
| """ | |
| async with self._lock: | |
| result: set[str] = set() | |
| for session_id, patterns in self._subscriptions.items(): | |
| for pattern in patterns: | |
| regex = _pattern_to_regex(pattern) | |
| if regex.match(topic): | |
| result.add(session_id) | |
| break # at-most-once per session | |
| return result |
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
| | ||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method iterates over all active sessions and all of their subscription patterns for every event emission. This results in O(N*M) complexity, which could become a bottleneck at scale.
| self._lock = asyncio.Lock() | ||
| self._store: dict[str, RetainedEvent] = {} | ||
| self._expires: dict[str, str] = {} | ||
| self._regex_cache: dict[str, re.Pattern[str]] = {} |
The _regex_cache in RetainedValueStore grows without bound and is redundant because _pattern_to_regex is already cached. Removing it prevents a potential memory leak.
| self._regex_cache: dict[str, re.Pattern[str]] = {} | |
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | |
| """Return all non-expired retained events whose topic matches *pattern*.""" | |
| async with self._lock: | |
| regex = _pattern_to_regex(pattern) | |
| result: list[RetainedEvent] = [] | |
| expired_topics: list[str] = [] | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| result.append(event) | |
| for topic in expired_topics: | |
| del self._store[topic] | |
| self._expires.pop(topic, None) | |
| return result |
| # Cached regex compiled from each declared topic pattern's {param} | ||
| # placeholders, used to accelerate subscription pattern matching. | ||
| # Keyed by declared pattern string; populated lazily on first use. | ||
| self._declared_topic_regex_cache: dict[str, re.Pattern[str]] = {} |
The _declared_topic_regex_cache dictionary grows as new event topics are declared. While declared topics are typically fixed at startup, dynamic declarations could lead to unbounded growth. Consider using a function decorated with functools.lru_cache to handle the regex generation and caching for declared patterns.
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces a comprehensive event publishing system to FastMCP, enabling servers to broadcast real-time notifications to subscribed clients via topic-based streams with MQTT-style wildcards. Key additions include the @mcp.event decorator and ctx.emit_event method, a SubscriptionRegistry for managing client subscriptions, and a RetainedValueStore for replaying the latest events to new subscribers. The implementation also features a built-in authorization mechanism using a {session_id} convention and custom callbacks. Feedback highlights the need to replace the git-based mcp dependency with a stable release, potential performance bottlenecks in the linear subscription matching logic, and the requirement to document or reconsider the restriction on topics with leading or trailing slashes.
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", |
There was a problem hiding this comment.
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
| | ||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method performs a linear scan over all sessions and all their subscription patterns. While acceptable for a small number of clients, this O(N*M) complexity could become a performance bottleneck if the server handles many sessions or high-frequency event streams. Consider using a more efficient data structure like a trie for topic matching if scalability becomes an issue.
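A segment trie keyed on topic levels is one possible shape for such a structure. The sketch below is not the PR's code: `TopicTrie` and `_Node` are hypothetical names, and it assumes `+` matches exactly one segment and `#` matches one or more trailing segments.

```python
from dataclasses import dataclass, field


@dataclass
class _Node:
    children: dict[str, "_Node"] = field(default_factory=dict)
    sessions: set[str] = field(default_factory=set)


class TopicTrie:
    """Subscription patterns stored segment by segment for faster matching."""

    def __init__(self) -> None:
        self._root = _Node()

    def add(self, session_id: str, pattern: str) -> None:
        node = self._root
        for segment in pattern.split("/"):
            node = node.children.setdefault(segment, _Node())
        node.sessions.add(session_id)

    def match(self, topic: str) -> set[str]:
        result: set[str] = set()
        self._walk(self._root, topic.split("/"), 0, result)
        return result

    def _walk(self, node: _Node, segments: list[str], index: int, result: set[str]) -> None:
        # A '#' child matches whatever remains, if at least one segment is left.
        hash_node = node.children.get("#")
        if hash_node is not None and index < len(segments):
            result |= hash_node.sessions
        if index == len(segments):
            # Topic fully consumed: sessions stored here subscribed to this path.
            result |= node.sessions
            return
        # Follow the literal segment and the single-level wildcard.
        for key in (segments[index], "+"):
            child = node.children.get(key)
            if child is not None:
                self._walk(child, segments, index + 1, result)
```

Matching cost then depends on topic depth and the number of wildcard branches rather than on the total number of sessions. Removal and locking are omitted here for brevity.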
| return False | ||
| concrete_parts = concrete_topic.split("/") | ||
| pattern_parts = declared_pattern.split("/") | ||
| if any(not s for s in concrete_parts) or any(not s for s in pattern_parts): |
The check any(not s for s in concrete_parts) effectively bans topics with leading, trailing, or double slashes (e.g., /myapp, myapp/, myapp//status). While this simplifies the API, it deviates from MQTT behavior where these are valid and distinct topics. If this restriction is intentional for the high-level API, it should be explicitly documented.
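The effect of that check is easy to see in isolation. The helper below simply wraps the same expression; the name `has_empty_segment` is hypothetical.

```python
def has_empty_segment(topic: str) -> bool:
    """True when splitting on '/' yields an empty segment."""
    # "/myapp" -> ["", "myapp"], "myapp/" -> ["myapp", ""],
    # "myapp//status" -> ["myapp", "", "status"]
    return any(not segment for segment in topic.split("/"))
```

Leading, trailing, and doubled slashes all produce an empty string in the split result, which is why all three forms are rejected together.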
|
@gemini-code-assist please re-review |
Code Review
This pull request introduces support for MCP events in FastMCP, enabling servers to publish real-time notifications to subscribed clients via topic-based streams with MQTT-style wildcards. Key additions include a SubscriptionRegistry for managing session-to-topic mappings, a RetainedValueStore for replaying the latest events to new subscribers, and new API methods (declare_event, emit_event) on both the FastMCP server and the Context object. Feedback focuses on potential performance bottlenecks in topic matching and descriptor resolution, suggesting more efficient data structures or caching for high-concurrency scenarios.
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
| | ||
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The match method iterates over all active sessions and all of their subscription patterns while holding an asyncio.Lock. For a server with a large number of concurrent sessions or complex subscription sets, this could delay other registry operations that need the same lock (e.g., add or remove). Consider using a more efficient data structure for topic matching, such as a trie or a specialized MQTT-style matcher, if high session counts are expected.
| def _find_topic_descriptor(self, topic: str) -> EventTopicDescriptor | None: | ||
| """Find the EventTopicDescriptor for a concrete topic. | ||
| | ||
| Tries a direct lookup first, then falls back to segment-by-segment | ||
| matching against declared parameterized patterns. | ||
| """ | ||
| # Direct match (fast path) | ||
| descriptor = self._event_topics.get(topic) | ||
| if descriptor is not None: | ||
| return descriptor | ||
| # Fall back to parameterized pattern matching | ||
| for pattern, desc in self._event_topics.items(): | ||
| if self._topic_matches_pattern(topic, pattern): | ||
| return desc | ||
| return None |
The _find_topic_descriptor method performs a linear search over self._event_topics when a direct match is not found. While typically safe for a small number of declared topics, this search runs on every emit_event to determine retention policy. If a server declares a very large number of parameterized topics, consider caching the resolution of concrete topics to descriptors.
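A small bounded cache in front of the pattern scan is one way to do that. In the sketch below, `DescriptorResolver`, `declared_pattern_regex`, and `max_cached` are hypothetical, descriptors are plain strings for illustration, and `{param}` placeholders are assumed to match exactly one segment.

```python
import re
from functools import lru_cache
from typing import Optional


@lru_cache(maxsize=None)  # declared patterns are assumed to be few and fixed
def declared_pattern_regex(pattern: str) -> re.Pattern[str]:
    """Turn 'jobs/{job_id}/status' into a regex with one-segment placeholders."""
    parts = [
        "[^/]+" if seg.startswith("{") and seg.endswith("}") else re.escape(seg)
        for seg in pattern.split("/")
    ]
    return re.compile("^" + "/".join(parts) + "$")


class DescriptorResolver:
    def __init__(self, max_cached: int = 1024) -> None:
        self._declared: dict[str, str] = {}
        self._resolved: dict[str, Optional[str]] = {}
        self._max_cached = max_cached

    def declare(self, pattern: str, descriptor: str) -> None:
        self._declared[pattern] = descriptor
        self._resolved.clear()  # earlier resolutions may now be stale

    def find(self, topic: str) -> Optional[str]:
        direct = self._declared.get(topic)  # fast path: exact declaration
        if direct is not None:
            return direct
        if topic in self._resolved:  # previously resolved (including misses)
            return self._resolved[topic]
        found = None
        for pattern, descriptor in self._declared.items():
            if declared_pattern_regex(pattern).match(topic):
                found = descriptor
                break
        if self._resolved and len(self._resolved) >= self._max_cached:
            # Evict the oldest entry; dicts preserve insertion order.
            self._resolved.pop(next(iter(self._resolved)))
        self._resolved[topic] = found
        return found
```

Clearing the cache on every declaration keeps it correct if topics are declared dynamically, and the size cap avoids the unbounded growth raised elsewhere in this review.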
Update emit_event, declare_event, and topic authorization to match the
v2 spec in ~/.local/spellbook/docs/proposals/mcp-events-api-spec.md.
- emit_event: replace requested_effects and correlation_id with a single
priority kwarg (urgent/high/normal/low). Payload is now optional for
pure signal events.
- declare_event and @mcp.event: add required kind ("content"|"signal")
and optional suggested_handle (drop|silent|notify|ask|inject|interrupt).
- Rename {session_id} magic topic placeholder to {agent_id} in
authorization and documentation. The fastmcp default enforcement still
uses the MCP transport session UUID as the agent identity; the rename
is about matching the spec's identity vocabulary. Variables named
session_id referring to the MCP transport session are unchanged.
- Bump mcp dependency in uv.lock to pick up the v2 SDK types.
…zation
Per MCP Events v2 identity model, {agent_id} in a declared topic is a
client-side declarative parameter, not a binding to the MCP transport
session UUID. Multiple agents can share a single transport, so fastmcp
cannot reliably know which agent is making a subscribe call. The v1-era
policy that forced "subscribe slot == subscriber session_id" was wrong.
Default policy is now permissive: any subscriber whose pattern matches
a declared topic is allowed. Per-agent or per-tenant isolation is opt-in
via an explicit authorize callback registered on declare_event. The
callback receives (session_id, topic_params) and is fully responsible
for the decision; it can inspect topic_params["agent_id"] and reject
wildcards or unbound agent identities as needed.
_check_agent_id_enforcement is deleted. _authorize_subscription is
rewritten to "callback or allow", preserving fail-closed behavior when
the callback raises. declare_event / event docstrings are updated to
describe the new model. Tests covering the old {agent_id} semantics are
replaced with tests for the permissive default and explicit callback
patterns, including wildcard rejection via topic_params inspection.
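The "callback or allow" policy described above can be sketched as follows. This is an illustration only: the function names are hypothetical, and the callback is assumed to be synchronous and to return a bool, which the commit message does not specify.

```python
from typing import Callable, Mapping, Optional

# Assumed callback shape: (session_id, topic_params) -> allowed?
AuthorizeCallback = Callable[[str, Mapping[str, str]], bool]


def authorize_subscription(
    session_id: str,
    topic_params: Mapping[str, str],
    callback: Optional[AuthorizeCallback] = None,
) -> bool:
    """Permissive by default; defer to the callback when one is registered."""
    if callback is None:
        return True
    try:
        return bool(callback(session_id, topic_params))
    except Exception:
        # Fail closed: a raising callback denies the subscription.
        return False


def reject_wildcard_agents(session_id: str, topic_params: Mapping[str, str]) -> bool:
    """Example policy: require a concrete agent_id, never a wildcard."""
    agent_id = topic_params.get("agent_id")
    return agent_id is not None and agent_id not in {"+", "#"}
```

With no callback, every matching subscriber is allowed; with `reject_wildcard_agents`, a subscription whose `agent_id` slot is `+` or is missing would be denied.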
Summary
- declare_event() and @mcp.event() decorator for topic declaration
- emit_event() on FastMCP instances and Context objects for broadcast delivery
Context
High-level server API layer for MCP events SEP reference implementation.
Builds on python-sdk event types; the event types are bundled in this package because of SDK version constraints.
Test plan
uv run pytest tests/server/test_events.py -x -v