-
Notifications
You must be signed in to change notification settings - Fork 0
MCP Events: High-level server API with @mcp.event() and emit_event() #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 9 commits
b0b5f83
efe2414
9d03bfb
83e52a3
bc904e4
8be1d56
b0ba791
06ee559
ab61a70
3a2a432
ab35863
56307f7
c1dfdfb
059e61d
0067f4c
9918500
8ff158c
4ca62c8
8ec9f14
8b1a270
0e9e120
b407e0b
def988a
fedd842
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,247 @@ | ||
| --- | ||
| title: Events | ||
| sidebarTitle: Events | ||
| description: Publish real-time notifications to subscribed clients through topic-based event streams. | ||
| icon: tower-broadcast | ||
| tag: "NEW" | ||
| --- | ||
|
|
||
| import { VersionBadge } from '/snippets/version-badge.mdx' | ||
|
|
||
| <VersionBadge version="3.3.0" /> | ||
|
|
||
| MCP events let servers push notifications to clients without waiting for a request. Clients subscribe to topics they care about, and the server broadcasts events to all matching subscribers. This is useful for status updates, progress streams, chat messages, sensor readings, or any data that changes over time. | ||
|
|
||
| Events are topic-based. A topic is a hierarchical string like `myapp/status` or `chat/rooms/general/messages`. Servers declare the topics they publish, clients subscribe to patterns, and the server delivers matching events to each session. | ||
|
|
||
| ## Declaring Event Topics | ||
|
|
||
| Before emitting events, declare the topics your server publishes. Declared topics are advertised to clients during connection setup, so clients know what they can subscribe to. | ||
|
|
||
| ### The `@event` Decorator | ||
|
|
||
| The `@event` decorator declares a topic and uses the decorated function's return type to generate a JSON Schema for the event payload. The function itself is not called automatically; it serves as a schema definition. | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP | ||
|
|
||
| mcp = FastMCP("EventServer") | ||
|
|
||
| @mcp.event("myapp/status") | ||
| def status_event() -> dict: | ||
| """Server status updates.""" | ||
| ... | ||
|
|
||
| @mcp.event("myapp/metrics") | ||
| def metrics_event() -> dict: | ||
| """Periodic metric snapshots.""" | ||
| ... | ||
| ``` | ||
|
|
||
| The decorator reads the docstring as the topic description and extracts a JSON Schema from the return type annotation. If you provide an explicit `description`, it takes precedence over the docstring. | ||
|
|
||
| ```python | ||
| @mcp.event("myapp/alerts", description="Critical system alerts") | ||
| def alert_event() -> dict: | ||
| ... | ||
| ``` | ||
|
|
||
| ### `declare_event()` | ||
|
|
||
| For cases where a decorator does not fit (dynamic topic registration, topics without a schema function), use `declare_event()` directly: | ||
|
|
||
| ```python | ||
| mcp = FastMCP("EventServer") | ||
|
|
||
| mcp.declare_event( | ||
| "myapp/status", | ||
| description="Server status updates", | ||
| retained=True, | ||
| schema={"type": "object", "properties": {"state": {"type": "string"}}}, | ||
| ) | ||
| ``` | ||
|
|
||
| <Card icon="code" title="declare_event() Parameters"> | ||
| <ParamField body="pattern" type="str"> | ||
| Topic pattern string. Supports `{param}` placeholders for parameterized topics (see [Topic Patterns](#topic-patterns)). Maximum depth: 8 segments. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="description" type="str | None"> | ||
| Human-readable description of the topic. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="retained" type="bool" default="False"> | ||
| When `True`, the server stores the most recent event for this topic and delivers it to new subscribers immediately on subscribe. See [Retained Events](#retained-events). | ||
| </ParamField> | ||
|
|
||
| <ParamField body="schema" type="dict | None"> | ||
| JSON Schema describing the event payload structure. | ||
| </ParamField> | ||
| </Card> | ||
|
|
||
| ## Emitting Events | ||
|
|
||
| Once topics are declared, emit events using `mcp.emit_event()` or `ctx.emit_event()`. Both broadcast to all sessions whose subscriptions match the topic. | ||
|
|
||
| ### From a Tool (via Context) | ||
|
|
||
| Inside a tool, resource, or prompt function, use `ctx.emit_event()`: | ||
|
|
||
| ```python | ||
| from fastmcp import FastMCP, Context | ||
|
|
||
| mcp = FastMCP("EventServer") | ||
|
|
||
| mcp.declare_event("myapp/notifications", description="User notifications") | ||
|
|
||
| @mcp.tool | ||
| async def send_notification(message: str, ctx: Context) -> str: | ||
| """Send a notification to all subscribers.""" | ||
| await ctx.emit_event("myapp/notifications", {"text": message}) | ||
| return "sent" | ||
| ``` | ||
|
|
||
| `ctx.emit_event()` delegates to the server's `emit_event()`, so the behavior is identical. Use whichever you have access to. | ||
|
|
||
| ### From Background Code | ||
|
|
||
| Outside of a tool or request handler, call `emit_event()` on the FastMCP instance directly. This works from lifespan hooks, background tasks, or any code that holds a reference to the server: | ||
|
|
||
| ```python | ||
| import asyncio | ||
| from fastmcp import FastMCP | ||
|
|
||
| mcp = FastMCP("SensorServer") | ||
|
|
||
| mcp.declare_event("sensors/temperature", description="Temperature readings", retained=True) | ||
|
|
||
| @mcp.lifespan | ||
| async def publish_temperature(server: FastMCP): | ||
| """Emit temperature readings every 5 seconds.""" | ||
| async def _loop(): | ||
| while True: | ||
| reading = await read_sensor() | ||
| await server.emit_event("sensors/temperature", {"celsius": reading}) | ||
| await asyncio.sleep(5) | ||
|
|
||
| task = asyncio.create_task(_loop()) | ||
| try: | ||
| yield {} | ||
| finally: | ||
| task.cancel() | ||
| ``` | ||
|
|
||
| ### `emit_event()` Parameters | ||
|
|
||
| <Card icon="code" title="emit_event() Parameters"> | ||
| <ParamField body="topic" type="str" required> | ||
| Concrete topic string (no wildcards). Must match a declared topic pattern. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="payload" type="Any" required> | ||
| Event payload. Any JSON-serializable value. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="event_id" type="str | None"> | ||
| Unique event identifier. If omitted, a ULID is generated automatically. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="retained" type="bool | None"> | ||
| Override the topic's retained setting for this specific event. If `None`, uses the topic descriptor's `retained` value. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="source" type="str | None"> | ||
| Source identifier for tracing where the event originated. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="correlation_id" type="str | None"> | ||
| Correlation ID for request tracing across systems. | ||
| </ParamField> | ||
|
|
||
| <ParamField body="requested_effects" type="list[EventEffect] | None"> | ||
| Advisory hints for clients about how to handle the event (e.g., inject into context, show notification). | ||
| </ParamField> | ||
|
|
||
| <ParamField body="expires_at" type="str | None"> | ||
| ISO 8601 timestamp after which the retained value should be considered stale. Only meaningful when the event is retained. | ||
| </ParamField> | ||
| </Card> | ||
|
|
||
| ## Retained Events | ||
|
|
||
| Retained events solve the "late subscriber" problem. When a topic is marked as retained, the server stores the most recent event for that topic. When a new client subscribes, it receives the stored value immediately without waiting for the next publish. | ||
|
|
||
| This is useful for state that has a "current value" semantic: connection status, latest configuration, sensor readings. | ||
|
|
||
| ```python | ||
| mcp = FastMCP("StatusServer") | ||
|
|
||
| # The retained flag causes the last emitted event to be stored | ||
| mcp.declare_event("service/status", description="Service health status", retained=True) | ||
|
|
||
| # Or with the decorator: | ||
| @mcp.event("service/config", retained=True) | ||
| def config_event() -> dict: | ||
| """Current service configuration.""" | ||
| ... | ||
| ``` | ||
|
|
||
| When retained is enabled: | ||
|
|
||
| 1. Each `emit_event()` call replaces the stored value for that topic. | ||
| 2. New subscribers receive the stored value as part of the subscribe response. | ||
| 3. If `expires_at` is set and the timestamp has passed, the stored value is discarded instead of delivered. | ||
|
|
||
| You can also override the retained behavior per-emit: | ||
|
|
||
| ```python | ||
| # Force retention even if the topic descriptor says retained=False | ||
| await mcp.emit_event("myapp/status", {"state": "running"}, retained=True) | ||
| ``` | ||
|
|
||
| ## Topic Patterns | ||
|
|
||
| Topic strings use `/` as a segment separator. Declared topics can include `{param}` placeholders to describe families of related topics: | ||
|
|
||
| ```python | ||
| mcp = FastMCP("ChatServer") | ||
|
|
||
| # A parameterized topic pattern | ||
| mcp.declare_event( | ||
| "chat/rooms/{room_id}/messages", | ||
| description="Messages in a chat room", | ||
| ) | ||
|
|
||
| # Emit to a concrete topic that matches the pattern | ||
| await mcp.emit_event("chat/rooms/general/messages", {"text": "hello"}) | ||
| await mcp.emit_event("chat/rooms/random/messages", {"text": "world"}) | ||
| ``` | ||
|
|
||
| The `{room_id}` placeholder matches any single segment. When the server receives a subscription or emits an event, it matches concrete topics against declared patterns segment-by-segment. | ||
|
|
||
| ### Subscription Wildcards | ||
|
|
||
| Clients subscribe using MQTT-style wildcards: | ||
|
|
||
| | Wildcard | Matches | Example | | ||
| |----------|---------|---------| | ||
| | `+` | Exactly one segment | `chat/rooms/+/messages` matches `chat/rooms/general/messages` | | ||
| | `#` | Zero or more trailing segments (must be last) | `chat/#` matches `chat/rooms/general/messages` and `chat` | | ||
|
|
||
| A session that subscribes to `chat/rooms/+/messages` receives events from all rooms. A session that subscribes to `chat/#` receives all chat-related events. | ||
|
|
||
| Each session receives at most one copy of an event, even if multiple subscription patterns overlap. | ||
|
|
||
| ## Session Registry and Broadcast | ||
|
|
||
| FastMCP maintains a registry of active sessions and their subscriptions. When `emit_event()` is called: | ||
|
|
||
| 1. The subscription registry finds all sessions with patterns matching the topic. | ||
| 2. The event notification is sent to each matching session. | ||
| 3. Delivery failures to individual sessions are logged but do not block delivery to other sessions. | ||
|
|
||
| Sessions are automatically registered when they connect and removed when they disconnect. You do not need to manage the session lifecycle. | ||
|
|
||
| ## Capabilities | ||
|
|
||
| When at least one event topic is declared, the server advertises the `events` capability during initialization. Clients that do not support events can still connect and use other server features; event-related methods return an error only if the client tries to subscribe to a server with no declared topics. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,7 +7,7 @@ dependencies = [ | |
| "python-dotenv>=1.1.0", | ||
| "exceptiongroup>=1.2.2", | ||
| "httpx>=0.28.1,<1.0", | ||
| "mcp>=1.24.0,<2.0", | ||
| "mcp @ git+https://github.com/axiomantic/python-sdk.git@mcp-events", | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using a direct git URL for a core dependency like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| "openapi-pydantic>=0.5.1", | ||
| "opentelemetry-api>=1.20.0", | ||
| "packaging>=24.0", | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
mcpdependency is pinned to a specific git branch. While this is acceptable for development, it's not suitable for production. Before this change is merged into a main or release branch, this should be updated to point to a released version of themcppackage on PyPI to ensure stable and reproducible builds.