6 changes: 3 additions & 3 deletions matrix/app_server/deploy_utils.py
@@ -102,11 +102,11 @@
     "deepseek-ai/DeepSeek-R1": {
         "name": "deepseek-r1",
         "tensor-parallel-size": 8,
-        "pipeline-parallel-size": 2,
+        "pipeline-parallel-size": 3,
         "enable-prefix-caching": True,
         "max-model-len": 32768,
-        "gpu-memory-utilization": 0.8,
-        "max_ongoing_requests": 50,
+        "gpu-memory-utilization": 0.9,
+        "max_ongoing_requests": 80,
         "trust-remote-code": True,
     },
     "meta-llama/Llama-4-Scout-17B-16E-Instruct": {
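For orientation, here is a minimal sketch of how a model config block like the one above is typically split between vLLM engine arguments and Ray Serve deployment options. The helper name and the exact key split are assumptions for illustration, not code from this PR:

from typing import Any, Dict, Tuple

def split_model_config(config: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Hypothetical helper: dash-style keys feed the vLLM engine,
    while name/max_ongoing_requests are Ray Serve deployment options."""
    serve_keys = {"name", "max_ongoing_requests"}
    engine_args = {k: v for k, v in config.items() if k not in serve_keys}
    serve_options = {k: v for k, v in config.items() if k in serve_keys}
    return engine_args, serve_options

engine_args, serve_options = split_model_config({
    "name": "deepseek-r1",
    "tensor-parallel-size": 8,
    "pipeline-parallel-size": 3,   # 3 pipeline stages after this change
    "gpu-memory-utilization": 0.9,
    "max_ongoing_requests": 80,    # Ray Serve concurrency cap, not a vLLM flag
})
# engine_args -> flags for the vLLM engine; serve_options -> Ray Serve deployment config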
111 changes: 64 additions & 47 deletions matrix/app_server/llm/ray_serve_vllm.py
@@ -22,7 +22,7 @@
 from starlette.requests import Request
 from starlette.responses import JSONResponse, StreamingResponse
 from vllm.engine.arg_utils import AsyncEngineArgs
-from vllm.engine.async_llm_engine import AsyncLLMEngine
+from vllm.engine.async_llm_engine import AsyncEngineDeadError, AsyncLLMEngine

 try:
     from vllm.v1.engine.async_llm import AsyncLLM
@@ -339,6 +339,13 @@ def __init__(
             chat_template=chat_template,
             use_v1_engine=use_v1_engine,
         )
+        self.healthy = True
+
+    async def check_health(self):
+        if self.healthy:
+            return {"status": "healthy"}
+        else:
+            raise RuntimeError("Replica unhealthy!")  # Triggers Ray Serve restart

     def http_to_grpc_status(self, http_status_code: int) -> grpc.StatusCode:
         """A simple function to map HTTP status codes to gRPC status codes."""
@@ -372,22 +379,27 @@ async def CreateChatCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {chat}")
-        generator = await self.openai_serving_chat.create_chat_completion(chat)
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
-            )
+        try:
+            generator = await self.openai_serving_chat.create_chat_completion(chat)
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, ChatCompletionResponse)
-        response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, ChatCompletionResponse)
+            response = openai_pb2.ChatCompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM engine has died and needs restarting.") from e

     async def CreateCompletion(self, request):
         """OpenAI-compatible GRPC endpoint.
@@ -404,39 +416,44 @@ async def CreateCompletion(self, request):
             **json_format.MessageToDict(request, preserving_proto_field_name=True)
         )
         logger.debug(f"Request: {completion_request}")
-        generator = await self.openai_serving_completion.create_completion(
-            completion_request,
-            Request(  # this Request is a dummy placeholder; it was made optional in vLLM's recent pull https://github.com/vllm-project/vllm/pull/12503
-                scope={
-                    "type": "http",
-                    "method": "GET",
-                    "path": "",
-                    "headers": [],
-                }
-            ),
-        )
-        if isinstance(generator, ErrorResponse):
-            status_code = self.http_to_grpc_status(generator.code)
-            raise grpc.RpcError(
-                status_code,
-                generator.model_dump(exclude_unset=True, exclude_none=True),
-            )
+        try:
+            generator = await self.openai_serving_completion.create_completion(
+                completion_request,
+                Request(  # this Request is a dummy placeholder; it was made optional in vLLM's recent pull https://github.com/vllm-project/vllm/pull/12503
+                    scope={
+                        "type": "http",
+                        "method": "GET",
+                        "path": "",
+                        "headers": [],
+                    }
+                ),
+            )
+            if isinstance(generator, ErrorResponse):
+                status_code = self.http_to_grpc_status(generator.code)
+                raise grpc.RpcError(
+                    status_code,
+                    generator.model_dump(exclude_unset=True, exclude_none=True),
+                )

-        assert isinstance(generator, CompletionResponse)
-        response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
-        response_dict = generator.model_dump(
-            exclude={"top_logprobs"},  # type: ignore[arg-type]
-            exclude_unset=True,
-            exclude_none=True,
-        )
-        for choice in response_dict["choices"]:
-            if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
-                choice["logprobs"].pop("top_logprobs")
-            if "prompt_logprobs" in choice:
-                for index, logprobs in enumerate(choice["prompt_logprobs"]):
-                    choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
-        json_format.ParseDict(response_dict, response)
-        return response
+            assert isinstance(generator, CompletionResponse)
+            response = openai_pb2.CompletionResponse()  # type: ignore[attr-defined]
+            response_dict = generator.model_dump(
+                exclude={"top_logprobs"},  # type: ignore[arg-type]
+                exclude_unset=True,
+                exclude_none=True,
+            )
+            for choice in response_dict["choices"]:
+                if "logprobs" in choice and "top_logprobs" in choice["logprobs"]:
+                    choice["logprobs"].pop("top_logprobs")
+                if "prompt_logprobs" in choice:
+                    for index, logprobs in enumerate(choice["prompt_logprobs"]):
+                        choice["prompt_logprobs"][index] = {"token_map": logprobs or {}}
+            json_format.ParseDict(response_dict, response)
+            return response
+        except AsyncEngineDeadError as e:
+            self.healthy = False
+            logger.info(f"vLLM Engine Dead: {e}")
+            raise RuntimeError("vLLM engine has died and needs restarting.") from e


 def parse_vllm_args(cli_args: Dict[str, str]):
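Taken together, the two handlers flip self.healthy on AsyncEngineDeadError so that the next health check fails and Ray Serve replaces the replica. In-flight gRPC calls will error out during the restart window, so callers may want a light retry. A hedged client-side sketch under that assumption (the stub call and backoff policy are illustrative, not part of this PR):

import asyncio

import grpc

async def call_with_retry(call, request, attempts: int = 3, backoff_s: float = 2.0):
    """Retry a unary gRPC call that may fail while a dead replica restarts."""
    for attempt in range(attempts):
        try:
            return await call(request)
        except grpc.RpcError:
            if attempt == attempts - 1:
                raise
            # Give Ray Serve time to bring up a fresh replica before retrying.
            await asyncio.sleep(backoff_s * (attempt + 1))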