-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmcp_app.py
More file actions
407 lines (329 loc) · 15.4 KB
/
mcp_app.py
File metadata and controls
407 lines (329 loc) · 15.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
"""
Context MCP Server Application
FastMCP server implementation for Claude Code CLI integration.
Provides tool registration, connection lifecycle management, and graceful shutdown.
"""
import asyncio
import signal
import sys
import os
from typing import Optional
import logging
from datetime import datetime, timezone
# Add project root to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "../.."))
from fastmcp import FastMCP
from src.config.settings import settings
# Configure logging (centralized)
from src.logging.manager import configure_logging
# When launched as an MCP server (MCP_ENABLED=true) the process speaks
# JSON-RPC over stdout, so log output is routed to stderr to keep the
# protocol stream clean.
is_stdio_mode = os.getenv("MCP_ENABLED") == "true"
configure_logging(
    level=settings.log_level,
    fmt=settings.log_format,
    use_stderr=is_stdio_mode,
)
logger = logging.getLogger(__name__)
class MCPServer:
    """
    MCP Server wrapper for Context application.

    Manages FastMCP server lifecycle, tool registration, and connection state.

    ``connection_state`` is a plain string; the values assigned by this class
    are "disconnected", "listening", "connected", "error", "reconnecting",
    "failed", "disconnecting", "shutdown" and "force_shutdown".
    """

    def __init__(self):
        """Initialize MCP server with configuration from settings."""
        # FastMCP instance; stays None until create_server() succeeds.
        self.mcp: Optional[FastMCP] = None
        # True between a successful start() and the beginning of shutdown().
        self.is_running = False
        self.connection_state = "disconnected"
        # Set once shutdown() has finished (successfully or not).
        self.shutdown_event = asyncio.Event()
        # Strong reference to the signal-triggered shutdown task so it cannot
        # be garbage-collected before it completes.
        self._shutdown_task = None

        logger.info(
            f"Initializing MCP Server: {settings.mcp_server_name} v{settings.mcp_server_version}"
        )

    @staticmethod
    def _register_lifecycle_handler(mcp, event_name: str, handler) -> bool:
        """
        Attach ``handler`` to ``mcp`` for ``event_name`` without ever raising.

        Strategies, tried in order:
          1. ``mcp.on_<event>(handler)`` - direct registration / plain decorator.
          2. ``mcp.on_<event>()(handler)`` - decorator factory.
          3. ``mcp.add_event_handler(event, handler)`` - generic fallback.

        Args:
            mcp: Server object to register on.
            event_name: Event suffix, e.g. "connect".
            handler: Callable to register.

        Returns:
            bool: True if one of the strategies accepted the handler.
        """
        try:
            hook = getattr(mcp, f"on_{event_name}", None)
            if callable(hook):
                try:
                    outcome = hook(handler)
                    # A decorator factory hands back a *different* callable that
                    # still needs the handler. A plain decorator returns the
                    # handler itself - calling that would invoke the handler
                    # with itself as argument, so it is skipped.
                    if callable(outcome) and outcome is not handler:
                        outcome(handler)
                    return True
                except TypeError:
                    # Decorator-style: on_xxx() takes no handler and returns
                    # the decorator.
                    decorator = hook()
                    if callable(decorator):
                        decorator(handler)
                        return True
        except Exception as exc:
            logger.debug(f"on_{event_name} registration not available: {exc}")

        # Fallback to generic add_event_handler if present
        try:
            add = getattr(mcp, "add_event_handler", None)
            if callable(add):
                add(event_name, handler)
                return True
        except Exception as exc:
            logger.debug(f"add_event_handler fallback failed for {event_name}: {exc}")
        return False

    def create_server(self) -> "Optional[FastMCP]":
        """
        Create and configure FastMCP server instance.

        Also attempts to attach connect/disconnect/error handlers that keep
        ``connection_state`` up to date; registration is best-effort and is
        silently skipped when the FastMCP object exposes no matching hook.

        Returns:
            Optional[FastMCP]: Configured MCP server instance, or None when
            MCP is disabled in settings.
        """
        if not settings.mcp_enabled:
            logger.warning("MCP server is disabled in settings")
            return None

        logger.info("Creating FastMCP server instance")

        # Create FastMCP server with metadata
        mcp = FastMCP(
            name=settings.mcp_server_name,
            version=settings.mcp_server_version,
        )

        async def handle_connect():
            """Handle client connection event"""
            self.connection_state = "connected"
            logger.info("MCP client connected successfully")
            logger.debug(f"Connection state: {self.connection_state}")

        async def handle_disconnect():
            """Handle client disconnection event"""
            self.connection_state = "disconnected"
            logger.info("MCP client disconnected")
            logger.debug(f"Connection state: {self.connection_state}")

        async def handle_error(error: Exception):
            """Handle connection errors"""
            self.connection_state = "error"
            # Pass the exception object itself: this handler is not guaranteed
            # to run inside an ``except`` block, where exc_info=True would have
            # nothing to report.
            logger.error(f"MCP connection error: {error}", exc_info=error)
            logger.debug(f"Connection state: {self.connection_state}")

            # Attempt reconnection for transient failures
            if self.is_running:
                logger.info("Attempting reconnection after error...")
                await self._attempt_reconnection()

        # Register connection lifecycle handlers in a version-tolerant way
        self._register_lifecycle_handler(mcp, "connect", handle_connect)
        self._register_lifecycle_handler(mcp, "disconnect", handle_disconnect)
        self._register_lifecycle_handler(mcp, "error", handle_error)

        self.mcp = mcp
        logger.info("FastMCP server instance created successfully")
        return mcp

    async def _attempt_reconnection(self):
        """
        Attempt to reconnect after transient failures.

        Waits ``2**attempt`` seconds, marks the state "reconnecting" and
        returns. After ``settings.mcp_max_retries`` failed attempts the state
        becomes "failed".

        NOTE(review): nothing here verifies that a connection was actually
        re-established - the first attempt always logs success after its
        backoff sleep and the state is left at "reconnecting". Presumably the
        framework's connect event flips it back to "connected"; confirm.
        """
        for attempt in range(1, settings.mcp_max_retries + 1):
            try:
                logger.info(
                    f"Reconnection attempt {attempt}/{settings.mcp_max_retries}"
                )
                await asyncio.sleep(2**attempt)  # Exponential backoff

                # Connection will be re-established by FastMCP framework
                self.connection_state = "reconnecting"
                logger.info("Reconnection successful")
                return
            except Exception as e:
                logger.warning(f"Reconnection attempt {attempt} failed: {e}")
                if attempt == settings.mcp_max_retries:
                    logger.error("Max reconnection attempts reached")
                    self.connection_state = "failed"

    def register_tools(self):
        """
        Register all MCP tool endpoints on ``self.mcp``.

        Essential tools are always registered; workspace tools only when
        ``is_workspace_mode()`` is true; the remaining groups only when their
        ``enable_*`` flag is truthy in settings. Does nothing (beyond logging
        an error) if the server has not been created yet.
        """
        if not self.mcp:
            logger.error("Cannot register tools: MCP server not initialized")
            return

        logger.info("Registering MCP tool endpoints")

        # Resolve settings at call time to ensure latest flags under pytest/monkeypatch
        from src.config.settings import settings as cfg

        # Import and register essential tools for Claude Code CLI
        from src.mcp_server.tools.health import register_health_tools
        from src.mcp_server.tools.capabilities import register_capability_tools
        from src.mcp_server.tools.indexing import register_indexing_tools
        from src.mcp_server.tools.vector import register_vector_tools
        from src.mcp_server.tools.search import register_search_tools
        from src.mcp_server.tools.pattern_search import register_pattern_search_tools
        from src.mcp_server.tools.ast_search import register_ast_search_tools
        from src.mcp_server.tools.cross_language_analysis import (
            register_cross_language_tools,
        )
        from src.mcp_server.tools.dependency_analysis import register_dependency_tools
        from src.mcp_server.tools.query_understanding import register_query_tools
        from src.mcp_server.tools.indexing_optimization import (
            register_indexing_optimization_tools,
        )
        from src.mcp_server.tools.prompt_tools import register_prompt_tools
        from src.mcp_server.tools.context_aware_prompt import register_context_aware_tools

        # Optional: workspace tools (only in workspace mode)
        from src.mcp_server.tools.workspace import register_workspace_tools

        # Optional, feature-flagged tool groups
        from src.mcp_server.tools.deployment_integrations import register_deployment_tools
        from src.mcp_server.tools.performance_tools import register_performance_tools
        from src.mcp_server.tools.security_scanning import register_security_scanning_tools
        from src.mcp_server.tools.code_monitoring import register_code_monitoring_tools
        from src.mcp_server.tools.code_generation import register_code_generation_tools

        # Disabled for personal use - to re-enable, import the register_*
        # function and call it with self.mcp:
        #   src.mcp_server.tools.cache_management     register_cache_management_tools
        #   src.mcp_server.tools.query_optimization   register_query_optimization_tools
        #   src.mcp_server.tools.result_presentation  register_result_presentation_tools
        #   src.mcp_server.tools.security_tools       register_security_tools
        #   src.mcp_server.tools.monitoring_tools     register_monitoring_tools
        #   src.mcp_server.tools.model_tools          register_model_tools
        #   src.mcp_server.tools.analytics_tools      register_analytics_tools

        # Register essential tools (order preserved from the original listing)
        essential_registrars = (
            register_health_tools,
            register_capability_tools,
            register_indexing_tools,
            register_vector_tools,
            register_search_tools,
            register_pattern_search_tools,
            register_ast_search_tools,
            register_cross_language_tools,
            register_dependency_tools,
            register_query_tools,
            register_indexing_optimization_tools,
            register_prompt_tools,
            register_context_aware_tools,
        )
        for register in essential_registrars:
            register(self.mcp)

        # Conditionally register workspace tools (only in workspace mode)
        from src.mcp_server.http_server import is_workspace_mode

        if is_workspace_mode():
            logger.info("Workspace mode detected - registering workspace tools")
            register_workspace_tools(self.mcp)
        else:
            logger.info("Single-project mode - skipping workspace tools")

        # Feature-flagged groups: (settings attribute, registrar). A missing
        # attribute counts as disabled.
        flagged_registrars = (
            ("enable_performance_profiling", register_performance_tools),
            ("enable_security_scanning", register_security_scanning_tools),
            ("enable_deployment_integrations", register_deployment_tools),
            ("enable_realtime_monitoring", register_code_monitoring_tools),
            ("enable_code_generation", register_code_generation_tools),
        )
        for flag_name, register in flagged_registrars:
            if getattr(cfg, flag_name, False):
                register(self.mcp)

        # The figure logged is the number of configured capabilities, not a
        # count of the tools registered above.
        logger.info(f"Registered {len(settings.mcp_capabilities)} MCP tools")

    async def start(self):
        """
        Start the MCP server.

        Creates the FastMCP instance, registers tools, marks the server as
        running/"listening" and installs shutdown signal handlers. Returns
        immediately when MCP is disabled in settings.

        Raises:
            Exception: Any error raised during startup is logged,
                ``is_running`` is reset to False, and the error is re-raised.
        """
        if not settings.mcp_enabled:
            logger.warning("MCP server is disabled, skipping startup")
            return

        logger.info("Starting MCP server...")

        try:
            # Create server instance
            self.create_server()

            # Register all tools
            self.register_tools()

            # Mark as running
            self.is_running = True
            self.connection_state = "listening"

            logger.info(
                f"MCP server started successfully on {settings.mcp_server_name}"
            )
            logger.info(
                f"Available capabilities: {', '.join(settings.mcp_capabilities)}"
            )

            # Setup signal handlers for graceful shutdown
            self._setup_signal_handlers()

        except Exception as e:
            logger.error(f"Failed to start MCP server: {e}", exc_info=True)
            self.is_running = False
            raise

    def _setup_signal_handlers(self):
        """
        Setup SIGTERM/SIGINT handlers that trigger a graceful shutdown.

        Prefers ``loop.add_signal_handler`` so the shutdown task is created
        from inside the event loop. Where that is unavailable (no running
        loop, unsupported platform, non-main thread) it falls back to
        ``signal.signal`` and hands the work to the loop with
        ``call_soon_threadsafe``. If no handler can be installed at all a
        warning is logged and startup continues.
        """
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            loop = None

        def begin_shutdown(signum, target_loop):
            """Runs inside the event loop: log the signal and schedule shutdown()."""
            signal_name = signal.Signals(signum).name
            logger.info(
                f"Received {signal_name} signal, initiating graceful shutdown..."
            )
            # Keep a reference; the loop only holds tasks weakly.
            self._shutdown_task = target_loop.create_task(self.shutdown())

        def signal_handler(signum, frame):
            """Raw signal handler used when add_signal_handler is unavailable."""
            target_loop = loop
            if target_loop is None:
                try:
                    target_loop = asyncio.get_running_loop()
                except RuntimeError:
                    logger.error(
                        "Shutdown signal received but no event loop is running"
                    )
                    return
            try:
                target_loop.call_soon_threadsafe(begin_shutdown, signum, target_loop)
            except RuntimeError as exc:
                # Raised when the loop has already been closed.
                logger.error(f"Could not schedule shutdown: {exc}")

        for sig in (signal.SIGTERM, signal.SIGINT):
            if loop is not None:
                try:
                    loop.add_signal_handler(sig, begin_shutdown, sig, loop)
                    continue
                except (NotImplementedError, RuntimeError, ValueError):
                    # Not supported here; use the plain signal module instead.
                    pass
            try:
                signal.signal(sig, signal_handler)
            except ValueError as exc:
                # signal.signal only works in the main thread.
                logger.warning(f"Signal handlers not installed: {exc}")
                return

        logger.debug("Signal handlers registered for SIGTERM and SIGINT")

    async def shutdown(self, timeout: int = 10):
        """
        Gracefully shutdown the MCP server.

        ``shutdown_event`` is always set once a shutdown has been started,
        whatever the outcome, so waiters are never left hanging.

        Args:
            timeout: Maximum seconds to wait for cleanup before force shutdown
        """
        if not self.is_running:
            logger.warning("MCP server is not running")
            return

        logger.info("Initiating MCP server shutdown...")
        self.is_running = False

        try:
            # NOTE: the guarded section currently contains no awaits, so the
            # timeout cannot fire yet; it bounds any async cleanup added later.
            async with asyncio.timeout(timeout):
                # Clean up connections
                if self.connection_state == "connected":
                    logger.info("Closing active connections...")
                    self.connection_state = "disconnecting"

                # Cleanup resources
                logger.info("Cleaning up server resources...")

                # Mark as shutdown
                self.connection_state = "shutdown"

            logger.info("MCP server shutdown completed successfully")

        except asyncio.TimeoutError:
            logger.error(f"Shutdown timeout ({timeout}s) exceeded, forcing shutdown")
            self.connection_state = "force_shutdown"
        except Exception as e:
            logger.error(f"Error during shutdown: {e}", exc_info=True)
        finally:
            self.shutdown_event.set()

    def get_status(self) -> dict:
        """
        Get current MCP server status.

        Returns:
            dict: Keys "enabled", "running", "connection_state",
            "server_name", "version", "capabilities" and "timestamp"
            (current UTC time in ISO 8601 format).
        """
        return {
            "enabled": settings.mcp_enabled,
            "running": self.is_running,
            "connection_state": self.connection_state,
            "server_name": settings.mcp_server_name,
            "version": settings.mcp_server_version,
            "capabilities": settings.mcp_capabilities,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
# Global MCP server instance, created once at import time and shared by the
# module-level entry points (start_mcp_server, shutdown_mcp_server,
# get_mcp_status).
mcp_server = MCPServer()
async def start_mcp_server():
    """Integration entry point: start the shared module-level MCP server."""
    server = mcp_server
    await server.start()
async def shutdown_mcp_server():
    """Integration entry point: shut down the shared module-level MCP server."""
    server = mcp_server
    await server.shutdown()
def get_mcp_status() -> dict:
    """Health-check entry point: return the shared MCP server's status dict."""
    status = mcp_server.get_status()
    return status
if __name__ == "__main__":

    async def _run_standalone():
        """Start the server, then block until its shutdown event is set."""
        await start_mcp_server()
        shutdown_signalled = mcp_server.shutdown_event
        await shutdown_signalled.wait()

    # Run MCP server standalone
    asyncio.run(_run_standalone())