From b670ce5a42a4e54496c16356c094c064496e0149 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 24 Apr 2025 19:26:02 -0700 Subject: [PATCH 01/27] refactor Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/server_models.py | 1 + .../serve/deployments/llm/llm_engine.py | 53 +++++ .../serve/deployments/llm/llm_server.py | 60 ++---- .../serve/deployments/llm/vllm/vllm_engine.py | 181 +++++++----------- .../serve/deployments/utils/batcher.py | 103 ++++++++++ 5 files changed, 241 insertions(+), 157 deletions(-) create mode 100644 python/ray/llm/_internal/serve/deployments/llm/llm_engine.py create mode 100644 python/ray/llm/_internal/serve/deployments/utils/batcher.py diff --git a/python/ray/llm/_internal/serve/configs/server_models.py b/python/ray/llm/_internal/serve/configs/server_models.py index 1a1b5520a4e78..17265b76bcb5c 100644 --- a/python/ray/llm/_internal/serve/configs/server_models.py +++ b/python/ray/llm/_internal/serve/configs/server_models.py @@ -945,3 +945,4 @@ class GenerationRequest(BaseModelExtended): prompt: Union[str, List[int], List[str]] request_id: Union[str, List[str]] sampling_params: Optional[Union[SamplingParams, List[SamplingParams]]] = None + stream: bool = False diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py new file mode 100644 index 0000000000000..d6cc7f82d3084 --- /dev/null +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -0,0 +1,53 @@ +from typing import AsyncGenerator, Optional + +from ray.llm._internal.serve.configs.server_models import Prompt, LLMRawResponse, LLMConfig, GenerationRequest, DiskMultiplexConfig + + +import abc + + +class LLMEngine(abc.ABC): + """Base class for all LLM engines""" + + def __init__(self, llm_config: LLMConfig): + pass + + @abc.abstractmethod + async def start(self): + """Start the engine""" + pass + + @abc.abstractmethod + async def prepare_request( + self, + request_id: str, + prompt: Prompt, + stream: bool, + disk_lora_model: Optional[DiskMultiplexConfig] = None, + **kwargs, + ) -> GenerationRequest: + """Prepare an EngineRequest for the engine""" + pass + + @abc.abstractmethod + async def generate(self, request: GenerationRequest) -> AsyncGenerator[LLMRawResponse, None]: + """Generate an LLMRawResponse stream""" + pass + + + async def check_health(self): + """Check the health of the engine""" + pass + + async def sleep(self): + """Puts the engine to sleep""" + pass + + async def wakeup(self): + """Wakes up the engine""" + pass + + def shutdown(self): + """Shuts down the engine""" + pass + \ No newline at end of file diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index bbe89f25f875a..b9e8c5bacacc9 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -511,50 +511,24 @@ async def _predict( """ logger.info(f"Received streaming request {request_id}") - try: - multiplexed_model_id = serve.get_multiplexed_model_id() - - if multiplexed_model_id: - assert ( - self._llm_config.lora_config is not None - ), "Must setup lora config for multiplexed requests." 
- disk_lora_model = await self._disk_lora_model(multiplexed_model_id) - else: - disk_lora_model = None - - prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) - - sampling_params = VLLMSamplingParams.from_prompt(prompt) - prompt_text = prompt_output.text - image_input = prompt_output.image - image = [] - if not self._llm_config.supports_vision and image_input: - raise RuntimeError( - "You provided image input while the engine is not set up to handle images. " - "Did you forget to set `input_modality` to image in yaml file?" - ) + multiplexed_model_id = serve.get_multiplexed_model_id() - if self._llm_config.supports_vision and image_input: - for _image in image_input: - image_url = _image.image_url - image.append(await self.image_retriever.get(image_url)) - - request_params = { - "prompt": prompt_text, - "request_id": request_id, - "sampling_params": sampling_params, - "disk_multiplex_config": disk_lora_model, - "serve_request_context": serve.context._serve_request_context.get(), - } - if image: - request_params["multi_modal_data"] = {"image": image} - vllm_request = VLLMGenerationRequest(**request_params) - except PydanticValidationError as e: - # Wrap the PydanticValidationError in a ValidationErrorWithPydantic - # so that it can be used in a RayActorError - # See https://github.com/ray-project/ray/issues/43401 - raise ValidationErrorWithPydantic(e) from None - async for llm_response in self.engine.generate(vllm_request, stream): + if multiplexed_model_id: + assert ( + self._llm_config.lora_config is not None + ), "Must setup lora config for multiplexed requests." + disk_lora_model = await self._disk_lora_model(multiplexed_model_id) + else: + disk_lora_model = None + + llm_request = await self.engine.prepare_request( + request_id=request_id, + prompt=prompt, + stream=stream, + disk_lora_model=disk_lora_model, + ) + + async for llm_response in self.engine.generate(llm_request): yield llm_response async def chat(self, request: ChatCompletionRequest) -> LLMChatResponse: diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 6565ff8e1c14f..1b4a65ccbb762 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -4,6 +4,7 @@ from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, TYPE_CHECKING import ray +from ray import serve from ray.util import metrics from ray.util.placement_group import PlacementGroup from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -36,7 +37,9 @@ initialize_node as initialize_node_util, ) from ray.llm._internal.serve.configs.server_models import ( - BatchedLLMRawResponse, + Prompt, + GenerationRequest, + DiskMultiplexConfig, LLMConfig, LLMRawResponse, LogProb, @@ -52,6 +55,9 @@ MAX_NUM_TOPLOGPROBS_ALLOWED, ) from ray.llm._internal.utils import try_import +from ray.llm._internal.serve.deployments.utils.batcher import LLMRawResponsesBatcher + +from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine if TYPE_CHECKING: from vllm.config import ModelConfig, VllmConfig @@ -69,7 +75,7 @@ "Time a request spends in the queue first forward pass not included (ms).", boundaries=LONG_RANGE_LATENCY_HISTOGRAM_BUCKETS_MS, ) - + def _get_async_engine_args(llm_config: LLMConfig) -> "AsyncEngineArgs": engine_config = llm_config.get_engine_config() @@ -126,92 +132,6 @@ def _clear_current_platform_cache(): 
current_platform.get_device_capability.cache_clear() -class BatchLLMRawResponses: - """This class batches multiple LLMRawResponses from a generator into a - single response, at some time interval. - - Args: - generator: the async generator that this class pulls LLMRawResponses - from. - interval_ms: the interval at which this class yields the current batch. - If None, this class will batch all responses from the generator - together and yield the entire batch once. - """ - - def __init__( - self, - generator: AsyncGenerator[LLMRawResponse, None], - interval_ms: Optional[float] = MODEL_RESPONSE_BATCH_TIMEOUT_MS, - ): - self.generator = generator - self.queue: asyncio.Queue = asyncio.Queue() - - if interval_ms is None: - self.interval_s = None - else: - self.interval_s = interval_ms / 1000 - - self.done_event: asyncio.Event = asyncio.Event() - - # We are okay with this task getting cancelled (to propagate cancellations) - self.read_task = asyncio.create_task(self.read()) - - async def stream(self) -> AsyncGenerator[BatchedLLMRawResponse, None]: - """Drain from the queue every interval_ms and yield the merged results""" - try: - while True: - # Wait for the interval or until we finish, whichever is faster. - # We use an event to avoid asyncio.wait_for cancelling the real task on timeout. - try: - if self.interval_s is None: - await self.done_event.wait() - else: - await asyncio.wait_for( - self.done_event.wait(), timeout=self.interval_s - ) - except asyncio.TimeoutError: - pass - - # Get all elements from the queue - results, is_done = self.check_done_and_drain() - - # If there are results, merge and yield them - if results: - output: BatchedLLMRawResponse = BatchedLLMRawResponse.merge_stream(*results) # type: ignore - yield output - - # If the read task is done, exit the stream task - if is_done: - # Raise exception, if any - self.read_task.result() - break - finally: - # If the stream task is done, make sure to exit the read task - if not self.read_task.done(): - self.read_task.cancel() - - def check_done_and_drain(self): - results = self.drain_queue() - return results, self.read_task.done() - - async def read(self): - """Read from the generator and put into the queue in a tight loop""" - try: - async for x in self.generator: - self.queue.put_nowait(x) - finally: - self.done_event.set() - - def drain_queue(self): - """Drain all results currently in the queue""" - results = [] - try: - while True: - results.append(self.queue.get_nowait()) - except asyncio.QueueEmpty: - pass - return results - class _EngineBackgroundProcess: def __init__(self, ipc_path, engine_args, engine_config): @@ -251,7 +171,7 @@ def get_error(self): return self._error -class VLLMEngine: +class VLLMEngine(LLMEngine): def __init__( self, llm_config: LLMConfig, @@ -524,25 +444,64 @@ def _start_async_llm_engine( log_stats=not engine_args.disable_log_stats, ) + async def prepare_request( + self, + request_id: str, + prompt: Prompt, + stream: bool, + disk_lora_model: Optional[DiskMultiplexConfig] = None, + ) -> VLLMGenerationRequest: + + + prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) + + sampling_params = VLLMSamplingParams.from_prompt(prompt) + prompt_text = prompt_output.text + image_input = prompt_output.image + image = [] + if not self._llm_config.supports_vision and image_input: + raise RuntimeError( + "You provided image input while the engine is not set up to handle images. " + "Did you forget to set `input_modality` to image in yaml file?" 
+ ) + + if self._llm_config.supports_vision and image_input: + for _image in image_input: + image_url = _image.image_url + image.append(await self.image_retriever.get(image_url)) + + request_params = { + "prompt": prompt_text, + "request_id": request_id, + "sampling_params": sampling_params, + "disk_multiplex_config": disk_lora_model, + "serve_request_context": serve.context._serve_request_context.get(), + "stream": stream, + } + if image: + request_params["multi_modal_data"] = {"image": image} + + vllm_request = VLLMGenerationRequest(**request_params) + return vllm_request + async def generate( self, - vllm_engine_request: VLLMGenerationRequest, - stream: bool, + request: VLLMGenerationRequest, ) -> AsyncGenerator[LLMRawResponse, None]: - batch_interval_ms = MODEL_RESPONSE_BATCH_TIMEOUT_MS if stream else None - if vllm_engine_request.serve_request_context: + batch_interval_ms = MODEL_RESPONSE_BATCH_TIMEOUT_MS if request.stream else None + if request.serve_request_context: ray.serve.context._serve_request_context.set( - vllm_engine_request.serve_request_context + request.serve_request_context ) - response_stream = BatchLLMRawResponses( - self._generate(vllm_engine_request), + response_stream = LLMRawResponsesBatcher( + self._generate(request), interval_ms=batch_interval_ms, ) async for response in response_stream.stream(): yield response async def _generate( - self, vllm_generation_request: VLLMGenerationRequest + self, request: GenerationRequest ) -> AsyncGenerator[LLMRawResponse, None]: """Generate an LLMRawResponse stream @@ -560,20 +519,20 @@ async def _generate( """ if RAYLLM_ENABLE_REQUEST_PROMPT_LOGS: logger.info( - f"Request {vllm_generation_request.request_id} started. " - f"Prompt: {vllm_generation_request.prompt}" + f"Request {request.request_id} started. " + f"Prompt: {request.prompt}" ) # Construct a results generator from vLLM results_generator: AsyncGenerator["RequestOutput", None] = self.engine.generate( prompt=vllm.inputs.TextPrompt( - prompt=vllm_generation_request.prompt, - multi_modal_data=vllm_generation_request.multi_modal_data, + prompt=request.prompt, + multi_modal_data=request.multi_modal_data, ), sampling_params=self._parse_sampling_params( - vllm_generation_request.sampling_params + request.sampling_params ), - request_id=vllm_generation_request.request_id, - lora_request=vllm_generation_request.lora_request, # type: ignore + request_id=request.request_id, + lora_request=request.lora_request, # type: ignore ) # Loop over the results @@ -607,7 +566,7 @@ async def _generate( log_probs, log_probs_idx = self._extract_logprobs( output, log_probs_idx, - vllm_generation_request.sampling_params.top_logprobs, + request.sampling_params.top_logprobs, ) yield LLMRawResponse( generated_text=text_output, @@ -644,7 +603,7 @@ async def _generate( generated_tokens_s = all_tokens_collected / generation_time logger.info( - f"Request {vllm_generation_request.request_id} finished ({finish_reason}). " + f"Request {request.request_id} finished ({finish_reason}). " f"Total time: {total_request_time}s, " f"Queue time: {queue_time}, " f"Generation+async time: {generation_time_str}, " @@ -655,7 +614,7 @@ async def _generate( ) else: logger.warning( - f"Request {vllm_generation_request.request_id} " + f"Request {request.request_id} " "finished without any output. " f"Input tokens: {num_input_tokens}." 
) @@ -669,7 +628,7 @@ async def _generate( finally: # Ensure that we cancel on the engine once we have exited the streaming # phase - await self.engine.abort(vllm_generation_request.request_id) + await self.engine.abort(request.request_id) def _get_prompt_limit(self) -> int: """Helper to get the prompt limit from scheduler config @@ -712,12 +671,6 @@ async def check_health(self): logger.exception("Healthcheck failed. The replica will be restarted") raise e from None - def stats(self) -> VLLMEngineStats: - return self._stats.to_stats() - - def shutdown(self, shutdown_pg: bool = True): - raise NotImplementedError() - @staticmethod def _collect_usage_metrics(sampling_params: VLLMSamplingParams) -> None: if sampling_params.best_of is not None: diff --git a/python/ray/llm/_internal/serve/deployments/utils/batcher.py b/python/ray/llm/_internal/serve/deployments/utils/batcher.py new file mode 100644 index 0000000000000..9b3b8e66a63a8 --- /dev/null +++ b/python/ray/llm/_internal/serve/deployments/utils/batcher.py @@ -0,0 +1,103 @@ +import asyncio +from typing import AsyncGenerator, Optional + + +from ray.llm._internal.serve.observability.logging import get_logger +from ray.llm._internal.serve.configs.server_models import ( + BatchedLLMRawResponse, + LLMRawResponse, +) + +from ray.llm._internal.serve.configs.constants import ( + MODEL_RESPONSE_BATCH_TIMEOUT_MS, +) + + +logger = get_logger(__name__) + + +class LLMRawResponsesBatcher: + """This class batches multiple LLMRawResponses from a generator into a + single response, at some time interval. + + Args: + generator: the async generator that this class pulls LLMRawResponses + from. + interval_ms: the interval at which this class yields the current batch. + If None, this class will batch all responses from the generator + together and yield the entire batch once. + """ + + def __init__( + self, + generator: AsyncGenerator[LLMRawResponse, None], + interval_ms: Optional[float] = MODEL_RESPONSE_BATCH_TIMEOUT_MS, + ): + self.generator = generator + self.queue: asyncio.Queue = asyncio.Queue() + + if interval_ms is None: + self.interval_s = None + else: + self.interval_s = interval_ms / 1000 + + self.done_event: asyncio.Event = asyncio.Event() + + # We are okay with this task getting cancelled (to propagate cancellations) + self.read_task = asyncio.create_task(self.read()) + + async def stream(self) -> AsyncGenerator[BatchedLLMRawResponse, None]: + """Drain from the queue every interval_ms and yield the merged results""" + try: + while True: + # Wait for the interval or until we finish, whichever is faster. + # We use an event to avoid asyncio.wait_for cancelling the real task on timeout. 
+ try: + if self.interval_s is None: + await self.done_event.wait() + else: + await asyncio.wait_for( + self.done_event.wait(), timeout=self.interval_s + ) + except asyncio.TimeoutError: + pass + + # Get all elements from the queue + results, is_done = self.check_done_and_drain() + + # If there are results, merge and yield them + if results: + output: BatchedLLMRawResponse = BatchedLLMRawResponse.merge_stream(*results) # type: ignore + yield output + + # If the read task is done, exit the stream task + if is_done: + # Raise exception, if any + self.read_task.result() + break + finally: + # If the stream task is done, make sure to exit the read task + if not self.read_task.done(): + self.read_task.cancel() + + def check_done_and_drain(self): + results = self.drain_queue() + return results, self.read_task.done() + + async def read(self): + """Read from the generator and put into the queue in a tight loop""" + try: + async for x in self.generator: + self.queue.put_nowait(x) + finally: + self.done_event.set() + + def drain_queue(self): + """Drain all results currently in the queue""" + results = [] + try: + while True: + results.append(self.queue.get_nowait()) + except asyncio.QueueEmpty: + pass + return results From 28cd858c1c270263de857c4f0bc95984056f7172 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 24 Apr 2025 19:33:33 -0700 Subject: [PATCH 02/27] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/_internal/serve/deployments/llm/llm_engine.py | 2 +- .../ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index d6cc7f82d3084..b2100d7dad975 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -10,7 +10,7 @@ class LLMEngine(abc.ABC): """Base class for all LLM engines""" def __init__(self, llm_config: LLMConfig): - pass + self._llm_config = llm_config @abc.abstractmethod async def start(self): diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 1b4a65ccbb762..0cd08a759a144 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -181,6 +181,8 @@ def __init__( Args: llm_config: The llm configuration for this engine """ + super().__init__(llm_config) + if vllm is None: raise ImportError( "vLLM is not installed. Please install it with `pip install ray[llm]`." 
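
Editorial note on the interface introduced so far: [PATCH 01/27] adds the LLMEngine abstract base class and [PATCH 02/27] wires VLLMEngine to call super().__init__(llm_config). As a rough illustration of the contract the series is moving toward, the sketch below shows a custom engine written against that interface. EchoEngine is a hypothetical name used only for this example; the constructor fields shown for GenerationRequest and LLMRawResponse are limited to the ones visible elsewhere in this series, and the real models may require more.

from typing import AsyncGenerator, Optional

from ray.llm._internal.serve.configs.server_models import (
    DiskMultiplexConfig,
    GenerationRequest,
    LLMConfig,
    LLMRawResponse,
    Prompt,
)
from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine


class EchoEngine(LLMEngine):
    """Hypothetical engine, shown only to illustrate the new interface."""

    def __init__(self, llm_config: LLMConfig):
        super().__init__(llm_config)
        self._started = False

    async def start(self):
        # A real engine would bring up its backend here (compare VLLMEngine.start).
        self._started = True

    async def prepare_request(
        self,
        request_id: str,
        prompt: Prompt,
        stream: bool,
        disk_lora_model: Optional[DiskMultiplexConfig] = None,
        **kwargs,
    ) -> GenerationRequest:
        # Engine-specific prompt handling now lives here instead of in
        # LLMServer._predict; this sketch simply stringifies the prompt.
        return GenerationRequest(
            prompt=str(prompt.prompt),
            request_id=request_id,
            stream=stream,
        )

    async def generate(
        self, request: GenerationRequest
    ) -> AsyncGenerator[LLMRawResponse, None]:
        # Stream back a single response that echoes the prepared prompt.
        # LLMRawResponse may require more fields than the ones shown here.
        yield LLMRawResponse(
            generated_text=f"echo: {request.prompt}",
            num_input_tokens=0,
            num_generated_tokens=1,
        )

The design point is that prompt formatting and multimodal handling move out of LLMServer._predict and behind prepare_request, so the server only drives the two-step prepare_request / generate flow.
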
From 37d624d0b8a047b610b40af7b4abb77c933e7c68 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 24 Apr 2025 19:36:36 -0700 Subject: [PATCH 03/27] lint Signed-off-by: Kourosh Hakhamaneshi --- .../serve/deployments/llm/llm_engine.py | 38 +++++++++++-------- .../serve/deployments/llm/llm_server.py | 10 +---- .../serve/deployments/llm/vllm/vllm_engine.py | 28 +++++--------- 3 files changed, 33 insertions(+), 43 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index b2100d7dad975..814140c5a1e7c 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -1,6 +1,12 @@ from typing import AsyncGenerator, Optional -from ray.llm._internal.serve.configs.server_models import Prompt, LLMRawResponse, LLMConfig, GenerationRequest, DiskMultiplexConfig +from ray.llm._internal.serve.configs.server_models import ( + Prompt, + LLMRawResponse, + LLMConfig, + GenerationRequest, + DiskMultiplexConfig, +) import abc @@ -8,46 +14,46 @@ class LLMEngine(abc.ABC): """Base class for all LLM engines""" - + def __init__(self, llm_config: LLMConfig): self._llm_config = llm_config - + @abc.abstractmethod async def start(self): """Start the engine""" pass - + @abc.abstractmethod async def prepare_request( - self, - request_id: str, - prompt: Prompt, - stream: bool, + self, + request_id: str, + prompt: Prompt, + stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, **kwargs, ) -> GenerationRequest: """Prepare an EngineRequest for the engine""" pass - + @abc.abstractmethod - async def generate(self, request: GenerationRequest) -> AsyncGenerator[LLMRawResponse, None]: + async def generate( + self, request: GenerationRequest + ) -> AsyncGenerator[LLMRawResponse, None]: """Generate an LLMRawResponse stream""" pass - - + async def check_health(self): """Check the health of the engine""" pass - + async def sleep(self): """Puts the engine to sleep""" pass - + async def wakeup(self): """Wakes up the engine""" pass - + def shutdown(self): """Shuts down the engine""" pass - \ No newline at end of file diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index b9e8c5bacacc9..4863b0044870e 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -4,7 +4,6 @@ from typing import AsyncGenerator, Dict, Any, Optional, Type, Union # Third-party imports -from pydantic import ValidationError as PydanticValidationError from ray import serve from ray._common.utils import import_attr @@ -15,9 +14,6 @@ ENGINE_START_TIMEOUT_S, RAYLLM_VLLM_ENGINE_CLS_ENV, ) -from ray.llm._internal.serve.configs.error_handling import ( - ValidationErrorWithPydantic, -) from ray.llm._internal.serve.configs.openai_api_models import ( ChatCompletionLogProb, ChatCompletionLogProbs, @@ -52,10 +48,6 @@ LoraModelLoader, ) from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import VLLMEngine -from ray.llm._internal.serve.deployments.llm.vllm.vllm_models import ( - VLLMGenerationRequest, - VLLMSamplingParams, -) from ray.llm._internal.serve.deployments.utils.error_handling_utils import ( StreamingErrorHandler, ) @@ -527,7 +519,7 @@ async def _predict( stream=stream, disk_lora_model=disk_lora_model, ) - + async for llm_response in self.engine.generate(llm_request): yield llm_response diff 
--git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 0cd08a759a144..e22a277c5971a 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -21,7 +21,6 @@ ) from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine_stats import ( ArgUsage, - VLLMEngineStats, VLLMEngineStatTracker, usage_counters, ) @@ -75,7 +74,7 @@ "Time a request spends in the queue first forward pass not included (ms).", boundaries=LONG_RANGE_LATENCY_HISTOGRAM_BUCKETS_MS, ) - + def _get_async_engine_args(llm_config: LLMConfig) -> "AsyncEngineArgs": engine_config = llm_config.get_engine_config() @@ -132,7 +131,6 @@ def _clear_current_platform_cache(): current_platform.get_device_capability.cache_clear() - class _EngineBackgroundProcess: def __init__(self, ipc_path, engine_args, engine_config): from vllm.engine.multiprocessing.engine import MQLLMEngine @@ -182,7 +180,7 @@ def __init__( llm_config: The llm configuration for this engine """ super().__init__(llm_config) - + if vllm is None: raise ImportError( "vLLM is not installed. Please install it with `pip install ray[llm]`." @@ -447,13 +445,12 @@ def _start_async_llm_engine( ) async def prepare_request( - self, - request_id: str, - prompt: Prompt, - stream: bool, + self, + request_id: str, + prompt: Prompt, + stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: - prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) @@ -485,16 +482,14 @@ async def prepare_request( vllm_request = VLLMGenerationRequest(**request_params) return vllm_request - + async def generate( self, request: VLLMGenerationRequest, ) -> AsyncGenerator[LLMRawResponse, None]: batch_interval_ms = MODEL_RESPONSE_BATCH_TIMEOUT_MS if request.stream else None if request.serve_request_context: - ray.serve.context._serve_request_context.set( - request.serve_request_context - ) + ray.serve.context._serve_request_context.set(request.serve_request_context) response_stream = LLMRawResponsesBatcher( self._generate(request), interval_ms=batch_interval_ms, @@ -521,8 +516,7 @@ async def _generate( """ if RAYLLM_ENABLE_REQUEST_PROMPT_LOGS: logger.info( - f"Request {request.request_id} started. " - f"Prompt: {request.prompt}" + f"Request {request.request_id} started. 
" f"Prompt: {request.prompt}" ) # Construct a results generator from vLLM results_generator: AsyncGenerator["RequestOutput", None] = self.engine.generate( @@ -530,9 +524,7 @@ async def _generate( prompt=request.prompt, multi_modal_data=request.multi_modal_data, ), - sampling_params=self._parse_sampling_params( - request.sampling_params - ), + sampling_params=self._parse_sampling_params(request.sampling_params), request_id=request.request_id, lora_request=request.lora_request, # type: ignore ) From 1799bf5ee13eb734abebe38c4edfb1e3a2d8fb8a Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Thu, 24 Apr 2025 20:26:30 -0700 Subject: [PATCH 04/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/prompt_formats.py | 1 + .../serve/deployments/llm/vllm/vllm_engine.py | 58 ++++++++++++------- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/prompt_formats.py b/python/ray/llm/_internal/serve/configs/prompt_formats.py index a3c0f14b6f474..35d372cd384a0 100644 --- a/python/ray/llm/_internal/serve/configs/prompt_formats.py +++ b/python/ray/llm/_internal/serve/configs/prompt_formats.py @@ -186,6 +186,7 @@ def generate_prompt( content = message.content conversation.append({"role": message.role, "content": content}) + breakpoint() prompt = self._processor.apply_chat_template( conversation=conversation, tokenize=False, diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index e22a277c5971a..e623c5ce6789f 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -451,35 +451,51 @@ async def prepare_request( stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: - - prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) - - sampling_params = VLLMSamplingParams.from_prompt(prompt) - prompt_text = prompt_output.text - image_input = prompt_output.image - image = [] - if not self._llm_config.supports_vision and image_input: - raise RuntimeError( - "You provided image input while the engine is not set up to handle images. " - "Did you forget to set `input_modality` to image in yaml file?" + + parse_chat_messages_fn = vllm.entrypoints.chat_utils.parse_chat_messages + + # prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) + # prompt_text = prompt_output.text + # image_input = prompt_output.image + # image = [] + # if not self._llm_config.supports_vision and image_input: + # raise RuntimeError( + # "You provided image input while the engine is not set up to handle images." 
+ # ) + + # if self._llm_config.supports_vision and image_input: + # for _image in image_input: + # image_url = _image.image_url + # image.append(await self.image_retriever.get(image_url)) + + model_config = self.model_config + tokenizer = await self.engine.get_tokenizer() + + if isinstance(prompt, list): + conversation, mm_futures = parse_chat_messages_fn( + messages=[m.model_dump() for m in prompt.prompt], + model_config=model_config, + tokenizer=tokenizer, + content_format="openai", ) - - if self._llm_config.supports_vision and image_input: - for _image in image_input: - image_url = _image.image_url - image.append(await self.image_retriever.get(image_url)) - + mm_data = await mm_futures + else: + conversation = prompt + + prompt_output = self._llm_config.prompt_format.generate_prompt(conversation) + prompt_text = prompt_output.text + request_params = { "prompt": prompt_text, "request_id": request_id, - "sampling_params": sampling_params, + "sampling_params": VLLMSamplingParams.from_prompt(prompt), "disk_multiplex_config": disk_lora_model, "serve_request_context": serve.context._serve_request_context.get(), "stream": stream, } - if image: - request_params["multi_modal_data"] = {"image": image} - + if mm_data: + request_params["multi_modal_data"] = mm_data + vllm_request = VLLMGenerationRequest(**request_params) return vllm_request From 6594f342f9847f41cf1a470b2be2df1e59adc203 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Sun, 27 Apr 2025 13:16:36 -0700 Subject: [PATCH 05/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/prompt_formats.py | 15 ++-- .../serve/deployments/llm/vllm/vllm_engine.py | 68 ++++++++++++++----- 2 files changed, 62 insertions(+), 21 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/prompt_formats.py b/python/ray/llm/_internal/serve/configs/prompt_formats.py index 35d372cd384a0..4a3d71c598f99 100644 --- a/python/ray/llm/_internal/serve/configs/prompt_formats.py +++ b/python/ray/llm/_internal/serve/configs/prompt_formats.py @@ -24,7 +24,7 @@ class Text(BaseModel): - field: str = "text" + # field: str = "text" type: str = "text" text: str @@ -35,18 +35,24 @@ class Text(BaseModel): # This is to support the "content" content type in the prompt format, as opposite of # the "text" content from the above which most other model uses. class Content(BaseModel): - field: str = "text" + # field: str = "text" type: str = "text" content: str class Image(BaseModel): - field: str = "image_url" - image_url: Dict + # field: str = "image_url" + type: str = "image_url" + image_url: Union[Dict, str] @field_validator("image_url") @classmethod def check_image_url(cls, value): + # image_url can be a string as well: + # https://platform.openai.com/docs/guides/images-vision?api-mode=responses&format=url + if isinstance(value, str): + return value + if "url" not in value or not value["url"] or not isinstance(value["url"], str): raise ValueError( # TODO(xwjiang): Link to doc. 
@@ -186,7 +192,6 @@ def generate_prompt( content = message.content conversation.append({"role": message.role, "content": content}) - breakpoint() prompt = self._processor.apply_chat_template( conversation=conversation, tokenize=False, diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index e623c5ce6789f..9843b4811a624 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -68,6 +68,12 @@ vllm = try_import("vllm") logger = get_logger(__name__) +from vllm.entrypoints.chat_utils import ( + parse_chat_messages_futures, + resolve_chat_template_content_format, + apply_hf_chat_template +) + time_in_queue_histogram = metrics.Histogram( "vllm_engine_stats_time_in_queue_ms", @@ -197,6 +203,11 @@ def __init__( self.model_config: "ModelConfig" = None self.engine = None self.vllm_config: "VllmConfig" = None + + # Chat template content format (openai or string) + self._resolved_content_format = None + # Also need local instance of the tokenizer to manage prompt formatting. + self._tokenizer = None @staticmethod async def initialize_node(llm_config: LLMConfig) -> InitializeNodeOutput: @@ -218,11 +229,25 @@ async def start(self): logger.info("Skipping engine restart because the engine is already running") return - # Get the scaling options self.engine = await self._start_engine() self.running = True self.model_config = await self.engine.get_model_config() + self._tokenizer = await self.engine.get_tokenizer() + self._resolved_content_format = resolve_chat_template_content_format( + # Use HF to get the chat template so set it to None here. + chat_template=None, + # Default to None, change when it's needed. + # vllm Does not have a high level API to support all of this. + tools=None, + # Let vllm decide the content format. 
+ given_format="auto", + tokenizer=self._tokenizer, + trust_remote_code=self.model_config.trust_remote_code, + ) + print(f"[DEBUG] resolved_content_format: {self._resolved_content_format}") + print(f"[DEBUG] tokenizer: {self._tokenizer}") + logger.info("Started vLLM engine.") async def _start_engine(self) -> "EngineClient": @@ -451,9 +476,9 @@ async def prepare_request( stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: - - parse_chat_messages_fn = vllm.entrypoints.chat_utils.parse_chat_messages - + + # parse_chat_messages_fn = vllm.entrypoints.chat_utils.parse_chat_messages_futures + # prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) # prompt_text = prompt_output.text # image_input = prompt_output.image @@ -469,22 +494,32 @@ async def prepare_request( # image.append(await self.image_retriever.get(image_url)) model_config = self.model_config - tokenizer = await self.engine.get_tokenizer() - - if isinstance(prompt, list): - conversation, mm_futures = parse_chat_messages_fn( - messages=[m.model_dump() for m in prompt.prompt], + + if isinstance(prompt.prompt, list): + messages = [m.model_dump() for m in prompt.prompt] + print(f"[DEBUG] messages: {messages}") + conversation, mm_futures = parse_chat_messages_futures( + messages=messages, model_config=model_config, - tokenizer=tokenizer, - content_format="openai", + tokenizer=self._tokenizer, + content_format=self._resolved_content_format, ) + print(f"[DEBUG] conversation: {conversation}") mm_data = await mm_futures else: conversation = prompt - - prompt_output = self._llm_config.prompt_format.generate_prompt(conversation) - prompt_text = prompt_output.text - + + prompt_text = apply_hf_chat_template( + tokenizer=self._tokenizer, + conversation=conversation, + chat_template=None, + tools=None, + trust_remote_code=model_config.trust_remote_code, + tokenize=False, + ) + # prompt_output = self._llm_config.prompt_format.generate_prompt(conversation) + # prompt_text = prompt_output.text + request_params = { "prompt": prompt_text, "request_id": request_id, @@ -495,7 +530,7 @@ async def prepare_request( } if mm_data: request_params["multi_modal_data"] = mm_data - + vllm_request = VLLMGenerationRequest(**request_params) return vllm_request @@ -664,6 +699,7 @@ def _handle_input_too_long( if ( finish_reason and finish_reason == FinishReason.LENGTH + and hasattr(request_output.metrics, "first_token_time") and request_output.metrics.first_token_time is None ): # This means that the prompt was too long and we did not generate anything. 
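
Editorial note before the next patch continues the chat-template work: [PATCH 01/27] moved the old BatchLLMRawResponses class into deployments/utils/batcher.py as LLMRawResponsesBatcher, and VLLMEngine.generate now derives the batching interval from request.stream. The usage sketch below is a minimal example, assuming LLMRawResponse can be constructed from just the fields shown and that the merged batch sums num_generated_tokens, which is what the updated tests in this series expect.

import asyncio

from ray.llm._internal.serve.configs.server_models import LLMRawResponse
from ray.llm._internal.serve.deployments.utils.batcher import LLMRawResponsesBatcher


async def toy_generator():
    # Ten single-token responses, spaced out slightly.
    for _ in range(10):
        yield LLMRawResponse(
            generated_text="a",
            num_input_tokens=5,
            num_generated_tokens=1,
        )
        await asyncio.sleep(0.01)


async def main():
    # interval_ms=None collapses the whole stream into a single merged batch,
    # mirroring the non-streaming path where batch_interval_ms is None.
    batcher = LLMRawResponsesBatcher(toy_generator(), interval_ms=None)
    async for batch in batcher.stream():
        # merge_stream aggregates token counts, so this is expected to print 10.
        print(batch.num_generated_tokens)


asyncio.run(main())

With interval_ms=None the batcher drains the generator and yields one merged BatchedLLMRawResponse; with a finite interval it yields a merged response roughly every interval_ms milliseconds, which is what the streaming path uses via MODEL_RESPONSE_BATCH_TIMEOUT_MS.
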
From ff2bb07f1f81f316495c71ca09503e4414d44758 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 10:55:46 -0700 Subject: [PATCH 06/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/prompt_formats.py | 4 +-- .../serve/deployments/llm/vllm/vllm_engine.py | 6 ++-- .../serve/deployments/llm/vllm/vllm_models.py | 6 ++-- .../multiplex/test_multiplex_deployment.py | 2 ++ .../deployments/llm/vllm/test_vllm_engine.py | 19 +++++----- .../serve/deployments/mock_vllm_engine.py | 36 ++++++++++++++++--- 6 files changed, 53 insertions(+), 20 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/prompt_formats.py b/python/ray/llm/_internal/serve/configs/prompt_formats.py index 4a3d71c598f99..4433bda9c69dd 100644 --- a/python/ray/llm/_internal/serve/configs/prompt_formats.py +++ b/python/ray/llm/_internal/serve/configs/prompt_formats.py @@ -48,11 +48,11 @@ class Image(BaseModel): @field_validator("image_url") @classmethod def check_image_url(cls, value): - # image_url can be a string as well: + # image_url can be a string as well: # https://platform.openai.com/docs/guides/images-vision?api-mode=responses&format=url if isinstance(value, str): return value - + if "url" not in value or not value["url"] or not isinstance(value["url"], str): raise ValueError( # TODO(xwjiang): Link to doc. diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 9843b4811a624..0a2303772421f 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -71,7 +71,7 @@ from vllm.entrypoints.chat_utils import ( parse_chat_messages_futures, resolve_chat_template_content_format, - apply_hf_chat_template + apply_hf_chat_template, ) @@ -203,7 +203,7 @@ def __init__( self.model_config: "ModelConfig" = None self.engine = None self.vllm_config: "VllmConfig" = None - + # Chat template content format (openai or string) self._resolved_content_format = None # Also need local instance of the tokenizer to manage prompt formatting. @@ -237,7 +237,7 @@ async def start(self): self._resolved_content_format = resolve_chat_template_content_format( # Use HF to get the chat template so set it to None here. chat_template=None, - # Default to None, change when it's needed. + # Default to None, change when it's needed. # vllm Does not have a high level API to support all of this. tools=None, # Let vllm decide the content format. 
diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index 4916615c87289..b68e6ace15fb7 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, List, Optional, TYPE_CHECKING +from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union from pydantic import ConfigDict, Field from ray import serve @@ -222,7 +222,9 @@ class VLLMSamplingParams(SamplingParams): class VLLMGenerationRequest(GenerationRequest): model_config = ConfigDict(arbitrary_types_allowed=True) - sampling_params: VLLMSamplingParams + sampling_params: Optional[ + Union[VLLMSamplingParams, List[VLLMSamplingParams]] + ] = None multi_modal_data: Optional[Dict[str, Any]] = None serve_request_context: Optional[serve.context._RequestContext] = None disk_multiplex_config: Optional[DiskMultiplexConfig] = None diff --git a/python/ray/llm/tests/serve/deployments/llm/multiplex/test_multiplex_deployment.py b/python/ray/llm/tests/serve/deployments/llm/multiplex/test_multiplex_deployment.py index c058ec9337e84..ee6114caead51 100644 --- a/python/ray/llm/tests/serve/deployments/llm/multiplex/test_multiplex_deployment.py +++ b/python/ray/llm/tests/serve/deployments/llm/multiplex/test_multiplex_deployment.py @@ -147,6 +147,7 @@ async def test_multiplex_deployment( expected_lora_out_with_serve_request_context[ "serve_request_context" ] = arg.model_dump().get("serve_request_context") + expected_lora_out_with_serve_request_context["stream"] = stream_tokens print("***arg***", arg.model_dump()) print("***exp***", expected_lora_out_with_serve_request_context) assert arg == arg.__class__(**expected_lora_out_with_serve_request_context) @@ -190,6 +191,7 @@ async def test_multiplex_deployment( "multi_modal_data": None, "serve_request_context": arg.model_dump().get("serve_request_context"), "disk_multiplex_config": None, + "stream": stream_tokens, } assert arg.model_dump() == expected_model_dump, ( "Arg model dump didn't match expected value." 
diff --git a/python/ray/llm/tests/serve/deployments/llm/vllm/test_vllm_engine.py b/python/ray/llm/tests/serve/deployments/llm/vllm/test_vllm_engine.py index 1e92f11d201fa..a8ed4508b11e9 100644 --- a/python/ray/llm/tests/serve/deployments/llm/vllm/test_vllm_engine.py +++ b/python/ray/llm/tests/serve/deployments/llm/vllm/test_vllm_engine.py @@ -7,8 +7,8 @@ import pytest from ray.llm._internal.serve.configs.server_models import FinishReason +from ray.llm._internal.serve.deployments.utils.batcher import LLMRawResponsesBatcher from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine import ( - BatchLLMRawResponses, VLLMEngine, ) from ray.llm._internal.serve.deployments.llm.vllm.vllm_models import ( @@ -87,6 +87,7 @@ def get_fake_engine_and_request(llm_config: LLMConfig, expected_out: List[str]): request_id="req_id", sampling_params=VLLMSamplingParams(), disk_multiplex_config=None, + stream=True, ) return vllm_engine, req, engine_mock @@ -129,7 +130,7 @@ async def test_vllm_engine_error_in_caller(self, llm_config): ) with pytest.raises(RuntimeError): - async for _x in vllm_engine.generate(req, stream=True): + async for _x in vllm_engine.generate(req): raise RuntimeError() await asyncio.sleep(0.02) # wait for asyncio task scheduling @@ -144,7 +145,7 @@ async def test_vllm_engine_caller_cancellation(self, llm_config): ) async def run(): - async for x in vllm_engine.generate(req, stream=True): + async for x in vllm_engine.generate(req): print(x) task = asyncio.create_task(run()) @@ -229,7 +230,7 @@ class TestBatching: @pytest.mark.asyncio async def test_batch(self): count = 0 - batcher = BatchLLMRawResponses(fake_generator()) + batcher = LLMRawResponsesBatcher(fake_generator()) async for x in batcher.stream(): count += 1 assert x.num_generated_tokens == 100 @@ -242,7 +243,7 @@ async def test_batch(self): @pytest.mark.asyncio async def test_batch_timing(self): count = 0 - batcher = BatchLLMRawResponses(fake_generator_slow(num_batches=10)) + batcher = LLMRawResponsesBatcher(fake_generator_slow(num_batches=10)) async for _x in batcher.stream(): count += 1 @@ -258,7 +259,7 @@ async def test_batch_last_return_is_immediate(self): the last response if it returns quickly.""" count = 0 token_count = 0 - batcher = BatchLLMRawResponses(fake_generator_slow_last_return_immediate()) + batcher = LLMRawResponsesBatcher(fake_generator_slow_last_return_immediate()) last_response = None async for _x in batcher.stream(): count += 1 @@ -278,7 +279,7 @@ async def test_batch_last_return_is_immediate(self): async def test_batch_no_interval(self): """Check that the class creates only one batch if there's no interval.""" - batcher = BatchLLMRawResponses( + batcher = LLMRawResponsesBatcher( fake_generator_slow(num_batches=10), interval_ms=None ) @@ -301,7 +302,7 @@ async def generator_should_raise(): raise ValueError() count = 0 - batched = BatchLLMRawResponses( + batched = LLMRawResponsesBatcher( generator_should_raise(), interval_ms=interval_ms ) @@ -340,7 +341,7 @@ async def generator_should_raise(): if to_cancel == "inner": raise asyncio.CancelledError() - batched = BatchLLMRawResponses( + batched = LLMRawResponsesBatcher( generator_should_raise(), interval_ms=interval_ms ) diff --git a/python/ray/llm/tests/serve/deployments/mock_vllm_engine.py b/python/ray/llm/tests/serve/deployments/mock_vllm_engine.py index 9880fa4156d22..fe34a80135c58 100644 --- a/python/ray/llm/tests/serve/deployments/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/deployments/mock_vllm_engine.py @@ -2,7 +2,7 @@ import json import random 
from random import randint -from typing import Dict +from typing import Dict, Optional from PIL import Image from vllm.sampling_params import SamplingParams as VLLMInternalSamplingParams @@ -18,6 +18,7 @@ DiskMultiplexConfig, LLMConfig, LLMRawResponse, + Prompt, ) from ray.llm._internal.serve.deployments.llm.vllm.vllm_engine_stats import ( VLLMEngineStats, @@ -30,9 +31,10 @@ from ray.llm._internal.serve.deployments.utils.node_initialization_utils import ( InitializeNodeOutput, ) +from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine -class MockVLLMEngine: +class MockVLLMEngine(LLMEngine): def __init__(self, llm_config: LLMConfig): """Create a vLLM Engine class @@ -72,6 +74,16 @@ async def async_range(count): yield i await asyncio.sleep(0.0) + async def prepare_request( + self, request_id: str, prompt: Prompt, stream: bool, **kwargs + ) -> VLLMGenerationRequest: + return VLLMGenerationRequest( + request_id=request_id, + prompt=prompt.prompt, + stream=stream, + sampling_params=VLLMSamplingParams.from_prompt(prompt), + ) + async def generate(self, vllm_engine_request: VLLMGenerationRequest, stream: bool): sampling_params = self._parse_sampling_params( vllm_engine_request.sampling_params @@ -241,7 +253,7 @@ async def generate(self, vllm_engine_request: VLLMGenerationRequest, stream: boo ) -class MockMultiplexEngine: +class MockMultiplexEngine(LLMEngine): def __init__(self, *args, **kwargs): self.started = False @@ -253,10 +265,26 @@ async def initialize_node(llm_config: LLMConfig) -> InitializeNodeOutput: extra_init_kwargs={}, ) + async def prepare_request( + self, + request_id: str, + prompt: Prompt, + stream: bool, + disk_lora_model: Optional[DiskMultiplexConfig] = None, + ) -> VLLMGenerationRequest: + output = VLLMGenerationRequest( + request_id=request_id, + prompt=prompt.prompt, + stream=stream, + sampling_params=VLLMSamplingParams.from_prompt(prompt), + disk_multiplex_config=disk_lora_model, + ) + return output + async def start(self): self.started = True - async def generate(self, arg, stream): + async def generate(self, arg): assert self.started, "Engine was not started" # First yield the arg yield arg From be35a113f761d29833179df6e8e5006d8cda3f8e Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 11:06:15 -0700 Subject: [PATCH 07/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/prompt_formats.py | 16 ++++----- .../serve/deployments/llm/llm_engine.py | 4 +-- .../serve/deployments/llm/vllm/vllm_engine.py | 35 ++++--------------- 3 files changed, 15 insertions(+), 40 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/prompt_formats.py b/python/ray/llm/_internal/serve/configs/prompt_formats.py index 4433bda9c69dd..67a66075852d7 100644 --- a/python/ray/llm/_internal/serve/configs/prompt_formats.py +++ b/python/ray/llm/_internal/serve/configs/prompt_formats.py @@ -24,7 +24,6 @@ class Text(BaseModel): - # field: str = "text" type: str = "text" text: str @@ -35,24 +34,23 @@ class Text(BaseModel): # This is to support the "content" content type in the prompt format, as opposite of # the "text" content from the above which most other model uses. 
class Content(BaseModel): - # field: str = "text" type: str = "text" content: str class Image(BaseModel): - # field: str = "image_url" type: str = "image_url" - image_url: Union[Dict, str] + image_url: Dict @field_validator("image_url") @classmethod def check_image_url(cls, value): - # image_url can be a string as well: - # https://platform.openai.com/docs/guides/images-vision?api-mode=responses&format=url - if isinstance(value, str): - return value - + """Checks if the image_url is a dict with a 'url' key. + Example: + image_url = { + "url": "https://example.com/image.png" + } + """ if "url" not in value or not value["url"] or not isinstance(value["url"], str): raise ValueError( # TODO(xwjiang): Link to doc. diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index 814140c5a1e7c..0348bac8837c4 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -32,14 +32,14 @@ async def prepare_request( disk_lora_model: Optional[DiskMultiplexConfig] = None, **kwargs, ) -> GenerationRequest: - """Prepare an EngineRequest for the engine""" + """Prepare a GenerationRequest for the engine""" pass @abc.abstractmethod async def generate( self, request: GenerationRequest ) -> AsyncGenerator[LLMRawResponse, None]: - """Generate an LLMRawResponse stream""" + """Generate an LLMRawResponse stream based on the GenerationRequest""" pass async def check_health(self): diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 0a2303772421f..6299a1e3f1a31 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -68,13 +68,6 @@ vllm = try_import("vllm") logger = get_logger(__name__) -from vllm.entrypoints.chat_utils import ( - parse_chat_messages_futures, - resolve_chat_template_content_format, - apply_hf_chat_template, -) - - time_in_queue_histogram = metrics.Histogram( "vllm_engine_stats_time_in_queue_ms", "Time a request spends in the queue first forward pass not included (ms).", @@ -224,6 +217,8 @@ async def start(self): If the engine is already running, do nothing. """ + from vllm.entrypoints.chat_utils import resolve_chat_template_content_format + if self.running: # The engine is already running! logger.info("Skipping engine restart because the engine is already running") @@ -245,8 +240,6 @@ async def start(self): tokenizer=self._tokenizer, trust_remote_code=self.model_config.trust_remote_code, ) - print(f"[DEBUG] resolved_content_format: {self._resolved_content_format}") - print(f"[DEBUG] tokenizer: {self._tokenizer}") logger.info("Started vLLM engine.") @@ -476,35 +469,21 @@ async def prepare_request( stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: - - # parse_chat_messages_fn = vllm.entrypoints.chat_utils.parse_chat_messages_futures - - # prompt_output = self._llm_config.prompt_format.generate_prompt(prompt) - # prompt_text = prompt_output.text - # image_input = prompt_output.image - # image = [] - # if not self._llm_config.supports_vision and image_input: - # raise RuntimeError( - # "You provided image input while the engine is not set up to handle images." 
- # ) - - # if self._llm_config.supports_vision and image_input: - # for _image in image_input: - # image_url = _image.image_url - # image.append(await self.image_retriever.get(image_url)) + from vllm.entrypoints.chat_utils import ( + parse_chat_messages_futures, + apply_hf_chat_template, + ) model_config = self.model_config if isinstance(prompt.prompt, list): messages = [m.model_dump() for m in prompt.prompt] - print(f"[DEBUG] messages: {messages}") conversation, mm_futures = parse_chat_messages_futures( messages=messages, model_config=model_config, tokenizer=self._tokenizer, content_format=self._resolved_content_format, ) - print(f"[DEBUG] conversation: {conversation}") mm_data = await mm_futures else: conversation = prompt @@ -517,8 +496,6 @@ async def prepare_request( trust_remote_code=model_config.trust_remote_code, tokenize=False, ) - # prompt_output = self._llm_config.prompt_format.generate_prompt(conversation) - # prompt_text = prompt_output.text request_params = { "prompt": prompt_text, From 9da0c2a4e156de06e9021ead016c032f5efeb4b4 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 15:26:34 -0700 Subject: [PATCH 08/27] fixed tests Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/configs/prompt_formats.py | 1 + .../serve/deployments/llm/vllm/vllm_engine.py | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/python/ray/llm/_internal/serve/configs/prompt_formats.py b/python/ray/llm/_internal/serve/configs/prompt_formats.py index 67a66075852d7..86530aa5205aa 100644 --- a/python/ray/llm/_internal/serve/configs/prompt_formats.py +++ b/python/ray/llm/_internal/serve/configs/prompt_formats.py @@ -124,6 +124,7 @@ class EngineInput(BaseModel): image: Optional[List[ImageInput]] = None +# TODO (Kourosh): We can delete this abstraction. 
class AbstractPromptFormat(BaseModel): model_config = ConfigDict(extra="forbid") diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 6299a1e3f1a31..59e85f58c4a0f 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -475,6 +475,7 @@ async def prepare_request( ) model_config = self.model_config + mm_data = None if isinstance(prompt.prompt, list): messages = [m.model_dump() for m in prompt.prompt] @@ -485,17 +486,17 @@ async def prepare_request( content_format=self._resolved_content_format, ) mm_data = await mm_futures - else: - conversation = prompt - prompt_text = apply_hf_chat_template( - tokenizer=self._tokenizer, - conversation=conversation, - chat_template=None, - tools=None, - trust_remote_code=model_config.trust_remote_code, - tokenize=False, - ) + prompt_text = apply_hf_chat_template( + tokenizer=self._tokenizer, + conversation=conversation, + chat_template=None, + tools=None, + trust_remote_code=model_config.trust_remote_code, + tokenize=False, + ) + else: + prompt_text = prompt.prompt request_params = { "prompt": prompt_text, From f5b495820911a6d6016ff2823fe66752c164bacc Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 16:35:09 -0700 Subject: [PATCH 09/27] fixed release tests Signed-off-by: Kourosh Hakhamaneshi --- .../llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 59e85f58c4a0f..e2d912025b1a5 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -494,6 +494,9 @@ async def prepare_request( tools=None, trust_remote_code=model_config.trust_remote_code, tokenize=False, + # **kwargs for tokenizer.apply_chat_template + add_generation_prompt=True, + continue_final_message=False, ) else: prompt_text = prompt.prompt From 7ccd0d4e6a92de6bd4b9eb39156e782013f0b46c Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 17:59:26 -0700 Subject: [PATCH 10/27] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/serve/mocks/mock_vllm_engine.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py index fe34a80135c58..b873da5a830a5 100644 --- a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py @@ -84,7 +84,7 @@ async def prepare_request( sampling_params=VLLMSamplingParams.from_prompt(prompt), ) - async def generate(self, vllm_engine_request: VLLMGenerationRequest, stream: bool): + async def generate(self, vllm_engine_request: VLLMGenerationRequest): sampling_params = self._parse_sampling_params( vllm_engine_request.sampling_params ) @@ -240,7 +240,7 @@ def _convert_to_json(self, vllm_engine_request: VLLMGenerationRequest) -> Dict: res.update({"has_image": has_image}) return json.dumps(res) - async def generate(self, vllm_engine_request: VLLMGenerationRequest, stream: bool): + async def generate(self, vllm_engine_request: VLLMGenerationRequest): yield LLMRawResponse( generated_text=self._convert_to_json(vllm_engine_request), num_input_tokens=0, @@ -356,7 
+356,7 @@ async def generate_json(self, json_schema, max_tokens, prompt_len): yield llm_response await asyncio.sleep(generation_time) - async def generate(self, vllm_engine_request: VLLMGenerationRequest, stream: bool): + async def generate(self, vllm_engine_request: VLLMGenerationRequest): sampling_params = self._parse_sampling_params( vllm_engine_request.sampling_params ) From 5804ba56be94bb965b6b6497053f1b466996e935 Mon Sep 17 00:00:00 2001 From: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> Date: Mon, 28 Apr 2025 18:13:52 -0700 Subject: [PATCH 11/27] Update python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py Co-authored-by: Gene Der Su Signed-off-by: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> --- .../ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index e2d912025b1a5..4852954b5b046 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -233,7 +233,7 @@ async def start(self): # Use HF to get the chat template so set it to None here. chat_template=None, # Default to None, change when it's needed. - # vllm Does not have a high level API to support all of this. + # vLLM does not have a high level API to support all of this. tools=None, # Let vllm decide the content format. given_format="auto", From 3101e8138708cb367c22a6c931e1adc252e95768 Mon Sep 17 00:00:00 2001 From: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> Date: Mon, 28 Apr 2025 18:14:03 -0700 Subject: [PATCH 12/27] Update python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py Co-authored-by: Gene Der Su Signed-off-by: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> --- .../ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 4852954b5b046..bdfa640ed9b47 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -235,7 +235,7 @@ async def start(self): # Default to None, change when it's needed. # vLLM does not have a high level API to support all of this. tools=None, - # Let vllm decide the content format. + # Let vLLM decide the content format. 
given_format="auto", tokenizer=self._tokenizer, trust_remote_code=self.model_config.trust_remote_code, From b6eed947c7e0b1d31eaa9948f60a85aa96a9e59f Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Mon, 28 Apr 2025 18:40:17 -0700 Subject: [PATCH 13/27] removed serve context stuff Signed-off-by: Kourosh Hakhamaneshi --- .../llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index e2d912025b1a5..8f06f51dbcb93 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -4,7 +4,6 @@ from typing import Any, AsyncGenerator, Dict, List, Optional, Tuple, TYPE_CHECKING import ray -from ray import serve from ray.util import metrics from ray.util.placement_group import PlacementGroup from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy @@ -506,7 +505,6 @@ async def prepare_request( "request_id": request_id, "sampling_params": VLLMSamplingParams.from_prompt(prompt), "disk_multiplex_config": disk_lora_model, - "serve_request_context": serve.context._serve_request_context.get(), "stream": stream, } if mm_data: @@ -520,8 +518,7 @@ async def generate( request: VLLMGenerationRequest, ) -> AsyncGenerator[LLMRawResponse, None]: batch_interval_ms = MODEL_RESPONSE_BATCH_TIMEOUT_MS if request.stream else None - if request.serve_request_context: - ray.serve.context._serve_request_context.set(request.serve_request_context) + response_stream = LLMRawResponsesBatcher( self._generate(request), interval_ms=batch_interval_ms, From d74febcdb702fe277d0e6c54c25a8aad573ed7ae Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 09:10:30 -0700 Subject: [PATCH 14/27] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/tests/serve/conftest.py | 8 ++++---- .../ray/llm/tests/serve/mocks/mock_vllm_engine.py | 15 +++++++++++++-- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/python/ray/llm/tests/serve/conftest.py b/python/ray/llm/tests/serve/conftest.py index b7fb72da162a2..a36c9abd608c0 100644 --- a/python/ray/llm/tests/serve/conftest.py +++ b/python/ray/llm/tests/serve/conftest.py @@ -101,13 +101,13 @@ def get_rayllm_testing_model( @pytest.fixture -def testing_model(shutdown_ray_and_serve, use_mock_vllm_engine, model_pixtral_12b): +def testing_model(shutdown_ray_and_serve, use_mock_vllm_engine, model_smolvlm_256m): test_model_path = get_test_model_path("mock_vllm_model.yaml") with open(test_model_path, "r") as f: loaded_llm_config = yaml.safe_load(f) - loaded_llm_config["model_loading_config"]["model_source"] = model_pixtral_12b + loaded_llm_config["model_loading_config"]["model_source"] = model_smolvlm_256m test_model_path = write_yaml_file(loaded_llm_config) with get_rayllm_testing_model(test_model_path) as (client, model_id): @@ -116,14 +116,14 @@ def testing_model(shutdown_ray_and_serve, use_mock_vllm_engine, model_pixtral_12 @pytest.fixture def testing_model_no_accelerator( - shutdown_ray_and_serve, use_mock_vllm_engine, model_pixtral_12b + shutdown_ray_and_serve, use_mock_vllm_engine, model_smolvlm_256m ): test_model_path = get_test_model_path("mock_vllm_model_no_accelerator.yaml") with open(test_model_path, "r") as f: loaded_llm_config = yaml.safe_load(f) - loaded_llm_config["model_loading_config"]["model_source"] = model_pixtral_12b + 
loaded_llm_config["model_loading_config"]["model_source"] = model_smolvlm_256m test_model_path = write_yaml_file(loaded_llm_config) with get_rayllm_testing_model(test_model_path) as (client, model_id): diff --git a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py index b873da5a830a5..81faa9b640388 100644 --- a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py @@ -77,9 +77,14 @@ async def async_range(count): async def prepare_request( self, request_id: str, prompt: Prompt, stream: bool, **kwargs ) -> VLLMGenerationRequest: + + # Simplification: Assume prompt is a list of messages with one user message + assert isinstance(prompt.prompt, list) and len(prompt.prompt) == 1 + assert hasattr(prompt.prompt[0], "content") + prompt_text = prompt.prompt[0].content return VLLMGenerationRequest( request_id=request_id, - prompt=prompt.prompt, + prompt=prompt_text, stream=stream, sampling_params=VLLMSamplingParams.from_prompt(prompt), ) @@ -272,9 +277,15 @@ async def prepare_request( stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: + + # Simplification: Assume prompt is a list of messages with one user message + assert isinstance(prompt.prompt, list) and len(prompt.prompt) == 1 + assert hasattr(prompt.prompt[0], "content") + prompt_text = prompt.prompt[0].content + output = VLLMGenerationRequest( request_id=request_id, - prompt=prompt.prompt, + prompt=prompt_text, stream=stream, sampling_params=VLLMSamplingParams.from_prompt(prompt), disk_multiplex_config=disk_lora_model, From 93ee153c4410df25bdc70ef2617a5ca2b445ce91 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 12:51:08 -0700 Subject: [PATCH 15/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../serve/deployments/llm/llm_server.py | 6 ++-- .../serve/deployments/llm/vllm/vllm_models.py | 1 - python/ray/llm/tests/serve/conftest.py | 25 ++----------- .../cpu/builders/test_application_builders.py | 35 +++++++++++++------ .../model_config/llm_config.yaml | 2 -- .../ray/llm/tests/serve/mock_vllm_model.yaml | 7 +++- .../serve/mock_vllm_model_no_accelerator.yaml | 7 +++- .../llm/tests/serve/mocks/mock_vllm_engine.py | 29 +++++++++------ 8 files changed, 61 insertions(+), 51 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index 4863b0044870e..c3cff3473a6ce 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -43,6 +43,7 @@ LLMConfig, LLMRawResponse, ) +from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine from ray.llm._internal.serve.deployments.llm.image_retriever import ImageRetriever from ray.llm._internal.serve.deployments.llm.multiplex.lora_model_loader import ( LoraModelLoader, @@ -464,9 +465,11 @@ async def __init__( self.response_postprocessor = ResponsePostprocessor() @property - def _get_engine_class(self) -> VLLMEngine: + def _get_engine_class(self) -> Type[LLMEngine]: """Helper to load the engine class from the environment variable if existed else it will fallback to the default engine class. + + This is used for testing or patching purposes. """ engine_cls_path = os.environ.get(RAYLLM_VLLM_ENGINE_CLS_ENV) if engine_cls_path: @@ -477,7 +480,6 @@ def _get_engine_class(self) -> VLLMEngine: f"Failed to import engine class {engine_cls_path}. 
" f"Using the default engine class {self._engine_cls}." ) - return self._engine_cls async def _start_engine(self): diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index b68e6ace15fb7..bb2e9803e3828 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -226,7 +226,6 @@ class VLLMGenerationRequest(GenerationRequest): Union[VLLMSamplingParams, List[VLLMSamplingParams]] ] = None multi_modal_data: Optional[Dict[str, Any]] = None - serve_request_context: Optional[serve.context._RequestContext] = None disk_multiplex_config: Optional[DiskMultiplexConfig] = None @property diff --git a/python/ray/llm/tests/serve/conftest.py b/python/ray/llm/tests/serve/conftest.py index a36c9abd608c0..6ab028fca0608 100644 --- a/python/ray/llm/tests/serve/conftest.py +++ b/python/ray/llm/tests/serve/conftest.py @@ -30,13 +30,6 @@ def shutdown_ray_and_serve(): ray.shutdown() -@pytest.fixture -def use_mock_vllm_engine(monkeypatch): - monkeypatch.setenv( - RAYLLM_VLLM_ENGINE_CLS_ENV, - "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine", - ) - yield @pytest.fixture @@ -101,30 +94,16 @@ def get_rayllm_testing_model( @pytest.fixture -def testing_model(shutdown_ray_and_serve, use_mock_vllm_engine, model_smolvlm_256m): +def testing_model(shutdown_ray_and_serve): test_model_path = get_test_model_path("mock_vllm_model.yaml") - with open(test_model_path, "r") as f: - loaded_llm_config = yaml.safe_load(f) - - loaded_llm_config["model_loading_config"]["model_source"] = model_smolvlm_256m - test_model_path = write_yaml_file(loaded_llm_config) - with get_rayllm_testing_model(test_model_path) as (client, model_id): yield client, model_id @pytest.fixture -def testing_model_no_accelerator( - shutdown_ray_and_serve, use_mock_vllm_engine, model_smolvlm_256m -): +def testing_model_no_accelerator(shutdown_ray_and_serve): test_model_path = get_test_model_path("mock_vllm_model_no_accelerator.yaml") - with open(test_model_path, "r") as f: - loaded_llm_config = yaml.safe_load(f) - - loaded_llm_config["model_loading_config"]["model_source"] = model_smolvlm_256m - test_model_path = write_yaml_file(loaded_llm_config) - with get_rayllm_testing_model(test_model_path) as (client, model_id): yield client, model_id diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 1f5354480fe3d..abe9171327baf 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -25,12 +25,22 @@ @pytest.fixture -def get_llm_serve_args(llm_config): - yield LLMServingArgs(llm_configs=[llm_config]) +def llm_config_with_mock_engine(llm_config): + # Make sure engine is mocked. 
+ if llm_config.runtime_env is None: + llm_config.runtime_env = {} + llm_config.runtime_env.setdefault("env_vars", {})[ + "RAYLLM_VLLM_ENGINE_CLS" + ] = "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" + yield llm_config + +@pytest.fixture +def get_llm_serve_args(llm_config_with_mock_engine): + yield LLMServingArgs(llm_configs=[llm_config_with_mock_engine]) @pytest.fixture() -def serve_config_separate_model_config_files(model_pixtral_12b): +def serve_config_separate_model_config_files(): with tempfile.TemporaryDirectory() as config_dir: serve_config_filename = "llm_app_separate_model_config_files.yaml" config_root = os.path.join(os.path.dirname(__file__), "test_config_files") @@ -50,7 +60,14 @@ def serve_config_separate_model_config_files(model_pixtral_12b): with open(llm_config_src, "r") as f: llm_config_yaml = yaml.safe_load(f) - llm_config_yaml["model_loading_config"]["model_id"] = model_pixtral_12b + + + # Make sure engine is mocked. + if llm_config_yaml.get("runtime_env", None) is None: + llm_config_yaml["runtime_env"] = {} + llm_config_yaml["runtime_env"]["env_vars"] = { + "RAYLLM_VLLM_ENGINE_CLS": "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" + } os.makedirs(os.path.dirname(llm_config_dst), exist_ok=True) with open(llm_config_dst, "w") as f: @@ -66,7 +83,7 @@ def serve_config_separate_model_config_files(model_pixtral_12b): class TestBuildOpenaiApp: def test_build_openai_app( - self, get_llm_serve_args, shutdown_ray_and_serve, use_mock_vllm_engine + self, get_llm_serve_args, shutdown_ray_and_serve ): """Test `build_openai_app` can build app and run it with Serve.""" @@ -79,8 +96,7 @@ def test_build_openai_app( def test_build_openai_app_with_config( self, serve_config_separate_model_config_files, - shutdown_ray_and_serve, - use_mock_vllm_engine, + shutdown_ray_and_serve ): """Test `build_openai_app` can be used in serve config.""" @@ -149,13 +165,12 @@ def test_router_built_with_autoscaling_configs(self): class TestBuildVllmDeployment: def test_build_llm_deployment( self, - llm_config, + llm_config_with_mock_engine, shutdown_ray_and_serve, - use_mock_vllm_engine, ): """Test `build_llm_deployment` can build a vLLM deployment.""" - app = build_llm_deployment(llm_config) + app = build_llm_deployment(llm_config_with_mock_engine) assert isinstance(app, serve.Application) serve.run(app) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_config_files/model_config/llm_config.yaml b/python/ray/llm/tests/serve/cpu/builders/test_config_files/model_config/llm_config.yaml index 91421496d5b38..567b9457f296e 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_config_files/model_config/llm_config.yaml +++ b/python/ray/llm/tests/serve/cpu/builders/test_config_files/model_config/llm_config.yaml @@ -1,8 +1,6 @@ model_loading_config: model_id: model1 -accelerator_type: "L4" - deployment_config: ray_actor_options: resources: diff --git a/python/ray/llm/tests/serve/mock_vllm_model.yaml b/python/ray/llm/tests/serve/mock_vllm_model.yaml index 879fc9ec8f3d1..1e89e2fa7bdcc 100644 --- a/python/ray/llm/tests/serve/mock_vllm_model.yaml +++ b/python/ray/llm/tests/serve/mock_vllm_model.yaml @@ -1,5 +1,10 @@ model_loading_config: - model_id: VLLMFakeModel + model_id: FAKE_MODEL_UNDER_TEST + +# Overriding the engine class to only focus on testing the components around the engine +runtime_env: + env_vars: + RAYLLM_VLLM_ENGINE_CLS: "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" llm_engine: vLLM diff --git a/python/ray/llm/tests/serve/mock_vllm_model_no_accelerator.yaml 
b/python/ray/llm/tests/serve/mock_vllm_model_no_accelerator.yaml index 701fb4171a396..a54b2d597840c 100644 --- a/python/ray/llm/tests/serve/mock_vllm_model_no_accelerator.yaml +++ b/python/ray/llm/tests/serve/mock_vllm_model_no_accelerator.yaml @@ -1,5 +1,10 @@ model_loading_config: - model_id: VLLMFakeModel + model_id: FAKE_MODEL_UNDER_TEST + +# Overriding the engine class to only focus on testing the components around the engine +runtime_env: + env_vars: + RAYLLM_VLLM_ENGINE_CLS: "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" llm_engine: vLLM diff --git a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py index 81faa9b640388..8a3cebf4e66f4 100644 --- a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py @@ -77,11 +77,15 @@ async def async_range(count): async def prepare_request( self, request_id: str, prompt: Prompt, stream: bool, **kwargs ) -> VLLMGenerationRequest: - - # Simplification: Assume prompt is a list of messages with one user message - assert isinstance(prompt.prompt, list) and len(prompt.prompt) == 1 - assert hasattr(prompt.prompt[0], "content") - prompt_text = prompt.prompt[0].content + + if isinstance(prompt.prompt, list): + # Simplification: Assume prompt is a list of messages with one user message + assert len(prompt.prompt) == 1 + assert hasattr(prompt.prompt[0], "content") + prompt_text = prompt.prompt[0].content + else: + prompt_text = prompt.prompt + return VLLMGenerationRequest( request_id=request_id, prompt=prompt_text, @@ -277,12 +281,15 @@ async def prepare_request( stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, ) -> VLLMGenerationRequest: - - # Simplification: Assume prompt is a list of messages with one user message - assert isinstance(prompt.prompt, list) and len(prompt.prompt) == 1 - assert hasattr(prompt.prompt[0], "content") - prompt_text = prompt.prompt[0].content - + + if isinstance(prompt.prompt, list): + # Simplification: Assume prompt is a list of messages with one user message + assert len(prompt.prompt) == 1 + assert hasattr(prompt.prompt[0], "content") + prompt_text = prompt.prompt[0].content + else: + prompt_text = prompt.prompt + output = VLLMGenerationRequest( request_id=request_id, prompt=prompt_text, From a4b34f3cb6e1e82eed6e0988b0cca06f2ca404ba Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 12:51:25 -0700 Subject: [PATCH 16/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../ray/llm/_internal/serve/deployments/llm/llm_server.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index c3cff3473a6ce..223821019084c 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -466,10 +466,10 @@ async def __init__( @property def _get_engine_class(self) -> Type[LLMEngine]: - """Helper to load the engine class from the environment variable if existed - else it will fallback to the default engine class. + """Helper to load the engine class from the environment variable. - This is used for testing or patching purposes. + This is used for testing or escape-hatch for patching purposes. + If env variable is not set, it will fallback to the default engine class. 
""" engine_cls_path = os.environ.get(RAYLLM_VLLM_ENGINE_CLS_ENV) if engine_cls_path: From d4d2a81346cd90ec9711b8cdd92ec26f5abad6c0 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 12:56:42 -0700 Subject: [PATCH 17/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../_internal/serve/deployments/llm/llm_engine.py | 10 +++++++++- .../_internal/serve/deployments/llm/llm_server.py | 6 +++--- .../serve/deployments/llm/vllm/vllm_engine.py | 4 ++-- .../serve/deployments/llm/vllm/vllm_models.py | 1 - python/ray/llm/tests/serve/conftest.py | 3 --- .../serve/cpu/builders/test_application_builders.py | 12 ++++-------- python/ray/llm/tests/serve/mocks/mock_vllm_engine.py | 2 +- 7 files changed, 19 insertions(+), 19 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index 0348bac8837c4..182823366bdf1 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -44,7 +44,15 @@ async def generate( async def check_health(self): """Check the health of the engine""" - pass + return True + + ############################################################## + # Optional methods + # These methods will be implemented in the future to allow + # more granular life-cycle management of the engine. + # e.g. in usecases like RL training, we need to put the engine + # to sleep during training and wake up during rollouts. + ############################################################## async def sleep(self): """Puts the engine to sleep""" diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index 223821019084c..1466e3e46715d 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -466,9 +466,9 @@ async def __init__( @property def _get_engine_class(self) -> Type[LLMEngine]: - """Helper to load the engine class from the environment variable. - - This is used for testing or escape-hatch for patching purposes. + """Helper to load the engine class from the environment variable. + + This is used for testing or escape-hatch for patching purposes. If env variable is not set, it will fallback to the default engine class. 
""" engine_cls_path = os.environ.get(RAYLLM_VLLM_ENGINE_CLS_ENV) diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index e7b04e3946f74..15dae5f6acfe9 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -467,7 +467,7 @@ async def prepare_request( prompt: Prompt, stream: bool, disk_lora_model: Optional[DiskMultiplexConfig] = None, - ) -> VLLMGenerationRequest: + ) -> GenerationRequest: from vllm.entrypoints.chat_utils import ( parse_chat_messages_futures, apply_hf_chat_template, @@ -515,7 +515,7 @@ async def prepare_request( async def generate( self, - request: VLLMGenerationRequest, + request: GenerationRequest, ) -> AsyncGenerator[LLMRawResponse, None]: batch_interval_ms = MODEL_RESPONSE_BATCH_TIMEOUT_MS if request.stream else None diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py index bb2e9803e3828..87a05e5e3dae5 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_models.py @@ -2,7 +2,6 @@ from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union from pydantic import ConfigDict, Field -from ray import serve from ray.util.placement_group import ( PlacementGroup, get_current_placement_group, diff --git a/python/ray/llm/tests/serve/conftest.py b/python/ray/llm/tests/serve/conftest.py index 6ab028fca0608..5b57e52696615 100644 --- a/python/ray/llm/tests/serve/conftest.py +++ b/python/ray/llm/tests/serve/conftest.py @@ -1,7 +1,6 @@ import ray from ray import serve import pytest -from ray.llm._internal.serve.configs.constants import RAYLLM_VLLM_ENGINE_CLS_ENV from ray.llm._internal.serve.configs.server_models import ( LLMConfig, ModelLoadingConfig, @@ -30,8 +29,6 @@ def shutdown_ray_and_serve(): ray.shutdown() - - @pytest.fixture def llm_config(model_pixtral_12b): yield LLMConfig( diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index abe9171327baf..18084fa79502c 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -34,6 +34,7 @@ def llm_config_with_mock_engine(llm_config): ] = "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" yield llm_config + @pytest.fixture def get_llm_serve_args(llm_config_with_mock_engine): yield LLMServingArgs(llm_configs=[llm_config_with_mock_engine]) @@ -60,8 +61,7 @@ def serve_config_separate_model_config_files(): with open(llm_config_src, "r") as f: llm_config_yaml = yaml.safe_load(f) - - + # Make sure engine is mocked. 
if llm_config_yaml.get("runtime_env", None) is None: llm_config_yaml["runtime_env"] = {} @@ -82,9 +82,7 @@ def serve_config_separate_model_config_files(): class TestBuildOpenaiApp: - def test_build_openai_app( - self, get_llm_serve_args, shutdown_ray_and_serve - ): + def test_build_openai_app(self, get_llm_serve_args, shutdown_ray_and_serve): """Test `build_openai_app` can build app and run it with Serve.""" app = build_openai_app( @@ -94,9 +92,7 @@ def test_build_openai_app( serve.run(app) def test_build_openai_app_with_config( - self, - serve_config_separate_model_config_files, - shutdown_ray_and_serve + self, serve_config_separate_model_config_files, shutdown_ray_and_serve ): """Test `build_openai_app` can be used in serve config.""" diff --git a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py index 8a3cebf4e66f4..26587a9d74e21 100644 --- a/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py +++ b/python/ray/llm/tests/serve/mocks/mock_vllm_engine.py @@ -85,7 +85,7 @@ async def prepare_request( prompt_text = prompt.prompt[0].content else: prompt_text = prompt.prompt - + return VLLMGenerationRequest( request_id=request_id, prompt=prompt_text, From be06a5c42c1e788f289bfc8efba4102d5a813e44 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 13:35:09 -0700 Subject: [PATCH 18/27] fixed test Signed-off-by: Kourosh Hakhamaneshi --- .../cpu/builders/test_application_builders.py | 84 +++++++++++-------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 18084fa79502c..98a9a0d4d2bbd 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -1,5 +1,6 @@ import pytest from ray import serve +from ray.serve.schema import ApplicationStatus, DeploymentStatus from ray.llm._internal.serve.configs.server_models import ( LLMServingArgs, @@ -42,43 +43,44 @@ def get_llm_serve_args(llm_config_with_mock_engine): @pytest.fixture() def serve_config_separate_model_config_files(): - with tempfile.TemporaryDirectory() as config_dir: - serve_config_filename = "llm_app_separate_model_config_files.yaml" - config_root = os.path.join(os.path.dirname(__file__), "test_config_files") - serve_config_src = os.path.join(config_root, serve_config_filename) - serve_config_dst = os.path.join(config_dir, serve_config_filename) + # with tempfile.TemporaryDirectory() as config_dir: + config_dir = tempfile.mkdtemp() + serve_config_filename = "llm_app_separate_model_config_files.yaml" + config_root = os.path.join(os.path.dirname(__file__), "test_config_files") + serve_config_src = os.path.join(config_root, serve_config_filename) + serve_config_dst = os.path.join(config_dir, serve_config_filename) - with open(serve_config_src, "r") as f: - serve_config_yaml = yaml.safe_load(f) + with open(serve_config_src, "r") as f: + serve_config_yaml = yaml.safe_load(f) - for application in serve_config_yaml["applications"]: - llm_configs = application["args"]["llm_configs"] - tmp_llm_config_files = [] - for llm_config in llm_configs: - llm_config_src = llm_config.replace(".", config_root, 1) - llm_config_dst = llm_config.replace(".", config_dir, 1) - tmp_llm_config_files.append(llm_config_dst) + for application in serve_config_yaml["applications"]: + llm_configs = application["args"]["llm_configs"] + 
tmp_llm_config_files = [] + for llm_config in llm_configs: + llm_config_src = llm_config.replace(".", config_root, 1) + llm_config_dst = llm_config.replace(".", config_dir, 1) + tmp_llm_config_files.append(llm_config_dst) - with open(llm_config_src, "r") as f: - llm_config_yaml = yaml.safe_load(f) + with open(llm_config_src, "r") as f: + llm_config_yaml = yaml.safe_load(f) - # Make sure engine is mocked. - if llm_config_yaml.get("runtime_env", None) is None: - llm_config_yaml["runtime_env"] = {} - llm_config_yaml["runtime_env"]["env_vars"] = { - "RAYLLM_VLLM_ENGINE_CLS": "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" - } + # Make sure engine is mocked. + if llm_config_yaml.get("runtime_env", None) is None: + llm_config_yaml["runtime_env"] = {} + llm_config_yaml["runtime_env"]["env_vars"] = { + "RAYLLM_VLLM_ENGINE_CLS": "ray.llm.tests.serve.mocks.mock_vllm_engine.MockVLLMEngine" + } - os.makedirs(os.path.dirname(llm_config_dst), exist_ok=True) - with open(llm_config_dst, "w") as f: - yaml.dump(llm_config_yaml, f) + os.makedirs(os.path.dirname(llm_config_dst), exist_ok=True) + with open(llm_config_dst, "w") as f: + yaml.dump(llm_config_yaml, f) - application["args"]["llm_configs"] = tmp_llm_config_files + application["args"]["llm_configs"] = tmp_llm_config_files - with open(serve_config_dst, "w") as f: - yaml.dump(serve_config_yaml, f) + with open(serve_config_dst, "w") as f: + yaml.dump(serve_config_yaml, f) - yield serve_config_dst + yield serve_config_dst class TestBuildOpenaiApp: @@ -97,13 +99,23 @@ def test_build_openai_app_with_config( """Test `build_openai_app` can be used in serve config.""" def deployments_healthy(): - status_response = subprocess.check_output(["serve", "status"]) - serve_status = yaml.safe_load(status_response)["applications"][ - "llm-endpoint" - ] - assert len(serve_status["deployments"]) == 2 - deployment_status = serve_status["deployments"].values() - assert all([status["status"] == "HEALTHY" for status in deployment_status]) + status = serve.status() + app_status = status.applications.get("llm-endpoint") + if not app_status or app_status.status != ApplicationStatus.RUNNING: + return False + + deployments = app_status.deployments + if len(deployments) != 2: + return False + + all_healthy = all( + dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() + ) + if not all_healthy: + unhealthy_deployments = {name: dep.status for name, dep in deployments.items() if dep.status != DeploymentStatus.HEALTHY} + return False + + print("[TEST] All deployments healthy.") return True p = subprocess.Popen(["serve", "run", serve_config_separate_model_config_files]) From db40d5449c024d895bf19e6b51406afa85f175bb Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 13:40:15 -0700 Subject: [PATCH 19/27] Fixed tests Signed-off-by: Kourosh Hakhamaneshi --- .../llm/multiplex/test_multiplex_deployment.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/deployments/llm/multiplex/test_multiplex_deployment.py b/python/ray/llm/tests/serve/cpu/deployments/llm/multiplex/test_multiplex_deployment.py index cfb1877a03603..a3c60a85e34f3 100644 --- a/python/ray/llm/tests/serve/cpu/deployments/llm/multiplex/test_multiplex_deployment.py +++ b/python/ray/llm/tests/serve/cpu/deployments/llm/multiplex/test_multiplex_deployment.py @@ -143,14 +143,11 @@ async def test_multiplex_deployment( assert arg is not None - expected_lora_out_with_serve_request_context = dict(expected_lora_out) - 
expected_lora_out_with_serve_request_context[ - "serve_request_context" - ] = arg.model_dump().get("serve_request_context") - expected_lora_out_with_serve_request_context["stream"] = stream_tokens + expected_lora_out_modified = dict(expected_lora_out) + expected_lora_out_modified["stream"] = stream_tokens print("***arg***", arg.model_dump()) - print("***exp***", expected_lora_out_with_serve_request_context) - assert arg == arg.__class__(**expected_lora_out_with_serve_request_context) + print("***exp***", expected_lora_out_modified) + assert arg == arg.__class__(**expected_lora_out_modified) responses = [ x @@ -189,7 +186,6 @@ async def test_multiplex_deployment( "best_of": 1, }, "multi_modal_data": None, - "serve_request_context": arg.model_dump().get("serve_request_context"), "disk_multiplex_config": None, "stream": stream_tokens, } From fd43abd9d506f87584278a79ac47ae30b51d231f Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 13:42:04 -0700 Subject: [PATCH 20/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../llm/tests/serve/cpu/builders/test_application_builders.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 98a9a0d4d2bbd..d8c533218a597 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -112,7 +112,6 @@ def deployments_healthy(): dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() ) if not all_healthy: - unhealthy_deployments = {name: dep.status for name, dep in deployments.items() if dep.status != DeploymentStatus.HEALTHY} return False print("[TEST] All deployments healthy.") From c1c4f2ba8ccefabe8d835e2da250247f3783f7b3 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 15:03:07 -0700 Subject: [PATCH 21/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../serve/cpu/builders/test_application_builders.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index d8c533218a597..821b358017da5 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -43,7 +43,6 @@ def get_llm_serve_args(llm_config_with_mock_engine): @pytest.fixture() def serve_config_separate_model_config_files(): - # with tempfile.TemporaryDirectory() as config_dir: config_dir = tempfile.mkdtemp() serve_config_filename = "llm_app_separate_model_config_files.yaml" config_root = os.path.join(os.path.dirname(__file__), "test_config_files") @@ -102,23 +101,33 @@ def deployments_healthy(): status = serve.status() app_status = status.applications.get("llm-endpoint") if not app_status or app_status.status != ApplicationStatus.RUNNING: + print( + f"[TEST] Application 'llm-endpoint' not running yet. 
Status: {app_status}" + ) return False deployments = app_status.deployments if len(deployments) != 2: + print(f"[TEST] Expected 2 deployments, found {len(deployments)}") return False all_healthy = all( dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() ) if not all_healthy: + unhealthy_deployments = { + name: dep.status + for name, dep in deployments.items() + if dep.status != DeploymentStatus.HEALTHY + } + print(f"[TEST] Not all deployments healthy: {unhealthy_deployments}") return False print("[TEST] All deployments healthy.") return True p = subprocess.Popen(["serve", "run", serve_config_separate_model_config_files]) - wait_for_condition(deployments_healthy, timeout=30) + wait_for_condition(deployments_healthy, timeout=60, retry_interval_ms=1000) p.send_signal(signal.SIGINT) # Equivalent to ctrl-C p.wait() From 0afb5f69bd5920357fef1eb6b89485e5e99c377b Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 16:01:32 -0700 Subject: [PATCH 22/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../tests/serve/cpu/builders/test_application_builders.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 821b358017da5..35392a59c2c63 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -1,5 +1,6 @@ import pytest from ray import serve +import ray from ray.serve.schema import ApplicationStatus, DeploymentStatus from ray.llm._internal.serve.configs.server_models import ( @@ -96,6 +97,10 @@ def test_build_openai_app_with_config( self, serve_config_separate_model_config_files, shutdown_ray_and_serve ): """Test `build_openai_app` can be used in serve config.""" + + # Initialize Ray cluster so that the serve run attaches to the same cluster. + # and so that serve.status() works. + ray.init() def deployments_healthy(): status = serve.status() From f473e274cba9b517e5b29718d7eb199cf0531a27 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 16:02:05 -0700 Subject: [PATCH 23/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../llm/tests/serve/cpu/builders/test_application_builders.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 35392a59c2c63..da3cb032f0304 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -97,7 +97,7 @@ def test_build_openai_app_with_config( self, serve_config_separate_model_config_files, shutdown_ray_and_serve ): """Test `build_openai_app` can be used in serve config.""" - + # Initialize Ray cluster so that the serve run attaches to the same cluster. # and so that serve.status() works. 
ray.init() From d1e416459a985f707483eb7f09de67433f8d4610 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 17:34:39 -0700 Subject: [PATCH 24/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../cpu/builders/test_application_builders.py | 64 +++++++++++-------- 1 file changed, 38 insertions(+), 26 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index da3cb032f0304..46a0867141a1f 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -102,34 +102,46 @@ def test_build_openai_app_with_config( # and so that serve.status() works. ray.init() + # def deployments_healthy(): + # status = serve.status() + # app_status = status.applications.get("llm-endpoint") + # if not app_status or app_status.status != ApplicationStatus.RUNNING: + # print( + # f"[TEST] Application 'llm-endpoint' not running yet. Status: {app_status}" + # ) + # return False + + # deployments = app_status.deployments + # if len(deployments) != 2: + # print(f"[TEST] Expected 2 deployments, found {len(deployments)}") + # return False + + # all_healthy = all( + # dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() + # ) + # if not all_healthy: + # unhealthy_deployments = { + # name: dep.status + # for name, dep in deployments.items() + # if dep.status != DeploymentStatus.HEALTHY + # } + # print(f"[TEST] Not all deployments healthy: {unhealthy_deployments}") + # return False + + # print("[TEST] All deployments healthy.") + # return True + + def deployments_healthy(): - status = serve.status() - app_status = status.applications.get("llm-endpoint") - if not app_status or app_status.status != ApplicationStatus.RUNNING: - print( - f"[TEST] Application 'llm-endpoint' not running yet. 
Status: {app_status}" - ) - return False - - deployments = app_status.deployments - if len(deployments) != 2: - print(f"[TEST] Expected 2 deployments, found {len(deployments)}") - return False - - all_healthy = all( - dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() - ) - if not all_healthy: - unhealthy_deployments = { - name: dep.status - for name, dep in deployments.items() - if dep.status != DeploymentStatus.HEALTHY - } - print(f"[TEST] Not all deployments healthy: {unhealthy_deployments}") - return False - - print("[TEST] All deployments healthy.") + status_response = subprocess.check_output(["serve", "status"]) + serve_status = yaml.safe_load(status_response)["applications"][ + "llm-endpoint" + ] + assert len(serve_status["deployments"]) == 2 + deployment_status = serve_status["deployments"].values() + assert all([status["status"] == "HEALTHY" for status in deployment_status]) return True + p = subprocess.Popen(["serve", "run", serve_config_separate_model_config_files]) wait_for_condition(deployments_healthy, timeout=60, retry_interval_ms=1000) From 89e61c56ed10b78820a939c9aa7208c24400b672 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 17:35:55 -0700 Subject: [PATCH 25/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../tests/serve/cpu/builders/test_application_builders.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index 46a0867141a1f..bb4dac6f0f5e5 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -1,7 +1,6 @@ import pytest from ray import serve import ray -from ray.serve.schema import ApplicationStatus, DeploymentStatus from ray.llm._internal.serve.configs.server_models import ( LLMServingArgs, @@ -130,8 +129,7 @@ def test_build_openai_app_with_config( # print("[TEST] All deployments healthy.") # return True - - + def deployments_healthy(): status_response = subprocess.check_output(["serve", "status"]) serve_status = yaml.safe_load(status_response)["applications"][ @@ -141,7 +139,6 @@ def deployments_healthy(): deployment_status = serve_status["deployments"].values() assert all([status["status"] == "HEALTHY" for status in deployment_status]) return True - p = subprocess.Popen(["serve", "run", serve_config_separate_model_config_files]) wait_for_condition(deployments_healthy, timeout=60, retry_interval_ms=1000) From 84f39fefbb92b98985a1eb9535ceebc9e86b08fa Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 20:22:45 -0700 Subject: [PATCH 26/27] wip Signed-off-by: Kourosh Hakhamaneshi --- .../cpu/builders/test_application_builders.py | 81 ++++++++++--------- 1 file changed, 41 insertions(+), 40 deletions(-) diff --git a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py index bb4dac6f0f5e5..0c9b997ba2b95 100644 --- a/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py +++ b/python/ray/llm/tests/serve/cpu/builders/test_application_builders.py @@ -1,6 +1,5 @@ import pytest from ray import serve -import ray from ray.llm._internal.serve.configs.server_models import ( LLMServingArgs, @@ -21,6 +20,7 @@ import tempfile import signal import sys +import re from ray._private.test_utils import wait_for_condition @@ -97,47 +97,28 @@ def 
test_build_openai_app_with_config( ): """Test `build_openai_app` can be used in serve config.""" - # Initialize Ray cluster so that the serve run attaches to the same cluster. - # and so that serve.status() works. - ray.init() - - # def deployments_healthy(): - # status = serve.status() - # app_status = status.applications.get("llm-endpoint") - # if not app_status or app_status.status != ApplicationStatus.RUNNING: - # print( - # f"[TEST] Application 'llm-endpoint' not running yet. Status: {app_status}" - # ) - # return False - - # deployments = app_status.deployments - # if len(deployments) != 2: - # print(f"[TEST] Expected 2 deployments, found {len(deployments)}") - # return False - - # all_healthy = all( - # dep.status == DeploymentStatus.HEALTHY for dep in deployments.values() - # ) - # if not all_healthy: - # unhealthy_deployments = { - # name: dep.status - # for name, dep in deployments.items() - # if dep.status != DeploymentStatus.HEALTHY - # } - # print(f"[TEST] Not all deployments healthy: {unhealthy_deployments}") - # return False - - # print("[TEST] All deployments healthy.") - # return True - def deployments_healthy(): status_response = subprocess.check_output(["serve", "status"]) - serve_status = yaml.safe_load(status_response)["applications"][ - "llm-endpoint" - ] - assert len(serve_status["deployments"]) == 2 - deployment_status = serve_status["deployments"].values() - assert all([status["status"] == "HEALTHY" for status in deployment_status]) + print("[TEST] Status response: ", status_response) + applications = extract_applications_from_output(status_response) + + if "llm-endpoint" not in applications: + print("[TEST] Application 'llm-endpoint' not found.") + return False + + llm_endpoint_status = applications["llm-endpoint"] + if len(llm_endpoint_status["deployments"]) != 2: + print( + f"[TEST] Expected 2 deployments, found {len(llm_endpoint_status['deployments'])}" + ) + return False + + deployment_status = llm_endpoint_status["deployments"].values() + if not all([status["status"] == "HEALTHY" for status in deployment_status]): + print(f"[TEST] Not all deployments healthy: {deployment_status}") + return False + + print("[TEST] All deployments healthy.") return True p = subprocess.Popen(["serve", "run", serve_config_separate_model_config_files]) @@ -205,5 +186,25 @@ def test_build_llm_deployment( serve.run(app) +def extract_applications_from_output(output: bytes) -> dict: + """ + Extracts the 'applications' block from mixed output and returns it as a dict. + """ + # 1. Decode bytes to string + text = output.decode("utf-8", errors="ignore") + + # 2. Regex to find the 'applications:' block and its indented content + # This matches 'applications:' and all following lines that are indented (YAML block) + match = re.search(r"(^applications:\n(?:^(?: {2,}|\t).*\n?)+)", text, re.MULTILINE) + if not match: + raise ValueError("Could not find 'applications:' block in output.") + + applications_block = match.group(1) + + # 3. 
Parse the YAML block + applications_dict = yaml.safe_load(applications_block) + return applications_dict["applications"] + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) From 830a4b8e961c911d4820418533d02c8a43b2cd79 Mon Sep 17 00:00:00 2001 From: Kourosh Hakhamaneshi Date: Tue, 29 Apr 2025 20:25:53 -0700 Subject: [PATCH 27/27] wip Signed-off-by: Kourosh Hakhamaneshi --- python/ray/llm/_internal/serve/deployments/llm/llm_engine.py | 2 +- python/ray/llm/_internal/serve/deployments/llm/llm_server.py | 4 ++-- .../llm/_internal/serve/deployments/llm/vllm/vllm_engine.py | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py index 182823366bdf1..556ae23342ea9 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_engine.py @@ -42,7 +42,7 @@ async def generate( """Generate an LLMRawResponse stream based on the GenerationRequest""" pass - async def check_health(self): + async def check_health(self) -> bool: """Check the health of the engine""" return True diff --git a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py index 1466e3e46715d..47aaf77047ec5 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/llm_server.py +++ b/python/ray/llm/_internal/serve/deployments/llm/llm_server.py @@ -566,8 +566,8 @@ async def completions(self, request: CompletionRequest) -> LLMCompletionsRespons model=self._llm_config.model_id, gen=gen, stream=stream ) - async def check_health(self): - """Check the health of the vllm engine.""" + async def check_health(self) -> bool: + """Check the health of the llm engine.""" return await self.engine.check_health() async def _load_model(self, lora_model_id: str) -> DiskMultiplexConfig: diff --git a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py index 15dae5f6acfe9..7dde59ba09c32 100644 --- a/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py +++ b/python/ray/llm/_internal/serve/deployments/llm/vllm/vllm_engine.py @@ -685,9 +685,9 @@ def _handle_input_too_long( len(request_output.prompt_token_ids), self._get_prompt_limit() ).exception - async def check_health(self): + async def check_health(self) -> bool: if not hasattr(self.engine, "check_health"): - return + return False try: return await asyncio.wait_for(self.engine.check_health(), timeout=15)
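
The patches above converge on a small `LLMEngine` contract (`start`, `prepare_request`, `generate`, `check_health`) plus the `RAYLLM_VLLM_ENGINE_CLS` escape hatch that the test YAMLs use to swap in `MockVLLMEngine`. The following is a minimal sketch of a custom engine written against that contract; the import paths, request/response classes, and field names are taken from the internal modules and mocks shown in this series, so treat them as assumptions about a private, still-moving API rather than a stable interface.

from typing import AsyncGenerator, Optional

from ray.llm._internal.serve.configs.server_models import (
    DiskMultiplexConfig,
    GenerationRequest,
    LLMConfig,
    LLMRawResponse,
    Prompt,
)
from ray.llm._internal.serve.deployments.llm.llm_engine import LLMEngine


class EchoEngine(LLMEngine):
    """Toy engine that streams the prompt text back, chunk by chunk."""

    def __init__(self, llm_config: LLMConfig):
        self._llm_config = llm_config

    async def start(self):
        # A real engine would initialize its backend (e.g. vLLM) here.
        pass

    async def prepare_request(
        self,
        request_id: str,
        prompt: Prompt,
        stream: bool,
        disk_lora_model: Optional[DiskMultiplexConfig] = None,
        **kwargs,
    ) -> GenerationRequest:
        # Same simplification as the mock engines above: a string prompt is
        # used as-is, a chat-style prompt is reduced to the content of its
        # single message.
        if isinstance(prompt.prompt, list):
            prompt_text = prompt.prompt[0].content
        else:
            prompt_text = prompt.prompt
        return GenerationRequest(
            prompt=prompt_text, request_id=request_id, stream=stream
        )

    async def generate(
        self, request: GenerationRequest
    ) -> AsyncGenerator[LLMRawResponse, None]:
        # LLMRawResponse fields mirror what the mocks above set; additional
        # fields may be required by the real response postprocessing path.
        for chunk in str(request.prompt).split():
            yield LLMRawResponse(
                generated_text=chunk + " ",
                num_input_tokens=0,
            )

    async def check_health(self) -> bool:
        return True

Under the same mechanism the test configs use (see mock_vllm_model.yaml), such a class would be selected by pointing RAYLLM_VLLM_ENGINE_CLS at its import path in the model config's runtime_env env_vars, leaving the surrounding LLMServer machinery untouched.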