You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
#!/usr/bin/env python
# -*- coding: utf-8 -*-
########################################################################
#
# Copyright (c) 2025 hczq.com, Inc. All Rights Reserved
#
########################################################################
"""
File: main.py
Author: Jeffrey([email protected])
Date: 2025/08/21 10:00:21
Description: Enhanced main entry point with logging, auth, and database support
"""
import asyncio
import sys
from pathlib import Path
from xml.sax import default_parser_list
import uvicorn
from fastmcp.server.http import StarletteWithLifespan
# Add project root to Python path
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from fastmcp import FastMCP
from starlette.requests import Request
from starlette.responses import PlainTextResponse, JSONResponse
from app.core.config import get_settings
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from contextlib import asynccontextmanager
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.applications import Starlette
from starlette.routing import Route
import time
from typing import AsyncGenerator
from app.core.logging import init_logging
# Initialize configuration first
settings = get_settings()
init_logging(settings)
from loguru import logger
# Initialize core components
# Get logger after initialization
from app.mcp_server.main import main_mcp, async_startup_handler, shutdown_handler
@asynccontextmanager
async def combined_lifespan(app: Starlette):
"""合并的生命周期管理器,确保初始化只在需要时执行"""
# 启动逻辑
logger.info("Starting application initialization")
try:
# 执行异步启动处理
await async_startup_handler(main_mcp)
logger.info("Application initialization completed successfully")
async with mcp_app.lifespan(main_mcp):
yield
logger.info("FastMCP lifespan started")
except Exception as e:
logger.error(f"Application startup failed: {e}")
raise
finally:
# 关闭逻辑
logger.info("Starting application shutdown")
await shutdown_handler()
# 创建 MCP 应用
mcp_app = main_mcp.http_app(path="/mcp")
# 创建主应用,使用合并的生命周期
app = Starlette(
routes=[Mount("/", app=mcp_app)],
lifespan=combined_lifespan, # 使用自定义的生命周期管理器
)
if __name__ == "__main__":
try:
# 关键修复:确保多进程启动时使用正确的模式
uvicorn.run(
"main_asgi:app", # 直接传递app实例而不是字符串引用
host="0.0.0.0",
port=8516,
workers=2, # 可以设置为需要的进程数
# 添加进程管理配置
reload=False, # 生产环境关闭重载
log_config=None, # 使用默认日志配置
)
except KeyboardInterrupt:
print("\nServer stopped by user")
except Exception as e:
logger.error(f"Failed to start server: {e}")
sys.exit(1)
# !/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: haifeng
# @Date: 2025-10-16 17:38:52
# @Version: 1.0
# Import permission middleware
from contextlib import asynccontextmanager
from fastmcp import FastMCP
from app.core.config import get_settings
from app.mcp_client.yjswork.yjs_client import lifespan
from app.middleware import permission_middleware
from loguru import logger
from starlette.requests import Request
from starlette.responses import PlainTextResponse, JSONResponse
import asyncio
from mcp.server.lowlevel.server import LifespanResultT
settings = get_settings()
from collections.abc import AsyncIterator
from typing import Any
# Create main MCP server
main_mcp = FastMCP(name="HCZQ MCP Server", version="1.0.0")
async def setup_db_and_cache():
from app.core.database import init_database
from app.core.cache import init_cache_manager
# Initialize database
init_database(settings)
# Initialize cache(Redis)
init_cache_manager(settings)
async def setup_mcp_server(app:FastMCP):
"""Setup MCP server with all components"""
logger.info("Setting up MCP server with sub-servers")
try:
# Import sub-servers with error handling
from app.mcp_server.server import mcp_stock, mcp_fund, mcp_yjswork
# await app.import_server(mcp_stock, prefix="stock")
app.mount(mcp_stock, prefix="stock")
logger.info("Stock server imported successfully")
# await app.import_server(mcp_fund, prefix="fund")
app.mount(mcp_fund, prefix="fund")
logger.info("Fund server imported successfully")
# await app.import_server(mcp_yjswork, prefix="yjswork")
app.mount(mcp_yjswork, prefix="yjswork")
logger.info("YJSWork server imported successfully")
except Exception as e:
logger.error("Failed to setup MCP servers", error=str(e))
raise
async def add_middlewares(app):
"""Add middlewares to the MCP server"""
logger.info("Adding middlewares to MCP server")
try:
from app.middleware import (
error_handling_middleware,
custom_logger_middleware,
ratelimit_middleware,
permission_middleware,
auth_middleware,
)
app.add_middleware(auth_middleware)
logger.info("Auth middleware added")
# Add tool permission middleware
app.add_middleware(permission_middleware)
logger.info("Tool permission middleware added")
# Add logging middleware
app.add_middleware(custom_logger_middleware)
logger.info("Logging middleware added")
# Add error handling middleware
app.add_middleware(error_handling_middleware)
logger.info("Error handling middleware added")
# Add rate limiting middleware
app.add_middleware(ratelimit_middleware)
logger.info("Rate limiting middleware added")
except Exception as e:
logger.error("Failed to add middlewares", error=str(e))
raise
@main_mcp.custom_route("/health", methods=["GET"])
async def health_check(request: Request) -> JSONResponse:
"""Enhanced health check endpoint"""
try:
health_status = {
"status": "healthy",
"version": "1.0.0",
"components": {
"logging": True, # If we can log, logging is working
},
"timestamp": str(asyncio.get_event_loop().time()),
}
return JSONResponse(health_status, status_code=200)
except Exception as e:
logger.error("Health check failed", error=str(e))
return JSONResponse({
"status": "unhealthy",
"error": str(e),
}, status_code=503)
@main_mcp.custom_route("/metrics", methods=["GET"])
async def metrics_endpoint(request: Request) -> PlainTextResponse:
"""Prometheus metrics endpoint"""
try:
# Basic metrics - extend as needed
metrics = [
"# HELP hczq_mcp_server_info Server information",
"# TYPE hczq_mcp_server_info gauge",
'hczq_mcp_server_info{version="1.0.0"} 1',
"",
"# HELP hczq_mcp_server_uptime_seconds Server uptime in seconds",
"# TYPE hczq_mcp_server_uptime_seconds gauge",
f"hczq_mcp_server_uptime_seconds {asyncio.get_event_loop().time()}",
]
return PlainTextResponse("\n".join(metrics), media_type="text/plain")
except Exception as e:
logger.error("Metrics endpoint failed", error=str(e))
return PlainTextResponse("# Error generating metrics\n", status_code=500)
def startup_handler(app:FastMCP):
try:
setup_db_and_cache()
setup_mcp_server(app)
add_middlewares(app)
except Exception as e:
logger.error("Failed to setup MCP servers", error=str(e))
raise
async def shutdown_handler():
"""Graceful shutdown handler"""
logger.info("Starting graceful shutdown")
try:
logger.info("Basic shutdown completed")
from app.core.database import close_database
from app.core.cache import close_cache_manager
await close_cache_manager()
await close_database()
except Exception as e:
logger.error("Error during shutdown", error=str(e))
logger.info("Graceful shutdown completed")
async def async_startup_handler(app: FastMCP):
"""异步启动处理"""
try:
# 执行初始化操作
await setup_db_and_cache()
await setup_mcp_server(app)
await add_middlewares(app)
logger.info("MCP server startup completed")
except Exception as e:
logger.error(f"Startup handler failed: {e}")
raise
It works fine when workers = 1. When workers > 1, report the following error
Error in post_writer
Traceback (most recent call last):
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 415, in post_writer
await handle_request_async()
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 409, in handle_request_async
await self._handle_post_request(ctx)
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/mcp/client/streamable_http.py", line 278, in _handle_post_request
response.raise_for_status()
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/httpx/_models.py", line 829, in raise_for_status
raise HTTPStatusError(message, request=request, response=self)
httpx.HTTPStatusError: Client error '400 Bad Request' for url 'http://127.0.0.1:8516/mcp'
For more information check: https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/400
Session termination failed: 400
Traceback (most recent call last):
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/tests/yjs_tools_test.py", line 45, in <module>
asyncio.run(run_client())
File "/data/users/chenhaifeng/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/data/users/chenhaifeng/.local/share/uv/python/cpython-3.10.16-linux-x86_64-gnu/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/tests/yjs_tools_test.py", line 14, in run_client
await client.ping()
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/fastmcp/client/client.py", line 498, in ping
result = await self.session.send_ping()
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/mcp/client/session.py", line 178, in send_ping
return await self.send_request(
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/mcp/shared/session.py", line 261, in send_request
await self._write_stream.send(SessionMessage(message=JSONRPCMessage(jsonrpc_request), metadata=metadata))
File "/data/users/chenhaifeng/workspace/hczq-mcp-server/.venv/lib/python3.10/site-packages/anyio/streams/memory.py", line 256, in send
raise BrokenResourceError from None
anyio.BrokenResourceError
reacted with thumbs up emoji reacted with thumbs down emoji reacted with laugh emoji reacted with hooray emoji reacted with confused emoji reacted with heart emoji reacted with rocket emoji reacted with eyes emoji
Uh oh!
There was an error while loading. Please reload this page.
-
This is my code
It works fine when workers = 1. When workers > 1, report the following error
Beta Was this translation helpful? Give feedback.
All reactions