
Commit dbbdf7c

yli1 committed
force R1 replica restart when AsyncEngineDeadError
1 parent 712bd16 commit dbbdf7c

2 files changed: +67 -50 lines

matrix/app_server/deploy_utils.py

Lines changed: 3 additions & 3 deletions
@@ -102,11 +102,11 @@
     "deepseek-ai/DeepSeek-R1": {
         "name": "deepseek-r1",
         "tensor-parallel-size": 8,
-        "pipeline-parallel-size": 2,
+        "pipeline-parallel-size": 3,
         "enable-prefix-caching": True,
         "max-model-len": 32768,
-        "gpu-memory-utilization": 0.8,
-        "max_ongoing_requests": 50,
+        "gpu-memory-utilization": 0.9,
+        "max_ongoing_requests": 80,
         "trust-remote-code": True,
     },
     "meta-llama/Llama-4-Scout-17B-16E-Instruct": {

matrix/app_server/llm/ray_serve_vllm.py

Lines changed: 64 additions & 47 deletions
@@ -22,7 +22,7 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse, StreamingResponse
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.engine.async_llm_engine import AsyncEngineDeadError, AsyncLLMEngine

 try:
     from vllm.v1.engine.async_llm import AsyncLLM
@@ -339,6 +339,13 @@ def __init__(
             chat_template=chat_template,
             use_v1_engine=use_v1_engine,
         )
+        self.healthy = True
+
+    async def check_health(self):
+        if self.healthy:
+            return {"status": "healthy"}
+        else:
+            raise RuntimeError("Replica unhealthy!")  # Triggers Ray Serve restart

     def http_to_grpc_status(self, http_status_code: int) -> grpc.StatusCode:
         """A simple function to map HTTP status codes to gRPC status codes."""
@@ -372,22 +379,27 @@ async def CreateChatCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {chat}")
-        generator = await self.openai_serving_chat.create_chat_completion(chat)
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
-            )
+        try:
+            generator = await self.openai_serving_chat.create_chat_completion(chat)
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, ChatCompletionResponse)
-        response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, ChatCompletionResponse)
+            response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM Engine has died and needs restarting.") from e

     async def CreateCompletion(self, request):
         """OpenAI-compatible GRPC endpoint.
@@ -404,39 +416,44 @@ async def CreateCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {completion_request}")
-        generator = await self.openai_serving_completion.create_completion(
-            completion_request,
-            Request(  # this Request is purely a dummy; it was made optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
-                scope={
-                    "type": "http",
-                    "method": "GET",
-                    "path": "",
-                    "headers": [],
-                }
-            ),
-        )
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
+        try:
+            generator = await self.openai_serving_completion.create_completion(
+                completion_request,
+                Request(  # this Request is purely a dummy; it was made optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
+                    scope={
+                        "type": "http",
+                        "method": "GET",
+                        "path": "",
+                        "headers": [],
+                    }
+                ),
             )
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, CompletionResponse)
-        response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude={"top_logprobs"},  # type: ignore[arg-type]
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        for choice in response_dict["choices"]:
-            if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
-                choice["logprobs"].pop("top_logprobs")
-            if "prompt_logprobs" in choice:
-                for index, logprobs in enumerate(choice["prompt_logprobs"]):
-                    choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, CompletionResponse)
+            response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude={"top_logprobs"},  # type: ignore[arg-type]
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            for choice in response_dict["choices"]:
+                if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
+                    choice["logprobs"].pop("top_logprobs")
+                if "prompt_logprobs" in choice:
+                    for index, logprobs in enumerate(choice["prompt_logprobs"]):
+                        choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM Engine has died and needs restarting.") from e


 def parse_vllm_args(cli_args: Dict[str, str]):
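
Both gRPC endpoints now share the same catch-and-flag pattern: on AsyncEngineDeadError, flip self.healthy so the next check_health() call fails (triggering the replica restart) and re-raise for the in-flight caller. As a hypothetical follow-up refactor, not part of this commit, the duplicated handler could be hoisted into a decorator:

    # Hypothetical refactor: one shared handler for engine death, usable as
    # @restart_on_engine_death on CreateChatCompletion and CreateCompletion.
    import functools
    import logging

    from vllm.engine.async_llm_engine import AsyncEngineDeadError

    logger = logging.getLogger(__name__)

    def restart_on_engine_death(method):
        @functools.wraps(method)
        async def wrapper(self, request):
            try:
                return await method(self, request)
            except AsyncEngineDeadError as e:
                self.healthy = False  # next check_health() raises -> restart
                logger.info(f"vLLM Engine Dead: {e}")
                raise RuntimeError(
                    "vLLM Engine has died and needs restarting."
                ) from e
        return wrapper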
