 from starlette.requests import Request
 from starlette.responses import JSONResponse, StreamingResponse
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.engine.async_llm_engine import AsyncEngineDeadError, AsyncLLMEngine

 try:
     from vllm.v1.engine.async_llm import AsyncLLM
@@ -339,6 +339,13 @@ def __init__(
             chat_template=chat_template,
             use_v1_engine=use_v1_engine,
         )
+        self.healthy = True
+
+    async def check_health(self):
+        if self.healthy:
+            return {"status": "healthy"}
+        else:
+            raise RuntimeError("Replica unhealthy!")  # Triggers Ray Serve restart

     def http_to_grpc_status(self, http_status_code: int) -> grpc.StatusCode:
         """A simple function to map HTTP status codes to gRPC status codes."""
@@ -372,22 +379,27 @@ async def CreateChatCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {chat}")
-        generator = await self.openai_serving_chat.create_chat_completion(chat)
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
-            )
+        try:
+            generator = await self.openai_serving_chat.create_chat_completion(chat)
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, ChatCompletionResponse)
-        response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, ChatCompletionResponse)
+            response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM engine has died and needs restarting.") from e

     async def CreateCompletion(self, request):
         """OpenAI-compatible gRPC endpoint.
@@ -404,39 +416,44 @@ async def CreateCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {completion_request}")
-        generator = await self.openai_serving_completion.create_completion(
-            completion_request,
-            Request(  # this Request is purely a dummy; it was made optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
-                scope={
-                    "type": "http",
-                    "method": "GET",
-                    "path": "",
-                    "headers": [],
-                }
-            ),
-        )
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
+        try:
+            generator = await self.openai_serving_completion.create_completion(
+                completion_request,
+                Request(  # this Request is purely a dummy; it was made optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
+                    scope={
+                        "type": "http",
+                        "method": "GET",
+                        "path": "",
+                        "headers": [],
+                    }
+                ),
             )
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, CompletionResponse)
-        response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude={"top_logprobs"},  # type: ignore[arg-type]
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        for choice in response_dict["choices"]:
-            if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
-                choice["logprobs"].pop("top_logprobs")
-            if "prompt_logprobs" in choice:
-                for index, logprobs in enumerate(choice["prompt_logprobs"]):
-                    choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, CompletionResponse)
+            response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude={"top_logprobs"},  # type: ignore[arg-type]
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            for choice in response_dict["choices"]:
+                if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
+                    choice["logprobs"].pop("top_logprobs")
+                if "prompt_logprobs" in choice:
+                    for index, logprobs in enumerate(choice["prompt_logprobs"]):
+                        choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM engine has died and needs restarting.") from e


 def parse_vllm_args(cli_args: Dict[str, str]):
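For context on the health-check hook this diff adds: Ray Serve's custom health-check protocol periodically awaits a check_health method on each deployment replica and restarts the replica if it raises. Below is a minimal sketch of that wiring; the deployment name and interval values are illustrative, not from this diff.

# Minimal sketch: how Ray Serve drives a check_health() hook like the one
# added above. "VLLMStub" and the interval values are illustrative only.
from ray import serve


@serve.deployment(health_check_period_s=10, health_check_timeout_s=30)
class VLLMStub:
    def __init__(self):
        self.healthy = True  # flipped to False when the engine dies

    async def check_health(self):
        # Ray Serve awaits this every health_check_period_s seconds; any
        # exception marks the replica unhealthy and triggers a restart.
        if not self.healthy:
            raise RuntimeError("Replica unhealthy!")


app = VLLMStub.bind()  # serve.run(app) would deploy it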