diff --git a/matrix/app_server/deploy_utils.py b/matrix/app_server/deploy_utils.py
index 39e3972..2118380 100644
--- a/matrix/app_server/deploy_utils.py
+++ b/matrix/app_server/deploy_utils.py
@@ -102,11 +102,11 @@
     "deepseek-ai/DeepSeek-R1": {
         "name": "deepseek-r1",
         "tensor-parallel-size": 8,
-        "pipeline-parallel-size": 2,
+        "pipeline-parallel-size": 3,
         "enable-prefix-caching": True,
         "max-model-len": 32768,
-        "gpu-memory-utilization": 0.8,
-        "max_ongoing_requests": 50,
+        "gpu-memory-utilization": 0.9,
+        "max_ongoing_requests": 80,
         "trust-remote-code": True,
     },
     "meta-llama/Llama-4-Scout-17B-16E-Instruct": {
diff --git a/matrix/app_server/llm/ray_serve_vllm.py b/matrix/app_server/llm/ray_serve_vllm.py
index 774d538..3444194 100644
--- a/matrix/app_server/llm/ray_serve_vllm.py
+++ b/matrix/app_server/llm/ray_serve_vllm.py
@@ -22,7 +22,7 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse, StreamingResponse
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.engine.async_llm_engine import AsyncEngineDeadError, AsyncLLMEngine
 
 try:
     from vllm.v1.engine.async_llm import AsyncLLM
@@ -339,6 +339,13 @@ def __init__(
             chat_template=chat_template,
             use_v1_engine=use_v1_engine,
         )
+        self.healthy = True
+
+    async def check_health(self):
+        if self.healthy:
+            return {"status": "healthy"}
+        else:
+            raise RuntimeError("Replica unhealthy!")  # Triggers Ray Serve restart
 
     def http_to_grpc_status(self, http_status_code: int) -> grpc.StatusCode:
         """A simple function to map HTTP status codes to gRPC status codes."""
@@ -372,22 +379,27 @@ async def CreateChatCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {chat}")
-        generator = await self.openai_serving_chat.create_chat_completion(chat)
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
-            )
+        try:
+            generator = await self.openai_serving_chat.create_chat_completion(chat)
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )
 
-        assert isinstance(generator, ChatCompletionResponse)
-        response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, ChatCompletionResponse)
+            response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM Engine has died and needs restarting.") from e
 
     async def CreateCompletion(self, request):
         """OpenAI-compatible GRPC endpoint.
@@ -404,39 +416,44 @@ async def CreateCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {completion_request}")
-        generator = await self.openai_serving_completion.create_completion(
-            completion_request,
-            Request(  # this Request is purely dummy, it is changed to optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
-                scope={
-                    "type": "http",
-                    "method": "GET",
-                    "path": "",
-                    "headers": [],
-                }
-            ),
-        )
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
+        try:
+            generator = await self.openai_serving_completion.create_completion(
+                completion_request,
+                Request(  # this Request is purely dummy, it is changed to optional in vllm's recent pull https://github.com/vllm-project/vllm/pull/12503
+                    scope={
+                        "type": "http",
+                        "method": "GET",
+                        "path": "",
+                        "headers": [],
+                    }
+                ),
             )
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )
 
-        assert isinstance(generator, CompletionResponse)
-        response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude={"top_logprobs"},  # type: ignore[arg-type]
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        for choice in response_dict["choices"]:
-            if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
-                choice["logprobs"].pop("top_logprobs")
-            if "prompt_logprobs" in choice:
-                for index, logprobs in enumerate(choice["prompt_logprobs"]):
-                    choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, CompletionResponse)
+            response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude={"top_logprobs"},  # type: ignore[arg-type]
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            for choice in response_dict["choices"]:
+                if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
+                    choice["logprobs"].pop("top_logprobs")
+                if "prompt_logprobs" in choice:
+                    for index, logprobs in enumerate(choice["prompt_logprobs"]):
+                        choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM Engine has died and needs restarting.") from e
 
 
 def parse_vllm_args(cli_args: Dict[str, str]):
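For context on the `check_health` hook added above: Ray Serve periodically calls a deployment's user-defined `check_health` method on each replica and restarts the replica if that call raises. Below is a minimal sketch of that interaction, not part of this patch; the class name, the `bind()` wiring, and the specific period/timeout values are illustrative assumptions, while `health_check_period_s`/`health_check_timeout_s` are standard Ray Serve deployment options.

```python
# Minimal sketch (not from this patch): how Ray Serve's replica health
# checking interacts with a user-defined check_health method.
from ray import serve


@serve.deployment(
    health_check_period_s=10,   # how often Serve calls check_health() on each replica (illustrative value)
    health_check_timeout_s=30,  # a check exceeding this marks the replica unhealthy (illustrative value)
)
class ExampleEngineReplica:
    def __init__(self):
        self.healthy = True

    async def check_health(self):
        # Raising here signals the Serve controller to tear down and
        # restart this replica, mirroring the pattern in ray_serve_vllm.py,
        # where handlers flip self.healthy to False on AsyncEngineDeadError.
        if not self.healthy:
            raise RuntimeError("Replica unhealthy!")


app = ExampleEngineReplica.bind()
```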