3 changes: 3 additions & 0 deletions GUYMCP.md
@@ -0,0 +1,3 @@
1. Added the guymcp directory, which contains the MCP server, the MCP client, and the MCP configuration file.

2. Modified chat_gpt_bot.py under bot.chatgpt in the original project to incorporate the MCP client's answers.
10 changes: 10 additions & 0 deletions bot/chatgpt/.env
@@ -0,0 +1,10 @@
# Server Configuration
MCP_PORT=8020
SERPER_API_KEY=xxx

# Client Configuration
MCP_SERVER_URL=http://localhost:8020

OPENAI_API_KEY=
OPENAI_BASE_URL=
OPENAI_MODEL=
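
Note: client.py below reads the client-side values with python-dotenv; MCP_PORT and SERPER_API_KEY are presumably consumed by guymcp/server.py, which is outside this diff. A minimal sketch of the pattern, assuming only what client.py shows (chat_gpt_bot.py currently hardcodes the server URL instead of reading MCP_SERVER_URL):

# Sketch only: mirrors the load_dotenv/os.getenv pattern used in client.py.
import os
from dotenv import load_dotenv
from openai import AsyncOpenAI

load_dotenv()  # pulls bot/chatgpt/.env into the process environment
openai_client = AsyncOpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_BASE_URL"),
)
sse_url = os.getenv("MCP_SERVER_URL", "http://localhost:8020") + "/sse"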
Empty file added bot/chatgpt/__init__.py
Binary file added bot/chatgpt/__pycache__/__init__.cpython-311.pyc
Binary file added bot/chatgpt/__pycache__/client.cpython-311.pyc
Binary file added bot/chatgpt/__pycache__/client.cpython-313.pyc
63 changes: 51 additions & 12 deletions bot/chatgpt/chat_gpt_bot.py
@@ -1,9 +1,11 @@
# encoding:utf-8

# cd ../../guymcp and run `uv run server.py --host 127.0.0.1 --port 8020` to start the MCP server
import time

import openai
import openai.error
# import openai.error
# The new code calls asyncio.run, which requires openai >= 1.0; openai.error.XXXError was removed there,
# so every openai.error.XXXError reference is updated to use the error class directly: openai.XXXError
import requests
from common import const
from bot.bot import Bot
@@ -17,6 +19,11 @@
from config import conf, load_config
from bot.baidu.baidu_wenxin_session import BaiduWenxinSession

import re
from bot.chatgpt.client import MCPClient
import asyncio


# OpenAI chat completion API (working)
class ChatGPTBot(Bot, OpenAIImage):
def __init__(self):
@@ -51,6 +58,20 @@ def __init__(self):
for key in remove_keys:
            self.args.pop(key, None)  # default to None so a missing key does not raise

    async def gy_getanswer(self, querystr) -> str:
        # guy: new helper that routes a query through MCPClient and returns the cleaned answer
        client = MCPClient()
        clean_result = "no result yet (gychat_loop not called)"
try:
server_url = "http://127.0.0.1:8020/sse"
await client.connect_to_sse_server(server_url)
res = await client.gychat_loop(querystr)
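            # strip the "[Calling tool ... with args ...]" markers that process_query
            # appends to final_text, leaving only the answer text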
clean_result = re.sub(r'\[.*?\]', '', res)

finally:
await client.cleanup()
return clean_result

def reply(self, query, context=None):
# acquire reply content
if context.type == ContextType.TEXT:
@@ -124,37 +145,55 @@ def reply_text(self, session: ChatGPTSession, api_key=None, args=None, retry_cou
"""
try:
if conf().get("rate_limit_chatgpt") and not self.tb4chatgpt.get_token():
raise openai.error.RateLimitError("RateLimitError: rate limit exceeded")
raise openai.RateLimitError("RateLimitError: rate limit exceeded")
# if api_key == None, the default openai.api_key will be used
if args is None:
args = self.args
response = openai.ChatCompletion.create(api_key=api_key, messages=session.messages, **args)
# logger.debug("[CHATGPT] response={}".format(response))
            # ChatCompletion.create was removed in openai >= 1.0, so use the new chat.completions.create method
            # response = openai.ChatCompletion.create(api_key=api_key, messages=session.messages, **args)
            response = openai.chat.completions.create(
                model="Qwen/Qwen2.5-72B-Instruct",
                # model=os.getenv("OPENAI_MODEL"),
                max_tokens=1000,
                messages=session.messages
            )

            # guy: fetch the answer from the MCP client and use it in place of the model's reply
            guy_answer = "placeholder before MCP answer"
user_content = next(
(msg["content"] for msg in reversed(session.messages) if msg.get("role") == "user"),
"没有找到用户消息"
)
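            # asyncio.run bridges this synchronous method into the async MCP client;
            # assumes no event loop is already running in this thread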
guy_answer = asyncio.run(self.gy_getanswer(user_content))
logger.debug("[CHATGPT] response={}".format(response))
# logger.info("[ChatGPT] reply={}, total_tokens={}".format(response.choices[0]['message']['content'], response["usage"]["total_tokens"]))
return {
"total_tokens": response["usage"]["total_tokens"],
"completion_tokens": response["usage"]["completion_tokens"],
"content": response.choices[0]["message"]["content"],
# "total_tokens": response["usage"]["total_tokens"], 旧版本的openai的返回值
# "completion_tokens": response["usage"]["completion_tokens"],
# "content": response.choices[0]["message"]["content"],
"total_tokens": response.usage.total_tokens,
"completion_tokens": response.usage.completion_tokens,
"content":guy_answer,
}
except Exception as e:
need_retry = retry_count < 2
result = {"completion_tokens": 0, "content": "我现在有点累了,等会再来吧"}
if isinstance(e, openai.error.RateLimitError):
if isinstance(e, openai.RateLimitError):
logger.warn("[CHATGPT] RateLimitError: {}".format(e))
result["content"] = "提问太快啦,请休息一下再问我吧"
if need_retry:
time.sleep(20)
elif isinstance(e, openai.error.Timeout):
            elif isinstance(e, openai.APITimeoutError):  # openai >= 1.0 exposes APITimeoutError, not openai.Timeout
logger.warn("[CHATGPT] Timeout: {}".format(e))
result["content"] = "我没有收到你的消息"
if need_retry:
time.sleep(5)
elif isinstance(e, openai.error.APIError):
elif isinstance(e, openai.APIError):
logger.warn("[CHATGPT] Bad Gateway: {}".format(e))
result["content"] = "请再问我一次"
if need_retry:
time.sleep(10)
elif isinstance(e, openai.error.APIConnectionError):
elif isinstance(e, openai.APIConnectionError):
logger.warn("[CHATGPT] APIConnectionError: {}".format(e))
result["content"] = "我连接不到你的网络"
if need_retry:
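Taken together, the new path replaces the model's own reply with the MCP answer. A minimal smoke test of that path, assuming the MCP server from guymcp is already running on 127.0.0.1:8020 and the host app's config has been loaded (the query string is illustrative):

# Sketch only, not part of the PR: exercises gy_getanswer end to end.
import asyncio
from bot.chatgpt.chat_gpt_bot import ChatGPTBot

bot = ChatGPTBot()  # assumes conf() has been initialized by the host app
# gy_getanswer is async, so a synchronous caller bridges in with asyncio.run,
# exactly as reply_text does above.
print(asyncio.run(bot.gy_getanswer("What is the weather in Beijing today?")))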
176 changes: 176 additions & 0 deletions bot/chatgpt/client.py
@@ -0,0 +1,176 @@
# cd ../guymcp and run `uv run server.py --host 127.0.0.1 --port 8020` to start the MCP server
import asyncio
import json
import os
import sys  # used by main() below; moved here from the __main__ guard
from typing import Optional
from contextlib import AsyncExitStack
import time
from mcp import ClientSession
from mcp.client.sse import sse_client

from openai import AsyncOpenAI
from dotenv import load_dotenv

load_dotenv() # load environment variables from .env

class MCPClient:
def __init__(self):
# Initialize session and client objects
self.session: Optional[ClientSession] = None
self.exit_stack = AsyncExitStack()
self.openai = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL"))

async def connect_to_sse_server(self, server_url: str):
"""Connect to an MCP server running with SSE transport"""
# Store the context managers so they stay alive
self._streams_context = sse_client(url=server_url)
streams = await self._streams_context.__aenter__()

self._session_context = ClientSession(*streams)
self.session: ClientSession = await self._session_context.__aenter__()

# Initialize
await self.session.initialize()

# List available tools to verify connection
print("Initialized SSE client...")
print("Listing tools...")
response = await self.session.list_tools()
tools = response.tools
print("\nConnected to server with tools:", [tool.name for tool in tools])

async def cleanup(self):
"""Properly clean up the session and streams"""
        # getattr guards: cleanup() can run from a finally block even if
        # connect_to_sse_server never set these attributes
        if getattr(self, "_session_context", None):
            await self._session_context.__aexit__(None, None, None)
        if getattr(self, "_streams_context", None):
            await self._streams_context.__aexit__(None, None, None)

async def process_query(self, query: str) -> str:
"""Process a query using OpenAI API and available tools"""
messages = [
{
"role": "user",
"content": query
}
]

response = await self.session.list_tools()
available_tools = [{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
} for tool in response.tools]

# Initial OpenAI API call
completion = await self.openai.chat.completions.create(
model="Qwen/Qwen2.5-72B-Instruct",
# model=os.getenv("OPENAI_MODEL"),
max_tokens=1000,
messages=messages,
tools=available_tools
)

# Process response and handle tool calls
tool_results = []
final_text = []

assistant_message = completion.choices[0].message

if assistant_message.tool_calls:
for tool_call in assistant_message.tool_calls:
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)

# Execute tool call
result = await self.session.call_tool(tool_name, tool_args)
tool_results.append({"call": tool_name, "result": result})
final_text.append(f"[Calling tool {tool_name} with args {tool_args}]")

# Continue conversation with tool results
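                # OpenAI tool-calling contract: the assistant message carrying tool_calls
                # must be followed by a "tool" message whose tool_call_id matches it,
                # otherwise the follow-up completions call is rejected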
messages.extend([
{
"role": "assistant",
"content": None,
"tool_calls": [tool_call]
},
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": result.content[0].text
}
])

print(f"Tool {tool_name} returned: {result.content[0].text}")
print("messages", messages)
# Get next response from OpenAI
completion = await self.openai.chat.completions.create(
model=os.getenv("OPENAI_MODEL"),
max_tokens=1000,
messages=messages,
)
if isinstance(completion.choices[0].message.content, (dict, list)):
final_text.append(str(completion.choices[0].message.content))
else:
final_text.append(completion.choices[0].message.content)
else:
if isinstance(assistant_message.content, (dict, list)):
final_text.append(str(assistant_message.content))
else:
final_text.append(assistant_message.content)

return "\n".join(final_text)

async def chat_loop(self):
"""Run an interactive chat loop"""
print("\nMCP Client Started!")
print("Type your queries or 'quit' to exit.")

while True:
try:
query = input("\nQuery: ").strip()

if query.lower() == 'quit':
break

response = await self.process_query(query)
print("\n" + response)

except Exception as e:
print(f"\nError: {str(e)}")

    async def gychat_loop(self, querystr) -> str:
        """Answer a single query via process_query and return the response text (one-shot variant of chat_loop)"""
        print("\nMCP Client Started!")
        response = "response not initialized"
try:
# query = input("\nQuery: ").strip()
query = querystr
if query.lower() == 'quit':
return

response = await self.process_query(query)
print("\n" + response)

except Exception as e:
print(f"\nError: {str(e)}")
finally:
return response

async def main():
if len(sys.argv) < 2:
print("Usage: uv run client.py <URL of SSE MCP server (i.e. http://localhost:8080/sse)>")
sys.exit(1)

client = MCPClient()
try:
await client.connect_to_sse_server(server_url=sys.argv[1])
await client.chat_loop()
finally:
await client.cleanup()

if __name__ == "__main__":
    asyncio.run(main())
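
One detail worth calling out in process_query: the follow-up completion call only succeeds if the messages list follows the OpenAI tool-calling contract. A sketch of the expected shape after one tool round-trip (the tool name and arguments are hypothetical, not from this repo):

# Illustrative only: the message shapes the OpenAI tools API expects.
messages = [
    {"role": "user", "content": "search the web for MCP"},
    {   # the assistant turn that requested the tool; content is None
        "role": "assistant",
        "content": None,
        "tool_calls": [{
            "id": "call_0",
            "type": "function",
            "function": {"name": "web_search", "arguments": "{\"query\": \"MCP\"}"},
        }],
    },
    {   # the tool result, linked back via tool_call_id
        "role": "tool",
        "tool_call_id": "call_0",
        "content": "...search results...",
    },
]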