-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathserver.py
More file actions
329 lines (273 loc) · 12.3 KB
/
server.py
File metadata and controls
329 lines (273 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
"""
Couchbase MCP Server (Extended, hardened)
=========================================
Exposes the full Couchbase data-plane AND admin REST API as MCP tools, with
defense-in-depth safety primitives modeled after the official Couchbase MCP.
Tool categories (all upstream names preserved)
──────────────────────────────────────────────
Data - CRUD, N1QL, FTS search, ping (cb_*)
Buckets - create/update/delete/flush (admin_bucket_*)
Collections- scopes and collections (admin_scope_*, admin_collection_*)
Security - users, groups, RBAC, audit (admin_user_*, admin_group_*, admin_*)
Cluster - nodes, rebalance, failover (admin_cluster_*, admin_node_*, admin_*)
XDCR - references and replications (admin_xdcr_*)
Indexes - GSI create/drop/build, settings (admin_index_*)
FTS Admin - FTS index CRUD + stats (admin_fts_*)
Stats - metrics, events, internal (admin_stats_*, admin_*)
Diagnostics- schema, advisor, EXPLAIN, perf (cb_get_schema_for_collection,
cb_index_advisor, cb_explain_query,
cb_perf_*)
8.x-only - vector indexes, lock, conflicts (admin_vector_index_create_*,
admin_user_lock/unlock/create_temporary,
admin_xdcr_conflict_log_query,
cb_perf_by_user)
Extended - transactions, Analytics, Backup (cb_transaction_run,
cb_analytics_query, admin_backup_*)
Eventing - function lifecycle, deploy, stats (admin_eventing_*)
Synonyms - FTS synonym set documents (8.x) (cb_fts_synonym_*)
Encryption - DARE + KMIP (admin_encryption_*, admin_kmip_*)
Capella v4 - SaaS control plane (read-only) (capella_*)
Environment variables
─────────────────────
CONNECTION
CB_CONNECTION_STRING couchbase://localhost (use couchbases:// for TLS)
CB_USERNAME (required unless using mTLS)
CB_PASSWORD (required unless using mTLS)
CB_BUCKET default
CB_SCOPE _default
CB_COLLECTION _default
CB_MGMT_PORT 8091 (or 18091 for TLS; Capella self-managed admin)
mTLS / TLS (Phase 3)
CB_CLIENT_CERT_PATH path to client cert PEM (presence enables mTLS)
CB_CLIENT_KEY_PATH path to client key PEM
CB_CA_CERT_PATH path to CA cert for self-signed self-managed clusters
CB_MCP_TLS_INSECURE false set true to skip TLS verification (dev only)
SAFETY (Phase 1)
CB_MCP_READ_ONLY_MODE true when true, write tools are NOT loaded
CB_MCP_DISABLED_TOOLS comma list, or path to file with one name per line
CB_MCP_CONFIRMATION_REQUIRED_TOOLS additional tools that require confirm:true
CB_MCP_ELICITATION_HINTS true include hint text in confirmation errors
NETWORK (Phase 2)
CB_MCP_HTTP_RETRIES 3 max attempts for admin HTTP calls
CB_MCP_HTTP_TIMEOUT 30 per-request timeout in seconds
TRANSPORT (Phase 3)
CB_MCP_TRANSPORT stdio one of: stdio, http
CB_MCP_HOST 127.0.0.1 for http transport
CB_MCP_PORT 8000 for http transport
Compatibility
─────────────
All tool names from the upstream celticht32 server are preserved. New tools
may be added in future; new optional `confirm` arguments are introduced on
destructive tools but do not change existing tool semantics.
"""
from __future__ import annotations
import asyncio
import os
import sys
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import TextContent, Tool
from handlers import (
buckets,
capella,
cluster,
collections,
data,
diagnostics,
eight_x,
encryption,
eventing,
extended,
indexes,
mcp_status,
search_admin,
security,
stats,
synonyms,
xdcr,
)
from handlers.shared import (
DISABLED_TOOLS,
READ_ONLY_MODE,
err,
get_confirmation_required,
require_confirmation,
)
# ── Aggregate tool registry ──────────────────────────────────────────────────
_RAW_TOOLS: list[Tool] = (
data.TOOLS
+ buckets.TOOLS
+ collections.TOOLS
+ security.TOOLS
+ cluster.TOOLS
+ xdcr.TOOLS
+ indexes.TOOLS
+ search_admin.TOOLS
+ stats.TOOLS
+ diagnostics.TOOLS
+ eight_x.TOOLS
+ extended.TOOLS
+ eventing.TOOLS
+ synonyms.TOOLS
+ encryption.TOOLS
+ capella.TOOLS
+ mcp_status.TOOLS
)
_HANDLERS = {
**{t.name: data for t in data.TOOLS},
**{t.name: buckets for t in buckets.TOOLS},
**{t.name: collections for t in collections.TOOLS},
**{t.name: security for t in security.TOOLS},
**{t.name: cluster for t in cluster.TOOLS},
**{t.name: xdcr for t in xdcr.TOOLS},
**{t.name: indexes for t in indexes.TOOLS},
**{t.name: search_admin for t in search_admin.TOOLS},
**{t.name: stats for t in stats.TOOLS},
**{t.name: diagnostics for t in diagnostics.TOOLS},
**{t.name: eight_x for t in eight_x.TOOLS},
**{t.name: extended for t in extended.TOOLS},
**{t.name: eventing for t in eventing.TOOLS},
**{t.name: synonyms for t in synonyms.TOOLS},
**{t.name: encryption for t in encryption.TOOLS},
**{t.name: capella for t in capella.TOOLS},
**{t.name: mcp_status for t in mcp_status.TOOLS},
}
# Tools that stay loaded in read-only mode despite destructiveHint=true,
# because they enforce read-only behavior internally (e.g. cb_query rejects DML).
_ALWAYS_LOADED_IN_READ_ONLY: set[str] = {"cb_query", "cb_analytics_query"}
def _is_read_only(t: Tool) -> bool:
"""A tool is read-only if its annotation says so."""
return bool(t.annotations and t.annotations.readOnlyHint)
def _filter_tools(raw_tools: list[Tool]) -> list[Tool]:
"""Apply read-only mode and disabled-tools filters."""
filtered: list[Tool] = []
for t in raw_tools:
if t.name in DISABLED_TOOLS:
continue
if READ_ONLY_MODE:
if not _is_read_only(t) and t.name not in _ALWAYS_LOADED_IN_READ_ONLY:
continue
filtered.append(t)
return filtered
_TOOLS: list[Tool] = _filter_tools(_RAW_TOOLS)
# Default confirmation set: every destructive tool that survived the filter.
_DEFAULT_CONFIRMATION = {
t.name for t in _TOOLS if t.annotations and t.annotations.destructiveHint
}
_CONFIRMATION_REQUIRED: set[str] = get_confirmation_required(_DEFAULT_CONFIRMATION)
# cb_query and cb_analytics_query are "destructive in spec" but have internal
# DML blocking; don't double-gate.
_CONFIRMATION_REQUIRED.discard("cb_query")
_CONFIRMATION_REQUIRED.discard("cb_analytics_query")
# ── MCP server ───────────────────────────────────────────────────────────────
app = Server("couchbase-mcp")
@app.list_tools()
async def list_tools() -> list[Tool]:
return _TOOLS
@app.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
arguments = dict(arguments or {})
handler = _HANDLERS.get(name)
if handler is None:
return err(
f"Unknown tool: {name}", tool=name, hint="Tool may be disabled or unloaded."
)
# Tool must also be in the currently exposed list.
if name not in {t.name for t in _TOOLS}:
return err(
f"Tool {name} is not enabled in this server configuration.",
tool=name,
hint=(
"It may be unloaded because CB_MCP_READ_ONLY_MODE=true or it "
"appears in CB_MCP_DISABLED_TOOLS."
),
)
# Confirmation gate for destructive tools.
in_confirm_set = name in _CONFIRMATION_REQUIRED
msg = require_confirmation(name, arguments, in_confirm_set)
if msg:
return err(msg, tool=name, args=arguments, requires_confirmation=True)
# Strip the confirm key so it never reaches REST/SDK calls as a stray field.
arguments.pop("confirm", None)
loop = asyncio.get_event_loop()
try:
return await loop.run_in_executor(None, handler.handle, name, arguments)
except Exception as exc:
return err(f"{type(exc).__name__}: {exc}", tool=name, args=arguments)
# ── Startup banner ───────────────────────────────────────────────────────────
def _startup_banner() -> None:
msg = (
f"[couchbase-mcp] tools loaded: {len(_TOOLS)} of {len(_RAW_TOOLS)} "
f"(read_only={READ_ONLY_MODE}, disabled={len(DISABLED_TOOLS)}, "
f"confirmation_required={len(_CONFIRMATION_REQUIRED)})"
)
# Banner goes to stderr so it does not pollute stdio MCP framing.
print(msg, file=sys.stderr, flush=True)
# ── Transport selection ──────────────────────────────────────────────────────
async def _main_stdio() -> None:
async with stdio_server() as (read_stream, write_stream):
await app.run(read_stream, write_stream, app.create_initialization_options())
async def _main_http() -> None:
"""Streamable HTTP transport. Note: this mode does not include authorization;
deploy behind a reverse proxy or authenticated network."""
try:
from mcp.server.streamable_http import StreamableHTTPServerTransport
except ImportError:
print(
"[couchbase-mcp] Streamable HTTP transport requires a newer mcp library. "
"Falling back to stdio.",
file=sys.stderr,
)
await _main_stdio()
return
host = os.environ.get("CB_MCP_HOST", "127.0.0.1")
port = int(os.environ.get("CB_MCP_PORT", "8000"))
print(
f"[couchbase-mcp] HTTP transport listening on http://{host}:{port}/mcp",
file=sys.stderr,
flush=True,
)
# The exact instantiation API for StreamableHTTPServerTransport varies
# across mcp library versions. We delegate to a thin runner so the user
# can adapt this in their environment if the API has shifted.
try:
import uvicorn # type: ignore
from starlette.applications import Starlette
from starlette.routing import Mount
transport = StreamableHTTPServerTransport(mcp_session_id=None)
starlette_app = Starlette(routes=[Mount("/mcp", app=transport.handle_request)])
config = uvicorn.Config(starlette_app, host=host, port=port, log_level="info")
server = uvicorn.Server(config)
async def run_server():
async with transport.connect() as (rs, ws):
# asyncio.TaskGroup is 3.11+. Use gather for 3.10 compatibility.
# If one task fails, the other is cancelled (return_exceptions=False)
# and the first exception propagates — same effective behavior as
# TaskGroup for our two-task case.
await asyncio.gather(
server.serve(),
app.run(rs, ws, app.create_initialization_options()),
)
await run_server()
except ImportError as exc:
print(
f"[couchbase-mcp] HTTP transport requires uvicorn and starlette: {exc}. "
"Install: pip install uvicorn starlette. Falling back to stdio.",
file=sys.stderr,
)
await _main_stdio()
async def _async_main() -> None:
_startup_banner()
transport = os.environ.get("CB_MCP_TRANSPORT", "stdio").lower()
if transport in ("http", "streamable_http", "streamablehttp"):
await _main_http()
else:
await _main_stdio()
def main() -> None:
"""Synchronous entry point used by the `couchbase-mcp-server` console script
(configured in pyproject.toml). Wraps the async runtime so pip-installed
users get a clean CLI: `couchbase-mcp-server` just works.
"""
asyncio.run(_async_main())
if __name__ == "__main__":
main()