Skip to content

Commit 4e1f125

Browse files
authored
feat: make scheduler openai agent compatible (#468)
1 parent d98ff3a commit 4e1f125

5 files changed

Lines changed: 347 additions & 20 deletions

File tree

src/backend/main.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
1010
from fastapi.staticfiles import StaticFiles
1111

12+
from backend.server.openai_compat import openai_error_response, openai_models_payload
1213
from backend.server.request_handler import RequestHandler
1314
from backend.server.scheduler_manage import SchedulerManage
1415
from backend.server.server_args import parse_args
@@ -83,6 +84,18 @@ async def model_list():
8384
)
8485

8586

87+
@app.get("/v1/models")
async def openai_v1_models():
    """Answer ``GET /v1/models`` with an OpenAI-style model listing.

    The model name is read from ``scheduler_manage`` when one is set. If that
    lookup raises, the error is logged at debug level and the name stays
    ``None``. The response is always sent with HTTP status 200.
    """
    current_model = None
    if scheduler_manage is not None:
        try:
            current_model = scheduler_manage.get_model_name()
        except Exception as exc:
            # Best effort: a failed lookup must not break the listing itself.
            logger.debug(f"Unable to get scheduler model name: {exc}")

    listing = openai_models_payload(current_model)
    return JSONResponse(content=listing, status_code=200)
97+
98+
8699
@app.post("/scheduler/init")
87100
async def scheduler_init(raw_request: Request):
88101
request_data = await raw_request.json()
@@ -182,7 +195,23 @@ async def cluster_status_json() -> JSONResponse:
182195

183196
@app.post("/v1/chat/completions")
async def openai_v1_chat_completions(raw_request: Request):
    """Validate the request body and hand it to the request handler.

    A body that cannot be parsed as JSON, or that parses to something other
    than a JSON object, is answered with an HTTP 400 error response built by
    ``openai_error_response``. Otherwise a fresh request id and receive
    timestamp are generated and the call is delegated to
    ``request_handler.v1_chat_completions``.
    """

    def reject(message):
        # Both validation failures share the same 400 / invalid_request_error shape.
        return openai_error_response(
            message,
            status_code=400,
            err_type="invalid_request_error",
            code="invalid_request_error",
        )

    try:
        body = await raw_request.json()
    except Exception:
        return reject("Invalid request body")

    if isinstance(body, dict):
        request_id = uuid.uuid4()
        received_ts = time.time()
        return await request_handler.v1_chat_completions(body, request_id, received_ts)

    return reject("Request body must be a JSON object")
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import base64
2+
import json
3+
from typing import Any, Dict, Optional, Tuple
4+
5+
from fastapi.responses import JSONResponse
6+
7+
PARALLAX_HTTP_RESPONSE_ENVELOPE = "__parallax_http_response__"
8+
9+
10+
def openai_error_payload(
    message: str,
    *,
    err_type: str = "server_error",
    param: Optional[str] = None,
    code: Optional[str] = None,
) -> Dict[str, Dict[str, Any]]:
    """Build the JSON-serialisable body of an error response.

    Args:
        message: Human-readable description of the failure.
        err_type: Value stored under ``"type"``.
        param: Value stored under ``"param"``; may be ``None``.
        code: Value stored under ``"code"``. When it is ``None`` or empty,
            ``err_type`` is used instead.

    Returns:
        A dict with a single ``"error"`` key holding the ``message``,
        ``type``, ``param`` and ``code`` fields.
    """
    effective_code = code if code else err_type
    detail: Dict[str, Any] = {
        "message": message,
        "type": err_type,
        "param": param,
        "code": effective_code,
    }
    return {"error": detail}
25+
26+
27+
def openai_error_response(
28+
message: str,
29+
*,
30+
status_code: int,
31+
err_type: str = "server_error",
32+
param: Optional[str] = None,
33+
code: Optional[str] = None,
34+
) -> JSONResponse:
35+
return JSONResponse(
36+
content=openai_error_payload(
37+
message,
38+
err_type=err_type,
39+
param=param,
40+
code=code,
41+
),
42+
status_code=status_code,
43+
)
44+
45+
46+
def openai_models_payload(model_name: Optional[str]) -> Dict[str, Any]:
    """Build the body of a model-list response.

    Args:
        model_name: Name of the model to list. ``None`` or an empty string
            yields an empty list.

    Returns:
        ``{"object": "list", "data": [...]}`` where ``data`` holds at most one
        model entry.
    """
    if not model_name:
        return {"object": "list", "data": []}

    entry = {
        "id": model_name,
        "object": "model",
        "created": 0,
        "owned_by": "parallax",
    }
    return {"object": "list", "data": [entry]}
58+
59+
60+
def encode_http_response_envelope(
    *,
    status_code: int,
    content_type: Optional[str],
    body: bytes,
) -> bytes:
    """Serialise an HTTP response into a compact JSON envelope.

    Args:
        status_code: HTTP status; coerced with ``int()``.
        content_type: Content type of ``body``. ``None`` or an empty string
            is replaced by ``"application/json"``.
        body: Raw response body; stored base64-encoded.

    Returns:
        UTF-8 encoded JSON with the ``PARALLAX_HTTP_RESPONSE_ENVELOPE`` marker
        set to ``True`` plus ``status_code``, ``content_type`` and
        ``body_base64`` fields.
    """
    status = int(status_code)
    media_type = content_type if content_type else "application/json"
    encoded_body = base64.b64encode(body).decode("ascii")

    envelope = {
        PARALLAX_HTTP_RESPONSE_ENVELOPE: True,
        "status_code": status,
        "content_type": media_type,
        "body_base64": encoded_body,
    }
    # Compact separators keep the serialised envelope free of padding spaces.
    serialized = json.dumps(envelope, separators=(",", ":"))
    return serialized.encode("utf-8")
73+
74+
75+
def decode_http_response_envelope(content: bytes) -> Optional[Tuple[int, str, bytes]]:
    """Unpack an envelope written by ``encode_http_response_envelope``.

    Args:
        content: Raw bytes that may or may not hold a JSON envelope.

    Returns:
        ``(status_code, content_type, body)`` when ``content`` is a usable
        envelope. ``None`` when it is not: invalid UTF-8 or JSON, not a JSON
        object, marker key missing or not ``True``, ``body_base64`` not a
        string or not decodable, or ``status_code`` not convertible to an
        integer.
    """
    try:
        envelope = json.loads(content.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None

    if not isinstance(envelope, dict) or envelope.get(PARALLAX_HTTP_RESPONSE_ENVELOPE) is not True:
        return None

    body_base64 = envelope.get("body_base64")
    if not isinstance(body_base64, str):
        return None

    try:
        # binascii.Error is a ValueError subclass, so bad padding lands here too.
        body = base64.b64decode(body_base64)
    except ValueError:
        return None

    try:
        status_code = int(envelope.get("status_code", 200))
    except (TypeError, ValueError, OverflowError):
        # A null, non-numeric or infinite status is a malformed envelope;
        # report it the same way as every other malformed field instead of raising.
        return None

    content_type = envelope.get("content_type") or "application/json"
    return status_code, str(content_type), body

src/backend/server/request_handler.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,14 @@
33
from typing import Dict, List, Optional
44

55
import aiohttp
6-
from fastapi.responses import JSONResponse, Response, StreamingResponse
6+
from fastapi.responses import Response, StreamingResponse
77
from starlette.concurrency import iterate_in_threadpool
88

99
from backend.server.constants import NODE_STATUS_AVAILABLE
10+
from backend.server.openai_compat import (
11+
decode_http_response_envelope,
12+
openai_error_response,
13+
)
1014
from parallax_utils.logging_config import get_logger
1115
from parallax_utils.request_metrics import get_request_metrics
1216

@@ -100,9 +104,11 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
100104
self.scheduler_manage is None
101105
or not self.scheduler_manage.get_schedule_status() == NODE_STATUS_AVAILABLE
102106
):
103-
return JSONResponse(
104-
content={"error": "Server is not ready"},
105-
status_code=500,
107+
return openai_error_response(
108+
"Server is not ready",
109+
status_code=503,
110+
err_type="server_unavailable",
111+
code="server_not_ready",
106112
)
107113

108114
# Try to get a success response
@@ -119,16 +125,20 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
119125
)
120126
except Exception as e:
121127
logger.exception(f"get_routing_table error: {e}")
122-
return JSONResponse(
123-
content={"error": "Get routing table error"},
128+
return openai_error_response(
129+
"Get routing table error",
124130
status_code=500,
131+
err_type="server_error",
132+
code="routing_table_error",
125133
)
126134

127135
# None -> scheduler has not set yet; treat as hard error (no waiting here)
128136
if routing_table is None:
129-
return JSONResponse(
130-
content={"error": "Routing pipelines not ready"},
137+
return openai_error_response(
138+
"Routing pipelines not ready",
131139
status_code=503,
140+
err_type="server_unavailable",
141+
code="routing_not_ready",
132142
)
133143

134144
# Non-empty -> proceed
@@ -143,9 +153,11 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
143153

144154
# If still empty after retries, return 429 Too Many Requests
145155
if routing_table is not None and len(routing_table) == 0:
146-
return JSONResponse(
147-
content={"error": "All pipelines are busy or not ready. Please retry later."},
156+
return openai_error_response(
157+
"All pipelines are busy or not ready. Please retry later.",
148158
status_code=429,
159+
err_type="rate_limit_error",
160+
code="rate_limit_exceeded",
149161
)
150162

151163
backend_request = self._prepare_backend_request(
@@ -203,19 +215,33 @@ async def stream_generator():
203215
return resp
204216
else:
205217
response = stub.chat_completion(backend_request)
206-
content = (await anext(iterate_in_threadpool(response))).decode()
218+
content = await anext(iterate_in_threadpool(response))
219+
decoded_response = decode_http_response_envelope(content)
220+
if decoded_response is None:
221+
status_code = 200
222+
content_type = "application/json"
223+
body = content
224+
else:
225+
status_code, content_type, body = decoded_response
207226
logger.debug(f"Non-stream response completed for {request_id}")
208-
return Response(content=content, media_type="application/json")
227+
return Response(
228+
content=body,
229+
status_code=status_code,
230+
headers={"content-type": content_type},
231+
media_type=None,
232+
)
209233
except Exception as e:
210234
forward_attempts += 1
211235
if forward_attempts < self.MAX_FORWARD_RETRY:
212236
# small async delay before re-forwarding
213237
await asyncio.sleep(self.FORWARD_DELAY_SEC)
214238
logger.warning(f"Error in _forward_request: {e}. Retry attemps {forward_attempts}")
215239

216-
return JSONResponse(
217-
content={"error": "Internal server error"},
218-
status_code=500,
240+
return openai_error_response(
241+
"Downstream request failed",
242+
status_code=502,
243+
err_type="upstream_error",
244+
code="upstream_error",
219245
)
220246

221247
async def v1_chat_completions(self, request_data: Dict, request_id: str, received_ts: int):

src/parallax/p2p/server.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import zmq
2323
from lattica import ConnectionHandler, Lattica, rpc_method, rpc_stream, rpc_stream_iter
2424

25+
from backend.server.openai_compat import encode_http_response_envelope
2526
from backend.server.rpc_connection_handler import RPCConnectionHandler
2627
from parallax.p2p.proto import forward_pb2
2728
from parallax.p2p.utils import AsyncWorker
@@ -203,10 +204,21 @@ def chat_completion(
203204
response = client.post(
204205
f"http://localhost:{self.http_port}/v1/chat/completions", json=request
205206
)
206-
yield response.content
207+
yield encode_http_response_envelope(
208+
status_code=response.status_code,
209+
content_type=response.headers.get("content-type"),
210+
body=response.content,
211+
)
207212
except Exception as e:
208213
logger.exception(f"Error in chat completion: {e}")
209-
yield b"internal server error"
214+
yield encode_http_response_envelope(
215+
status_code=502,
216+
content_type="application/json",
217+
body=(
218+
b'{"error":{"message":"Internal server error",'
219+
b'"type":"upstream_error","param":null,"code":"upstream_error"}}'
220+
),
221+
)
210222

211223

212224
def check_and_run_weight_refit(gradient_server, message):

0 commit comments

Comments
 (0)