33from typing import Dict , List , Optional
44
55import aiohttp
6- from fastapi .responses import JSONResponse , Response , StreamingResponse
6+ from fastapi .responses import Response , StreamingResponse
77from starlette .concurrency import iterate_in_threadpool
88
99from backend .server .constants import NODE_STATUS_AVAILABLE
10+ from backend .server .openai_compat import (
11+ decode_http_response_envelope ,
12+ openai_error_response ,
13+ )
1014from parallax_utils .logging_config import get_logger
1115from parallax_utils .request_metrics import get_request_metrics
1216
@@ -100,9 +104,11 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
100104 self .scheduler_manage is None
101105 or not self .scheduler_manage .get_schedule_status () == NODE_STATUS_AVAILABLE
102106 ):
103- return JSONResponse (
104- content = {"error" : "Server is not ready" },
105- status_code = 500 ,
107+ return openai_error_response (
108+ "Server is not ready" ,
109+ status_code = 503 ,
110+ err_type = "server_unavailable" ,
111+ code = "server_not_ready" ,
106112 )
107113
108114 # Try to get a success response
@@ -119,16 +125,20 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
119125 )
120126 except Exception as e :
121127 logger .exception (f"get_routing_table error: { e } " )
122- return JSONResponse (
123- content = { "error" : " Get routing table error"} ,
128+ return openai_error_response (
129+ " Get routing table error" ,
124130 status_code = 500 ,
131+ err_type = "server_error" ,
132+ code = "routing_table_error" ,
125133 )
126134
127135 # None -> scheduler has not set yet; treat as hard error (no waiting here)
128136 if routing_table is None :
129- return JSONResponse (
130- content = { "error" : " Routing pipelines not ready"} ,
137+ return openai_error_response (
138+ " Routing pipelines not ready" ,
131139 status_code = 503 ,
140+ err_type = "server_unavailable" ,
141+ code = "routing_not_ready" ,
132142 )
133143
134144 # Non-empty -> proceed
@@ -143,9 +153,11 @@ async def _forward_request(self, request_data: Dict, request_id: str, received_t
143153
144154 # If still empty after retries, return 429 Too Many Requests
145155 if routing_table is not None and len (routing_table ) == 0 :
146- return JSONResponse (
147- content = { "error" : " All pipelines are busy or not ready. Please retry later."} ,
156+ return openai_error_response (
157+ " All pipelines are busy or not ready. Please retry later." ,
148158 status_code = 429 ,
159+ err_type = "rate_limit_error" ,
160+ code = "rate_limit_exceeded" ,
149161 )
150162
151163 backend_request = self ._prepare_backend_request (
@@ -203,19 +215,33 @@ async def stream_generator():
203215 return resp
204216 else :
205217 response = stub .chat_completion (backend_request )
206- content = (await anext (iterate_in_threadpool (response ))).decode ()
218+ content = await anext (iterate_in_threadpool (response ))
219+ decoded_response = decode_http_response_envelope (content )
220+ if decoded_response is None :
221+ status_code = 200
222+ content_type = "application/json"
223+ body = content
224+ else :
225+ status_code , content_type , body = decoded_response
207226 logger .debug (f"Non-stream response completed for { request_id } " )
208- return Response (content = content , media_type = "application/json" )
227+ return Response (
228+ content = body ,
229+ status_code = status_code ,
230+ headers = {"content-type" : content_type },
231+ media_type = None ,
232+ )
209233 except Exception as e :
210234 forward_attempts += 1
211235 if forward_attempts < self .MAX_FORWARD_RETRY :
212236 # small async delay before re-forwarding
213237 await asyncio .sleep (self .FORWARD_DELAY_SEC )
214238 logger .warning (f"Error in _forward_request: { e } . Retry attemps { forward_attempts } " )
215239
216- return JSONResponse (
217- content = {"error" : "Internal server error" },
218- status_code = 500 ,
240+ return openai_error_response (
241+ "Downstream request failed" ,
242+ status_code = 502 ,
243+ err_type = "upstream_error" ,
244+ code = "upstream_error" ,
219245 )
220246
221247 async def v1_chat_completions (self , request_data : Dict , request_id : str , received_ts : int ):
0 commit comments