-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweb_server.py
More file actions
150 lines (125 loc) · 4.74 KB
/
Copy pathweb_server.py
File metadata and controls
150 lines (125 loc) · 4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import os
import sys
import json
import re
import asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse, FileResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
from typing import List, Dict, Optional
# Directory containing this file. It is placed at the front of sys.path
# (once) so that the local modules imported right after this block resolve.
workspace_dir = os.path.dirname(os.path.abspath(__file__))
if sys.path.count(workspace_dir) == 0:
    sys.path.insert(0, workspace_dir)
from llm_agent import LLMAgent
from system_manager import SystemManager
app = FastAPI()

# Enable CORS for development.
#
# Any origin, method and header is accepted. Credentials are deliberately NOT
# enabled: a wildcard origin combined with allow_credentials=True is not a
# valid CORS policy (FastAPI's CORS documentation requires explicit origins
# when credentials are allowed), and it would let any website make
# credentialed calls to this API from a user's browser.
# To support cookies/authorization headers cross-origin, replace "*" with an
# explicit list of trusted origins and only then set allow_credentials=True.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize Agent and System Manager.
# Both objects are created once, at import time, and are shared by every
# request handler in this module.
# NOTE(review): load_now=True presumably makes LLMAgent load its model eagerly
# during import - confirm in llm_agent.
agent = LLMAgent(load_now=True)
system_mgr = SystemManager()
# Request body for POST /api/chat.
class ChatRequest(BaseModel):
    # The new user message to send to the agent.
    query: str
    # Prior conversation turns. The chat handler reads the "sender" and "text"
    # keys of each entry; other keys are ignored there.
    history: List[Dict[str, str]]
# Request body for POST /api/execute.
class CommandRequest(BaseModel):
    # Tool invocation in the form "name" or "name: args"; the execute handler
    # splits it on the first colon.
    command: str
@app.get("/api/stats")
async def get_stats():
    """Return a snapshot of CPU, RAM, CPU-temperature and disk metrics.

    Returns:
        On success, a dict with ``"status": "success"`` and the values
        reported by the system manager under ``cpu``, ``ram``, ``temp`` and
        ``disk``. If any probe raises, a dict with ``"status": "error"`` and
        the exception text under ``message`` is returned instead (the HTTP
        status code is not changed).
    """
    # "C:\\" only exists on Windows; probing it elsewhere made the whole
    # endpoint report an error. Use the filesystem root on other platforms.
    # NOTE(review): assumes get_disk_usage accepts any mount path - confirm
    # in system_manager.
    disk_root = "C:\\" if os.name == "nt" else "/"
    try:
        return {
            "status": "success",
            "cpu": system_mgr.get_cpu_usage(),
            "ram": system_mgr.get_ram_usage(),
            "temp": system_mgr.get_cpu_temperature(),
            "disk": system_mgr.get_disk_usage(disk_root),
        }
    except Exception as e:
        # Best-effort endpoint: report the failure to the UI rather than
        # surfacing an HTTP 500.
        return {"status": "error", "message": str(e)}
@app.post("/api/clear")
async def clear_memory():
    """Reset the agent's memory to a blank state and persist the result.

    Returns:
        ``{"status": "success", "message": "Memory cleared"}`` when the reset
        and persist both succeed, otherwise ``{"status": "error", ...}`` with
        the exception text under ``message``.
    """
    # Blank memory record that replaces whatever the agent currently holds.
    blank_state = {
        "messages": [],
        "user_profile": {},
        "summaries": [],
        "session_count": 1,
        "created_at": None,
        "last_active": None,
    }
    try:
        agent.memory.data = blank_state
        agent.memory.persist()
    except Exception as exc:
        return {"status": "error", "message": str(exc)}
    return {"status": "success", "message": "Memory cleared"}
@app.post("/api/execute")
async def execute_command(req: CommandRequest):
    """Execute a system tool command directly and return its result.

    The command string has the form ``"name"`` or ``"name: args"``. It is
    split on the first colon only, so the argument text may itself contain
    colons. Surrounding whitespace is removed from both parts.

    Returns:
        Whatever ``system_mgr.execute_tool`` returns. If the tool raises, a
        dict with ``"status": "error"`` and the exception text under
        ``message`` is returned, matching the other endpoints in this module.
    """
    # NOTE(review): nothing in this handler restricts which tool can be run;
    # confirm that SystemManager.execute_tool validates the command name.
    name_part, _, args_part = req.command.partition(":")
    # Strip in both the "name" and "name: args" forms; previously a bare
    # command such as " name " was forwarded with its whitespace intact.
    cmd_name = name_part.strip()
    args_str = args_part.strip()
    try:
        return system_mgr.execute_tool(cmd_name, args_str)
    except Exception as e:
        return {"status": "error", "message": str(e)}
@app.post("/api/chat")
async def chat_stream(req: ChatRequest):
    """Stream the agent's reply as Server-Sent Events (SSE).

    Each generated token is sent as ``data: {"token": ...}`` as soon as the
    agent produces it, and the stream is terminated with ``data: [DONE]``.
    If generation fails, a ``data: {"error": ...}`` event is sent and no
    ``[DONE]`` marker follows.

    Returns:
        A ``StreamingResponse`` with media type ``text/event-stream``.
    """

    def sse(payload):
        # Format one SSE "data:" frame; ensure_ascii=False keeps non-ASCII
        # text readable on the wire.
        return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

    async def event_generator():
        query = req.query
        # The agent expects [{"sender": "user"|"model", "text": "..."}].
        # Entries labelled "user" (or with the Arabic UI label) are user
        # turns; everything else is treated as a model turn.
        formatted_history = [
            {
                "sender": "user" if h.get("sender") in ("user", "أنت") else "model",
                "text": h.get("text", ""),
            }
            for h in req.history
        ]

        loop = asyncio.get_running_loop()
        queue = asyncio.Queue()
        client_gone = False

        def produce():
            # Runs in a single worker thread for the whole generation so the
            # blocking generator never stalls the event loop. Items are
            # handed to the loop thread-safely as (kind, payload) pairs.
            try:
                for token in agent.generate_stream(query, formatted_history):
                    if client_gone:
                        break  # nobody is listening any more
                    loop.call_soon_threadsafe(queue.put_nowait, ("token", token))
                outcome = ("done", None)
            except Exception as exc:
                outcome = ("error", exc)
            loop.call_soon_threadsafe(queue.put_nowait, outcome)

        loop.run_in_executor(None, produce)
        try:
            while True:
                kind, payload = await queue.get()
                if kind == "done":
                    break
                if kind == "error":
                    raise payload
                yield sse({"token": payload})
            yield "data: [DONE]\n\n"
        except Exception as e:
            yield sse({"error": str(e)})
        finally:
            # Also reached when the client disconnects and the generator is
            # closed; tells the worker to stop at the next token.
            client_gone = True

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# Locate the web_ui directory: next to this file normally, or inside the
# unpacked bundle directory (sys._MEIPASS) when running as a frozen build.
if getattr(sys, "frozen", False):
    local_ui_path = os.path.join(sys._MEIPASS, "web_ui")
else:
    local_ui_path = os.path.join(workspace_dir, "web_ui")

# If the preferred location is missing, fall back to web_ui beside this file.
static_out_path = (
    local_ui_path
    if os.path.exists(local_ui_path)
    else os.path.join(workspace_dir, "web_ui")
)

if not os.path.exists(static_out_path):
    # No UI assets available: "/" answers with a small JSON status instead.
    @app.get("/")
    async def read_fallback():
        return {"status": "success", "message": "AI BOB backend active. Mount web_ui folder to view UI."}
else:
    # Serve the UI assets under /static and the index page at "/".
    app.mount("/static", StaticFiles(directory=static_out_path), name="static")

    @app.get("/")
    async def read_index():
        return FileResponse(os.path.join(static_out_path, "index.html"))