-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathapi.py
More file actions
93 lines (75 loc) · 2.64 KB
/
api.py
File metadata and controls
93 lines (75 loc) · 2.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import logging
import sys
from typing import Optional
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, Field
from . import runtime
from .mem_agent.engine import execute_sandboxed_code
from .mem_agent.utils import (
create_memory_if_not_exists,
format_results,
)
from .schemas import (
ChatCompletionRequest,
ChatMessage,
ResponsesRequest,
StartRequest,
downloadRequest,
)
# Module logger; handler/level configuration is assumed to happen elsewhere.
logger = logging.getLogger("app")
# Path of the currently loaded model — not read or written in this chunk;
# presumably managed by code outside this view (TODO confirm).
_current_model_path: Optional[str] = None
_default_max_tokens: Optional[int] = None  # Use dynamic model-aware limits by default
# Directory that sandboxed tool code is allowed to access (set by /start).
_memory_path = ""
# Running conversation history shared across requests (single-session server).
_messages: list[ChatMessage] = []
# FastAPI application instance; route handlers below register onto it.
app = FastAPI()
@app.get("/ping")
async def ping():
    """Liveness probe: returns a fixed greeting to confirm the server is up."""
    payload = {"message": "Badda-Bing Badda-Bang"}
    return payload
@app.post("/start")
async def start_model(request: StartRequest):
    """Load the model and start the agent.

    Resets the conversation history to a single system message, records the
    memory path used by sandboxed tool execution, and asks the active backend
    to load (or reuse) the requested model.

    Returns:
        A confirmation message once the backend reports the model is loaded.
    """
    # Stale `_runner` removed from the global statement: it was never
    # assigned in this function, so declaring it global was a no-op.
    global _messages, _memory_path
    # Was a bare print() to stdout; route through the module logger with
    # lazy %-style args instead of eager f-string formatting.
    logger.debug("model cache path: %s", request.model_cache_path)
    _messages = [ChatMessage(role="system", content=request.system_prompt)]
    _memory_path = request.memory_path
    logger.info("backend: %s", runtime.backend)
    runtime.backend.get_or_load_model(request.model, request.model_cache_path)
    return {"message": "Model loaded"}
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """Create a chat completion (streaming only).

    If the request carries ``python_code``, it is executed in the sandbox
    first and its formatted result is appended to the shared conversation
    as a user message; the model's reply is then streamed back.

    Raises:
        HTTPException: 400 when ``stream`` is false (the original handler
            silently returned ``None`` / HTTP 200 in that case); 500 when
            the sandbox or backend fails.
    """
    # BUG FIX: the whole body used to sit under `if request.stream:` with no
    # else branch, so non-streaming requests got a 200 with a null body.
    # Raised outside the try so the broad handler below can't turn it into a 500.
    if not request.stream:
        raise HTTPException(
            status_code=400,
            detail="Only streaming requests are supported; set stream=true.",
        )
    try:
        result = ({}, "")
        if request.python_code:
            result = execute_sandboxed_code(
                code=request.python_code,
                allowed_path=_memory_path,
                import_module="server.mem_agent.tools",
            )
        _messages.append(
            ChatMessage(role="user", content=format_results(result[0], result[1]))
        )
        # Streaming response
        return StreamingResponse(
            runtime.backend.generate_chat_stream(_messages, request),
            media_type="text/plain",
            headers={"Cache-Control": "no-cache"},
        )
    except HTTPException:
        # Don't re-wrap deliberate HTTP errors in a generic 500.
        raise
    except Exception as e:
        # Top-level boundary: surface the failure as a 500 with its message.
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/v1/responses")
async def create_chat_response(request: ResponsesRequest):
    """
    Create a response with openResponses format
    """
    # Non-streaming: delegate directly to the backend and return its result.
    if not request.stream:
        return await runtime.backend.generate_response_chat(request)
    # Streaming: wrap the backend's token stream in a plain-text response.
    stream = runtime.backend.generate_response_chat_stream(request)
    no_cache = {"Cache-Control": "no-cache"}
    return StreamingResponse(stream, media_type="text/plain", headers=no_cache)