From 70367a524e9fbe82a51cf7f770901dab9b57b5d9 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Thu, 27 Feb 2025 04:15:09 +0000 Subject: [PATCH 01/11] introduce LocalVLLMModel --- eureka_ml_insights/models/models.py | 66 +++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 63351ce3..c9e053fc 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -6,6 +6,7 @@ import urllib.request from abc import ABC, abstractmethod from dataclasses import dataclass +import random import anthropic import tiktoken @@ -1153,6 +1154,71 @@ def generate(self, text_prompt, query_images=None, system_message=None): def model_template_fn(self, text_prompt, system_message=None): raise NotImplementedError + + +@dataclass +class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): + """This class is used when you have multiple vLLM servers running locally.""" + + model_name: str = None + ports: list = None + clients: list = None + max_tokens: int = 2000 + + def __post_init__(self): + self.prepare_clients() + + def prepare_clients(self): + from openai import OpenAI as OpenAIClient + import requests + + # Populate self.clients with healthy servers + self.clients = [] + potential_urls = ['http://0.0.0.0:' + port for port in self.ports] + session = requests.Session() + for url in potential_urls: + try: + session.get(url+'/health') + self.clients.append(OpenAIClient(base_url=url + '/v1', api_key = 'none')) + except: + pass + if len(self.clients) == 0: + raise Exception("No healthy servers found!") + + def _generate(self, request): + + # Similar logic as OpenAICommonRequestResponseMixIn + # except we don't just use one fixed client. + start_time = time.time() + client = random.choice(self.clients) + completion = client.chat.completions.create( + model=self.model_name, + max_tokens=self.max_tokens, + **request + ) + end_time = time.time() + raw_output = completion.model_dump() + + return { + "model_output": raw_output["choices"][0]["message"]["content"], + "response_time": end_time - start_time, + "n_tokens": raw_output["usage"]["completion_tokens"] + } + + def generate(self, text_prompt, query_images=None, system_message=None): + response_dict = {} + + if text_prompt: + # Format request for OpenAI API using create_request from OpenAIRequestResponseMixIn + request = self.create_request(text_prompt, query_images, system_message) + try: + response_dict.update(self._generate(request)) + response_dict['is_valid'] = True + except Exception as e: + logging.warning(e) + response_dict['is_valid'] = False + + return response_dict @dataclass From 540cbe9e3966d4187be2c476d22902173c21a3c5 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Tue, 4 Mar 2025 22:21:33 +0000 Subject: [PATCH 02/11] improved functionality for vllm deployments --- eureka_ml_insights/configs/model_configs.py | 22 ++ eureka_ml_insights/models/__init__.py | 2 + eureka_ml_insights/models/models.py | 188 +++++++++++++++--- .../models/vllm_deployment_script.sh | 11 + main.py | 37 +++- 5 files changed, 236 insertions(+), 24 deletions(-) create mode 100755 eureka_ml_insights/models/vllm_deployment_script.sh diff --git a/eureka_ml_insights/configs/model_configs.py b/eureka_ml_insights/configs/model_configs.py index cb34871d..d0ec5365 100644 --- a/eureka_ml_insights/configs/model_configs.py +++ b/eureka_ml_insights/configs/model_configs.py @@ -11,6 +11,7 @@ LlamaServerlessAzureRestEndpointModel, LLaVAHuggingFaceModel, 
LLaVAModel, + LocalVLLMModel, Phi4HFModel, MistralServerlessAzureRestEndpointModel, RestEndpointModel, @@ -256,3 +257,24 @@ "model_name": "Mistral-large-2407", }, ) + +# Local VLLM Models +# Adapt to your local deployments, or give enough info for vllm deployment. +PHI4_LOCAL_CONFIG = ModelConfig( + LocalVLLMModel, + { + # this name must match the vllm deployment name/path + "model_name": "microsoft/phi-4", + # specify ports in case the model is already deployed + "ports": ["8002", "8003"], + }, +) +QWENVL_LOCAL_CONFIG = ModelConfig( + LocalVLLMModel, + { + # this name must match the vllm deployment name/path + "model_name": "Qwen/Qwen2.5-VL-7B-Instruct", + # certain args will get passed to the vllm serve command + "tensor_parallel_size": 2, + }, +) diff --git a/eureka_ml_insights/models/__init__.py b/eureka_ml_insights/models/__init__.py index 7ba37d88..b00de32a 100644 --- a/eureka_ml_insights/models/__init__.py +++ b/eureka_ml_insights/models/__init__.py @@ -10,6 +10,7 @@ LlamaServerlessAzureRestEndpointModel, LLaVAHuggingFaceModel, LLaVAModel, + LocalVLLMModel, MistralServerlessAzureRestEndpointModel, Phi3HFModel, Phi4HFModel, @@ -34,6 +35,7 @@ MistralServerlessAzureRestEndpointModel, LlamaServerlessAzureRestEndpointModel, LLaVAModel, + LocalVLLMModel, RestEndpointModel, TestModel, vLLMModel, diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index c9e053fc..9f26a9be 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -2,6 +2,7 @@ import json import logging +import requests import time import urllib.request from abc import ABC, abstractmethod @@ -1156,41 +1157,182 @@ def model_template_fn(self, text_prompt, system_message=None): raise NotImplementedError +class LocalVLLMDeploymentHandler: + """This class is used to handle the deployment of vLLM servers.""" + + # Chose against dataclass here so we have the option to accept kwargs + # and pass them to the vLLM deployment script. + def __init__( + self, + model_name: str = None, + num_servers: int = 1, + trust_remote_code: bool = False, + tensor_parallel_size: int = 1, + pipeline_parallel_size: int = 1, + dtype: str = "auto", + quantization: str = None, + seed: int = 0, + gpu_memory_utilization: float = 0.9, + cpu_offload_gb: float = 0, + ports: list = None, + ): + if not model_name: + raise ValueError("LocalVLLM model_name must be specified.") + self.model_name = model_name + self.num_servers = num_servers + self.trust_remote_code = trust_remote_code + self.tensor_parallel_size = tensor_parallel_size + self.pipeline_parallel_size = pipeline_parallel_size + self.dtype = dtype + self.quantization = quantization + self.seed = seed + self.gpu_memory_utilization = gpu_memory_utilization + self.cpu_offload_gb = cpu_offload_gb + + self.ports = ports + self.session = requests.Session() + self.clients = self._get_clients() + + def _get_clients(self): + '''Get clients to access vllm servers, by checking for running servers and deploying if necessary.''' + from openai import OpenAI as OpenAIClient + + # If the user passes ports, check if the servers are running and populate clients accordingly. 
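# As a self-contained sketch of this discovery step (candidate ports and the
# api_key value are placeholders; vLLM's OpenAI-compatible server ignores the
# key), the probe-then-connect pattern looks roughly like:
import requests
from openai import OpenAI

sketch_clients = []
for port in ["8000", "8001"]:  # hypothetical candidate ports
    try:
        requests.get(f"http://0.0.0.0:{port}/health", timeout=5).raise_for_status()
        sketch_clients.append(OpenAI(base_url=f"http://0.0.0.0:{port}/v1", api_key="none"))
    except requests.RequestException:
        pass  # no healthy server behind this port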
+ if self.ports: + healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()] + if len(healthy_server_urls) > 0: + logging.info(f"Found {len(healthy_server_urls)} healthy servers.") + return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls] + + # Even if the user doesn't pass ports, we can check if there happen to be deployed servers. + # There is no guarantee that the servers are hosting the correct model. + self.ports = [str(8000 + i) for i in range(self.num_servers)] + healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()] + if len(healthy_server_urls) == self.num_servers: + logging.info(f"Found {len(healthy_server_urls)} healthy servers.") + return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls] + + # If that didn't work, let's deploy and wait for servers to come online. + self.deploy_servers() + server_start_time = time.time() + while time.time() - server_start_time < 600: + time.sleep(10) + healthy_ports = self.get_healthy_ports() + if len(healthy_ports) == self.num_servers: + logging.info(f"All {self.num_servers} servers are online.") + healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in healthy_ports] + return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls] + else: + logging.info(f"Waiting for {self.num_servers - len(healthy_ports)} more servers to come online.") + + if len(self.clients) == 0: + raise RuntimeError(f"Failed to start servers.") + + def get_healthy_ports(self) -> list[str]: + """Check if servers are running.""" + + healthy_ports = [] + for port in self.ports: + try: + self.session.get('http://0.0.0.0:' + port +'/health') + healthy_ports.append(port) + except: + pass + return healthy_ports + + def deploy_servers(self): + logging.info(f"No vLLM servers are running. Starting {self.num_servers} new servers at {self.ports}.") + import os, subprocess, sys, datetime + + env = os.environ.copy() + env['NUM_SERVERS'] = str(self.num_servers) + env['CURRENT_PYTHON_EXEC'] = sys.executable + env['GPU_SKIP'] = str(self.pipeline_parallel_size * self.tensor_parallel_size) + + date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S.%f") + log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}") + os.makedirs(log_dir) + env['LOCAL_VLLM_LOG_DIR'] = log_dir + + command = [ + os.path.dirname(os.path.abspath(__file__)) + "/vllm_deployment_script.sh", + "--model", self.model_name, + "--tensor_parallel_size", str(self.tensor_parallel_size), + "--pipeline_parallel_size", str(self.pipeline_parallel_size), + "--dtype", self.dtype, + "--seed", str(self.seed), + "--gpu_memory_utilization", str(self.gpu_memory_utilization), + "--cpu_offload_gb", str(self.cpu_offload_gb) + ] + if self.quantization: + command.append("--quantization") + command.append(self.quantization) + if self.trust_remote_code: + command.append("--trust_remote_code") + logging.info(f"Running command: {command}") + response = subprocess.run(command, text=True, env=env) + return response + + @classmethod + def shutdown_servers(cls): + # Consider whether this is appropriate since it will probably kill all vLLM servers. 
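# For reference, the GPU_SKIP value exported above is the number of GPUs each
# server consumes (tensor_parallel_size * pipeline_parallel_size), and the
# deployment script pins server i to the contiguous GPU block starting at
# i * GPU_SKIP. A small sketch with hypothetical values (2 servers, 2 GPUs each):
num_servers, gpus_per_server = 2, 2
for i in range(num_servers):
    first_gpu = i * gpus_per_server
    devices = ",".join(str(g) for g in range(first_gpu, first_gpu + gpus_per_server))
    print(f"server {i}: port {8000 + i}, CUDA_VISIBLE_DEVICES={devices}")
# server 0: port 8000, CUDA_VISIBLE_DEVICES=0,1
# server 1: port 8001, CUDA_VISIBLE_DEVICES=2,3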
+ import subprocess + logging.info(f"Shutting down vLLM servers.") + command = [f'pgrep -f "vllm.entrypoints.openai.api_server --model" | xargs kill -INT'] + subprocess.run(command, shell=True) + + @dataclass class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): """This class is used when you have multiple vLLM servers running locally.""" model_name: str = None + + # Deployment parameters + num_servers: int = 1 + trust_remote_code: bool = False + tensor_parallel_size: int = 1 + pipeline_parallel_size: int = 1 + dtype: str = "auto" + quantization: str = None + seed: int = 0 + gpu_memory_utilization: float = 0.9 + cpu_offload_gb: float = 0 + + # Deployment handler ports: list = None - clients: list = None + handler: LocalVLLMDeploymentHandler = None + + # Inference parameters + temperature: float = 0.01 + top_p: float = .95 + top_k: int = -1 max_tokens: int = 2000 def __post_init__(self): - self.prepare_clients() - - def prepare_clients(self): - from openai import OpenAI as OpenAIClient - import requests + if not self.model_name: + raise ValueError("LocalVLLM model_name must be specified.") + self.handler = LocalVLLMDeploymentHandler( + model_name=self.model_name, + num_servers=self.num_servers, + trust_remote_code=self.trust_remote_code, + pipeline_parallel_size=self.pipeline_parallel_size, + tensor_parallel_size=self.tensor_parallel_size, + dtype=self.dtype, + quantization=self.quantization, + seed=self.seed, + gpu_memory_utilization=self.gpu_memory_utilization, + cpu_offload_gb=self.cpu_offload_gb, + ports=self.ports, + ) - # Populate self.clients with healthy servers - self.clients = [] - potential_urls = ['http://0.0.0.0:' + port for port in self.ports] - session = requests.Session() - for url in potential_urls: - try: - session.get(url+'/health') - self.clients.append(OpenAIClient(base_url=url + '/v1', api_key = 'none')) - except: - pass - if len(self.clients) == 0: - raise Exception("No healthy servers found!") - def _generate(self, request): - # Similar logic as OpenAICommonRequestResponseMixIn - # except we don't just use one fixed client. + # Similar logic as OpenAICommonRequestResponseMixIn. + # If OpenAICommonRequestResponseMixIn is adapted for threadsafety, + # I think there's a way to use it even with multiple clients. 
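# The request built by create_request follows the OpenAI chat-completions
# format, so a single round trip against one local server reduces to the
# self-contained sketch below (base_url, model name, and prompt are placeholders):
from openai import OpenAI

sketch_client = OpenAI(base_url="http://0.0.0.0:8000/v1", api_key="none")
sketch_completion = sketch_client.chat.completions.create(
    model="microsoft/phi-4",
    max_tokens=2000,
    messages=[{"role": "user", "content": "Say hello."}],
)
print(sketch_completion.choices[0].message.content)   # the generated text
print(sketch_completion.usage.completion_tokens)      # completion token count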
start_time = time.time() - client = random.choice(self.clients) + client = random.choice(self.handler.clients) completion = client.chat.completions.create( model=self.model_name, max_tokens=self.max_tokens, @@ -1202,7 +1344,7 @@ def _generate(self, request): return { "model_output": raw_output["choices"][0]["message"]["content"], "response_time": end_time - start_time, - "n_tokens": raw_output["usage"]["completion_tokens"] + "n_output_tokens": raw_output["usage"]["completion_tokens"] } def generate(self, text_prompt, query_images=None, system_message=None): diff --git a/eureka_ml_insights/models/vllm_deployment_script.sh b/eureka_ml_insights/models/vllm_deployment_script.sh new file mode 100755 index 00000000..de1454e3 --- /dev/null +++ b/eureka_ml_insights/models/vllm_deployment_script.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e -x + +for (( i = 0; i < $NUM_SERVERS; i++ )) do + port=$((8000 + i)) + # Here GPU_SKIP is set as tensor_parallel_size*pipeline_parallel_size + first_gpu=$((i * GPU_SKIP)) + last_gpu=$((first_gpu + GPU_SKIP - 1)) + devices=$(seq -s, $first_gpu $last_gpu) + CUDA_VISIBLE_DEVICES=${devices} "$CURRENT_PYTHON_EXEC" -m vllm.entrypoints.openai.api_server "$@" --port ${port} >> ${LOCAL_VLLM_LOG_DIR}/${port}.log 2>&1 & +done \ No newline at end of file diff --git a/main.py b/main.py index 433dda38..0d9a84db 100755 --- a/main.py +++ b/main.py @@ -21,6 +21,9 @@ parser.add_argument( "--resume_from", type=str, help="The path to the inference_result.jsonl to resume from.", default=None ) + parser.add_argument("--local_vllm", action="store_true", help="Deploy/use local vllm for inference.") + parser.add_argument("--ports", type=str, nargs="*", help="Ports where vllm model is already hosted.", default=None) + parser.add_argument("--num_servers", type=int, help="Number of servers to deploy.", default=None) init_args = {} # catch any unknown arguments @@ -38,7 +41,30 @@ logging.info(f"Unknown arguments: {unknown_args} will be sent as is to the experiment config class.") experiment_config_class = args.exp_config - if args.model_config: + + if args.local_vllm and args.model_config: + from eureka_ml_insights.configs.config import ModelConfig + from eureka_ml_insights.models import LocalVLLMModel + try: + model_config = getattr(model_configs, args.model_config) + if isinstance(model_config, ModelConfig): + model_config.init_args["ports"] = args.ports + model_config.init_args["num_servers"] = args.num_servers if args.num_servers else 1 + init_args["model_config"] = model_config + # Logic above is that certain deployment parameters like ports and num_servers + # can be variable and so we allow them to be overridden by command line args. + except: + # If there's no config, create one. + init_args["model_config"] = ModelConfig( + LocalVLLMModel, + { + "model_name": args.model_config, + "ports": args.ports, + "num_servers": args.num_servers if args.num_servers else 1 + } + ) + + elif args.model_config: try: init_args["model_config"] = getattr(model_configs, args.model_config) except AttributeError: @@ -55,3 +81,12 @@ logging.info(f"Saving experiment logs in {pipeline_config.log_dir}.") pipeline = Pipeline(pipeline_config.component_configs, pipeline_config.log_dir) pipeline.run() + + # Shut down vllm servers. 
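# The --local_vllm branch above accepts either the name of an existing config in
# model_configs or a raw model path (e.g. `python main.py --exp_config IFEval_PIPELINE
# --model_config microsoft/phi-4 --local_vllm --ports 8000 8001`). In the raw-path
# case, the ModelConfig built on the fly is roughly equivalent to this sketch
# (model name and ports are illustrative):
from eureka_ml_insights.configs.config import ModelConfig
from eureka_ml_insights.models import LocalVLLMModel

sketch_config = ModelConfig(
    LocalVLLMModel,
    {"model_name": "microsoft/phi-4", "ports": ["8000", "8001"], "num_servers": 1},
)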
+ if args.local_vllm: + try: + from eureka_ml_insights.models.models import LocalVLLMDeploymentHandler + LocalVLLMDeploymentHandler.shutdown_servers() + except: + logging.warning("Failed to shut down local vllm servers.") + \ No newline at end of file From bb6f0b3a067b52c60659cdbe62fd871d522ae882 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Wed, 5 Mar 2025 00:31:47 +0000 Subject: [PATCH 03/11] example script to deploy servers and run pipeline --- deploy_and_run.sh | 61 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100755 deploy_and_run.sh diff --git a/deploy_and_run.sh b/deploy_and_run.sh new file mode 100755 index 00000000..5e5bf429 --- /dev/null +++ b/deploy_and_run.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +export PYTHONPATH="$(pwd):$PYTHONPATH" +model_name="microsoft/phi-4" +current_datetime=$(date +"%Y-%m-%d %H:%M:%S") +log_dir="logs/$(current_datetime)/deploy_and_run" + +mkdir -p $log_dir + +# vllm args +num_servers=4 +tensor_parallel_size=1 +pipeline_parallel_size=1 +gpu_skip=$((tensor_parallel_size * pipeline_parallel_size)) +base_port=8000 +VLLM_ARGS="\ + --tensor-parallel-size=${tensor_parallel_size} \ + --pipeline-parallel-size=${pipeline_parallel_size} \ + --gpu-memory-utilization=0.9 \ +" +echo "Spinning up servers..." +for (( i = 0; i < $num_servers; i++ )) do + port=$((base_port + i)) + first_gpu=$((i * gpu_skip)) + last_gpu=$((first_gpu + gpu_skip - 1)) + devices=$(seq -s, $first_gpu $last_gpu) + CUDA_VISIBLE_DEVICES=${devices} vllm serve ${model_name} "$@" --port ${port} ${VLLM_ARGS} >> $log_dir/${port}.log 2>&1 & +done + +# Health check to see when servers come online +url="http://0.0.0.0:"${base_port}"/health" + +while true; do + # Send the GET request and store the response + response=$(curl -s -o /dev/null -w "%{http_code}" "$url") + + if [ "$response" -eq 200 ]; then + echo "Servers online..." + break + else + echo "Waiting for servers to come online..." + fi + + sleep 10 +done + +sleep 10 + +# Now call eureka to initiate evals. +ports=$(seq -s ' ' $base_port $((base_port + num_servers - 1))) +EUREKA_ARGS="\ + --model_config=${model_name} \ + --exp_config="IFEval_PIPELINE" \ + --local_vllm \ + --ports ${ports} \ +" +echo "Starting evals..." +python main.py ${EUREKA_ARGS} >> $log_dir/phi4.log 2>&1 + +echo "Shutting down vllm servers..." 
+pgrep -f "vllm serve" | xargs kill -INT \ No newline at end of file From 5ed17e462b3f814f5dca3be91080479a36df7d69 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Thu, 6 Mar 2025 16:56:38 +0000 Subject: [PATCH 04/11] improved localvllm logic --- deploy_and_run.sh | 61 --------------- deploy_vllm_and_run_eval.sh | 77 +++++++++++++++++++ eureka_ml_insights/models/models.py | 57 ++++++++------ .../models/vllm_deployment_script.sh | 11 --- main.py | 9 --- 5 files changed, 110 insertions(+), 105 deletions(-) delete mode 100755 deploy_and_run.sh create mode 100755 deploy_vllm_and_run_eval.sh delete mode 100755 eureka_ml_insights/models/vllm_deployment_script.sh diff --git a/deploy_and_run.sh b/deploy_and_run.sh deleted file mode 100755 index 5e5bf429..00000000 --- a/deploy_and_run.sh +++ /dev/null @@ -1,61 +0,0 @@ -#!/bin/bash - -export PYTHONPATH="$(pwd):$PYTHONPATH" -model_name="microsoft/phi-4" -current_datetime=$(date +"%Y-%m-%d %H:%M:%S") -log_dir="logs/$(current_datetime)/deploy_and_run" - -mkdir -p $log_dir - -# vllm args -num_servers=4 -tensor_parallel_size=1 -pipeline_parallel_size=1 -gpu_skip=$((tensor_parallel_size * pipeline_parallel_size)) -base_port=8000 -VLLM_ARGS="\ - --tensor-parallel-size=${tensor_parallel_size} \ - --pipeline-parallel-size=${pipeline_parallel_size} \ - --gpu-memory-utilization=0.9 \ -" -echo "Spinning up servers..." -for (( i = 0; i < $num_servers; i++ )) do - port=$((base_port + i)) - first_gpu=$((i * gpu_skip)) - last_gpu=$((first_gpu + gpu_skip - 1)) - devices=$(seq -s, $first_gpu $last_gpu) - CUDA_VISIBLE_DEVICES=${devices} vllm serve ${model_name} "$@" --port ${port} ${VLLM_ARGS} >> $log_dir/${port}.log 2>&1 & -done - -# Health check to see when servers come online -url="http://0.0.0.0:"${base_port}"/health" - -while true; do - # Send the GET request and store the response - response=$(curl -s -o /dev/null -w "%{http_code}" "$url") - - if [ "$response" -eq 200 ]; then - echo "Servers online..." - break - else - echo "Waiting for servers to come online..." - fi - - sleep 10 -done - -sleep 10 - -# Now call eureka to initiate evals. -ports=$(seq -s ' ' $base_port $((base_port + num_servers - 1))) -EUREKA_ARGS="\ - --model_config=${model_name} \ - --exp_config="IFEval_PIPELINE" \ - --local_vllm \ - --ports ${ports} \ -" -echo "Starting evals..." -python main.py ${EUREKA_ARGS} >> $log_dir/phi4.log 2>&1 - -echo "Shutting down vllm servers..." -pgrep -f "vllm serve" | xargs kill -INT \ No newline at end of file diff --git a/deploy_vllm_and_run_eval.sh b/deploy_vllm_and_run_eval.sh new file mode 100755 index 00000000..f633743f --- /dev/null +++ b/deploy_vllm_and_run_eval.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +export PYTHONPATH="$(pwd):$PYTHONPATH" +model_name="microsoft/phi-4" +exp_config="IFEval_PIPELINE" +current_datetime=$(date +"%Y-%m-%d-%H:%M:%S") +log_dir="logs/deploy_vllm_and_run_eval/$current_datetime" +mkdir -p $log_dir + +# vLLM args +num_servers=4 +tensor_parallel_size=1 +pipeline_parallel_size=1 +base_port=8000 +gpus_per_port=$((tensor_parallel_size * pipeline_parallel_size)) + +# Add any additional args accepted by vLLM serve here +VLLM_ARGS="\ + --tensor-parallel-size=${tensor_parallel_size} \ + --pipeline-parallel-size=${pipeline_parallel_size} \ + --gpu-memory-utilization=0.9 \ +" + +# Start servers +echo "Spinning up servers..." 
+for (( i = 0; i < $num_servers; i++ )) do + port=$((base_port + i)) + first_gpu=$((i * gpus_per_port)) + last_gpu=$((first_gpu + gpus_per_port - 1)) + devices=$(seq -s, $first_gpu $last_gpu) + CUDA_VISIBLE_DEVICES=${devices} vllm serve ${model_name} "$@" --port ${port} ${VLLM_ARGS} >> $log_dir/${port}.log 2>&1 & +done + +# Wait for servers to come online +while true; do + + servers_online=0 + for (( i = 0; i < $num_servers; i++ )) do + port=$((base_port + i)) + url="http://0.0.0.0:${port}/health" + response=$(curl -s -o /dev/null -w "%{http_code}" "$url") + + if [ "$response" -eq 200 ]; then + servers_online=$((servers_online + 1)) + fi + done + + if [ $servers_online -eq $num_servers ]; then + echo "All servers are online." + break + else + echo "Waiting for $((num_servers - servers_online)) more servers to come online..." + fi + + sleep 10 +done + +# Call Eureka to initiate evals +ports=$(seq -s ' ' $base_port $((base_port + num_servers - 1))) +EUREKA_ARGS="\ + --model_config=${model_name} \ + --exp_config=${exp_config} \ + --local_vllm \ + --ports ${ports} \ +" +echo "Starting evals..." +python main.py ${EUREKA_ARGS} >> $log_dir/out.log 2>&1 + +# Shut down servers +echo "Shutting down vLLM servers..." +for (( i = 0; i < $num_servers; i++ )) do + port=$((base_port + i)) + logfile="$log_dir/${port}.log" + pid=$(grep "Started server process" $logfile | grep -o '[0-9]\+') + echo "Shutting down server on port ${port} (PID ${pid})" + kill -INT $pid +done \ No newline at end of file diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 9f26a9be..b5188db6 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -2,12 +2,14 @@ import json import logging +import random +import re import requests import time import urllib.request from abc import ABC, abstractmethod +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -import random import anthropic import tiktoken @@ -1157,7 +1159,7 @@ def model_template_fn(self, text_prompt, system_message=None): raise NotImplementedError -class LocalVLLMDeploymentHandler: +class _LocalVLLMDeploymentHandler: """This class is used to handle the deployment of vLLM servers.""" # Chose against dataclass here so we have the option to accept kwargs @@ -1241,22 +1243,35 @@ def get_healthy_ports(self) -> list[str]: return healthy_ports def deploy_servers(self): + """Deploy vLLM servers in background threads using the specified parameters.""" + logging.info(f"No vLLM servers are running. 
Starting {self.num_servers} new servers at {self.ports}.") - import os, subprocess, sys, datetime - - env = os.environ.copy() - env['NUM_SERVERS'] = str(self.num_servers) - env['CURRENT_PYTHON_EXEC'] = sys.executable - env['GPU_SKIP'] = str(self.pipeline_parallel_size * self.tensor_parallel_size) + import os, datetime + gpus_per_port = self.pipeline_parallel_size * self.tensor_parallel_size date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S.%f") log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}") os.makedirs(log_dir) - env['LOCAL_VLLM_LOG_DIR'] = log_dir + + executor = ThreadPoolExecutor(max_workers=self.num_servers) + futures = [executor.submit(lambda index: self.deploy_server(index, gpus_per_port, log_dir), i) for i in range(self.num_servers)] + + def deploy_server(self, index: int, gpus_per_port: int, log_dir: str): + """Deploy a single vLLM server using gpus_per_port many gpus starting at index*gpus_per_port.""" + + import os, subprocess + + port = 8000 + index + first_gpu = index * gpus_per_port + last_gpu = first_gpu + gpus_per_port - 1 + devices = ",".join(str(gpu_num) for gpu_num in range(first_gpu, last_gpu + 1)) + log_file = os.path.join(log_dir, f"{port}.log") command = [ - os.path.dirname(os.path.abspath(__file__)) + "/vllm_deployment_script.sh", - "--model", self.model_name, + "CUDA_VISIBLE_DEVICES=" + devices, + "vllm serve", + self.model_name, + "--port", str(port), "--tensor_parallel_size", str(self.tensor_parallel_size), "--pipeline_parallel_size", str(self.pipeline_parallel_size), "--dtype", self.dtype, @@ -1269,17 +1284,11 @@ def deploy_servers(self): command.append(self.quantization) if self.trust_remote_code: command.append("--trust_remote_code") + #command.append(">> " + log_file + " 2>&1 &") + command = " ".join(command) logging.info(f"Running command: {command}") - response = subprocess.run(command, text=True, env=env) - return response - - @classmethod - def shutdown_servers(cls): - # Consider whether this is appropriate since it will probably kill all vLLM servers. 
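# A simplified, single-server sketch of the launch-and-wait flow implemented by
# deploy_server above and the handler's health-wait loop: start `vllm serve` in
# the background with its output sent to a per-port log file, then poll /health
# until the server answers (model name, GPU index, port, and timeout are placeholders).
import subprocess, time, requests

port = 8000
log_writer = open(f"{port}.log", "w")
proc = subprocess.Popen(
    f"CUDA_VISIBLE_DEVICES=0 vllm serve microsoft/phi-4 --port {port}",
    shell=True, stdout=log_writer, stderr=log_writer,
)
deadline = time.time() + 600
while time.time() < deadline:
    try:
        if requests.get(f"http://0.0.0.0:{port}/health", timeout=5).status_code == 200:
            break
    except requests.RequestException:
        pass
    time.sleep(10)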
- import subprocess - logging.info(f"Shutting down vLLM servers.") - command = [f'pgrep -f "vllm.entrypoints.openai.api_server --model" | xargs kill -INT'] - subprocess.run(command, shell=True) + with open(log_file, 'w') as log_writer: + subprocess.run(command, shell=True, stdout=log_writer, stderr=log_writer) @dataclass @@ -1301,7 +1310,7 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): # Deployment handler ports: list = None - handler: LocalVLLMDeploymentHandler = None + handler: _LocalVLLMDeploymentHandler = None # Inference parameters temperature: float = 0.01 @@ -1312,7 +1321,7 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): def __post_init__(self): if not self.model_name: raise ValueError("LocalVLLM model_name must be specified.") - self.handler = LocalVLLMDeploymentHandler( + self.handler = _LocalVLLMDeploymentHandler( model_name=self.model_name, num_servers=self.num_servers, trust_remote_code=self.trust_remote_code, @@ -1351,7 +1360,7 @@ def generate(self, text_prompt, query_images=None, system_message=None): response_dict = {} if text_prompt: - # Format request for OpenAI API using create_request from OpenAIRequestResponseMixIn + # Format request for OpenAI API using create_request from OpenAICommonRequestResponseMixIn request = self.create_request(text_prompt, query_images, system_message) try: response_dict.update(self._generate(request)) diff --git a/eureka_ml_insights/models/vllm_deployment_script.sh b/eureka_ml_insights/models/vllm_deployment_script.sh deleted file mode 100755 index de1454e3..00000000 --- a/eureka_ml_insights/models/vllm_deployment_script.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -set -e -x - -for (( i = 0; i < $NUM_SERVERS; i++ )) do - port=$((8000 + i)) - # Here GPU_SKIP is set as tensor_parallel_size*pipeline_parallel_size - first_gpu=$((i * GPU_SKIP)) - last_gpu=$((first_gpu + GPU_SKIP - 1)) - devices=$(seq -s, $first_gpu $last_gpu) - CUDA_VISIBLE_DEVICES=${devices} "$CURRENT_PYTHON_EXEC" -m vllm.entrypoints.openai.api_server "$@" --port ${port} >> ${LOCAL_VLLM_LOG_DIR}/${port}.log 2>&1 & -done \ No newline at end of file diff --git a/main.py b/main.py index 0d9a84db..b604f089 100755 --- a/main.py +++ b/main.py @@ -81,12 +81,3 @@ logging.info(f"Saving experiment logs in {pipeline_config.log_dir}.") pipeline = Pipeline(pipeline_config.component_configs, pipeline_config.log_dir) pipeline.run() - - # Shut down vllm servers. 
- if args.local_vllm: - try: - from eureka_ml_insights.models.models import LocalVLLMDeploymentHandler - LocalVLLMDeploymentHandler.shutdown_servers() - except: - logging.warning("Failed to shut down local vllm servers.") - \ No newline at end of file From 4aa3a21050f82b64176aa699867c56bcb7f6b14c Mon Sep 17 00:00:00 2001 From: Vidhisha Balachandran Date: Thu, 6 Mar 2025 13:15:27 -0800 Subject: [PATCH 05/11] retrigger checks From 714bde5dd9d809160a4a8804d331365ce014dcce Mon Sep 17 00:00:00 2001 From: Vidhisha Balachandran Date: Thu, 6 Mar 2025 13:16:29 -0800 Subject: [PATCH 06/11] retrigger checks From f26a85edeb8754737ca913065375fb416f7a55f7 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Wed, 12 Mar 2025 19:45:19 +0000 Subject: [PATCH 07/11] removed unused import, better example model, more explicit error --- eureka_ml_insights/configs/model_configs.py | 4 ++-- eureka_ml_insights/models/models.py | 1 - main.py | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/eureka_ml_insights/configs/model_configs.py b/eureka_ml_insights/configs/model_configs.py index d0ec5365..6a45699c 100644 --- a/eureka_ml_insights/configs/model_configs.py +++ b/eureka_ml_insights/configs/model_configs.py @@ -269,11 +269,11 @@ "ports": ["8002", "8003"], }, ) -QWENVL_LOCAL_CONFIG = ModelConfig( +QWQ32B_LOCAL_CONFIG = ModelConfig( LocalVLLMModel, { # this name must match the vllm deployment name/path - "model_name": "Qwen/Qwen2.5-VL-7B-Instruct", + "model_name": "Qwen/QwQ-32B", # certain args will get passed to the vllm serve command "tensor_parallel_size": 2, }, diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index b5188db6..293479b5 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -3,7 +3,6 @@ import json import logging import random -import re import requests import time import urllib.request diff --git a/main.py b/main.py index b604f089..d495b0f0 100755 --- a/main.py +++ b/main.py @@ -53,7 +53,7 @@ init_args["model_config"] = model_config # Logic above is that certain deployment parameters like ports and num_servers # can be variable and so we allow them to be overridden by command line args. - except: + except AttributeError: # If there's no config, create one. 
init_args["model_config"] = ModelConfig( LocalVLLMModel, From 839580ce8b03173892b3ab6c6041917ecc7edcab Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Thu, 13 Mar 2025 20:59:21 +0000 Subject: [PATCH 08/11] added lock/dict to prevent multiple deployments of same model_name --- eureka_ml_insights/models/models.py | 46 ++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 9683a8d3..b822dcc3 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -4,6 +4,7 @@ import logging import random import requests +import threading import time import urllib.request from abc import ABC, abstractmethod @@ -1343,10 +1344,19 @@ def deploy_server(self, index: int, gpus_per_port: int, log_dir: str): with open(log_file, 'w') as log_writer: subprocess.run(command, shell=True, stdout=log_writer, stderr=log_writer) + +local_vllm_model_lock = threading.Lock() +local_vllm_deployment_handlers : dict[str, _LocalVLLMDeploymentHandler] = {} + @dataclass class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): - """This class is used when you have multiple vLLM servers running locally.""" + """This class is used for vLLM servers running locally. + + In case the servers are already deployed, specify the + model_name and the ports at which the servers are hosted. + Otherwise instantiating will initiate a deployment with + any deployment parameters specified.""" model_name: str = None @@ -1374,19 +1384,27 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): def __post_init__(self): if not self.model_name: raise ValueError("LocalVLLM model_name must be specified.") - self.handler = _LocalVLLMDeploymentHandler( - model_name=self.model_name, - num_servers=self.num_servers, - trust_remote_code=self.trust_remote_code, - pipeline_parallel_size=self.pipeline_parallel_size, - tensor_parallel_size=self.tensor_parallel_size, - dtype=self.dtype, - quantization=self.quantization, - seed=self.seed, - gpu_memory_utilization=self.gpu_memory_utilization, - cpu_offload_gb=self.cpu_offload_gb, - ports=self.ports, - ) + self.handler = self._get_local_vllm_deployment_handler() + + def _get_local_vllm_deployment_handler(self): + if self.model_name not in local_vllm_deployment_handlers: + with local_vllm_model_lock: + if self.model_name not in local_vllm_deployment_handlers: + local_vllm_deployment_handlers['self.model_name'] = _LocalVLLMDeploymentHandler( + model_name=self.model_name, + num_servers=self.num_servers, + trust_remote_code=self.trust_remote_code, + pipeline_parallel_size=self.pipeline_parallel_size, + tensor_parallel_size=self.tensor_parallel_size, + dtype=self.dtype, + quantization=self.quantization, + seed=self.seed, + gpu_memory_utilization=self.gpu_memory_utilization, + cpu_offload_gb=self.cpu_offload_gb, + ports=self.ports, + ) + + return local_vllm_deployment_handlers['self.model_name'] def _generate(self, request): From efa8523e248ef8e9e4a41dfbeb85d86008173a6f Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Fri, 14 Mar 2025 21:41:37 +0000 Subject: [PATCH 09/11] bad typo => useless lock --- eureka_ml_insights/models/models.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index b822dcc3..6bd0f4b3 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -1390,7 +1390,7 @@ def 
_get_local_vllm_deployment_handler(self): if self.model_name not in local_vllm_deployment_handlers: with local_vllm_model_lock: if self.model_name not in local_vllm_deployment_handlers: - local_vllm_deployment_handlers['self.model_name'] = _LocalVLLMDeploymentHandler( + local_vllm_deployment_handlers[self.model_name] = _LocalVLLMDeploymentHandler( model_name=self.model_name, num_servers=self.num_servers, trust_remote_code=self.trust_remote_code, @@ -1404,7 +1404,7 @@ def _get_local_vllm_deployment_handler(self): ports=self.ports, ) - return local_vllm_deployment_handlers['self.model_name'] + return local_vllm_deployment_handlers[self.model_name] def _generate(self, request): From e58b84cb5cd95dadfd4f2ab75c1ac68b5e5adcb1 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Sat, 15 Mar 2025 00:18:15 +0000 Subject: [PATCH 10/11] background threads need to be daemon, otherwise main thread never finishes --- eureka_ml_insights/models/models.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 6bd0f4b3..2e8cb2c7 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -8,7 +8,6 @@ import time import urllib.request from abc import ABC, abstractmethod -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass import anthropic @@ -1307,8 +1306,12 @@ def deploy_servers(self): log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}") os.makedirs(log_dir) - executor = ThreadPoolExecutor(max_workers=self.num_servers) - futures = [executor.submit(lambda index: self.deploy_server(index, gpus_per_port, log_dir), i) for i in range(self.num_servers)] + for index in range(self.num_servers): + background_thread = threading.Thread( + target = lambda: self.deploy_server(index, gpus_per_port, log_dir) + ) + background_thread.daemon = True + background_thread.start() def deploy_server(self, index: int, gpus_per_port: int, log_dir: str): """Deploy a single vLLM server using gpus_per_port many gpus starting at index*gpus_per_port.""" From a9e645dc1966dd42207c04b9e9cada8601583297 Mon Sep 17 00:00:00 2001 From: Michael Harrison Date: Sat, 15 Mar 2025 20:24:44 +0000 Subject: [PATCH 11/11] inherit from EndpointModel, fix shutdown logic --- eureka_ml_insights/models/models.py | 93 ++++++++++++++--------------- main.py | 6 +- 2 files changed, 50 insertions(+), 49 deletions(-) diff --git a/eureka_ml_insights/models/models.py b/eureka_ml_insights/models/models.py index 9ffa48e4..43d5a6e0 100644 --- a/eureka_ml_insights/models/models.py +++ b/eureka_ml_insights/models/models.py @@ -133,6 +133,7 @@ def generate(self, query_text, *args, **kwargs): model_output = None is_valid = False response_time = None + n_output_tokens = None while attempts < self.num_retries: try: @@ -141,6 +142,7 @@ def generate(self, query_text, *args, **kwargs): response_dict.update(model_response) model_output = model_response["model_output"] response_time = model_response["response_time"] + n_output_tokens = model_response.get("n_output_tokens", None) if self.chat_mode: previous_messages = self.update_chat_history(query_text, model_output, *args, **kwargs) @@ -160,7 +162,7 @@ def generate(self, query_text, *args, **kwargs): "is_valid": is_valid, "model_output": model_output, "response_time": response_time, - "n_output_tokens": self.count_tokens(model_output, is_valid), + "n_output_tokens": n_output_tokens or self.count_tokens(model_output, is_valid), } ) if 
self.chat_mode: @@ -477,7 +479,10 @@ def get_response(self, request): "response_time": response_time, } if "usage" in openai_response: - response_dict.update({"usage": openai_response["usage"]}) + usage = openai_response["usage"] + response_dict.update({"usage": usage}) + if isinstance(usage, dict) and "completion_tokens" in usage: + response_dict.update({"n_output_tokens": usage["completion_tokens"]}) return response_dict @@ -1273,9 +1278,12 @@ def create_request(self, text_prompt, system_message=None): class _LocalVLLMDeploymentHandler: """This class is used to handle the deployment of vLLM servers.""" - # Chose against dataclass here so we have the option to accept kwargs # and pass them to the vLLM deployment script. + + # Used to store references to logs of the servers, since those contain PIDs for shutdown. + logs = [] + def __init__( self, model_name: str = None, @@ -1339,8 +1347,8 @@ def _get_clients(self): else: logging.info(f"Waiting for {self.num_servers - len(healthy_ports)} more servers to come online.") - if len(self.clients) == 0: - raise RuntimeError(f"Failed to start servers.") + if len(self.clients) != self.num_servers: + raise RuntimeError(f"Failed to start all servers.") def get_healthy_ports(self) -> list[str]: """Check if servers are running.""" @@ -1366,22 +1374,23 @@ def deploy_servers(self): os.makedirs(log_dir) for index in range(self.num_servers): + port = 8000 + index + log_file = os.path.join(log_dir, f"{port}.log") + self.logs.append(log_file) background_thread = threading.Thread( - target = lambda: self.deploy_server(index, gpus_per_port, log_dir) + target = lambda: self.deploy_server(index, gpus_per_port, log_file) ) background_thread.daemon = True background_thread.start() - def deploy_server(self, index: int, gpus_per_port: int, log_dir: str): + def deploy_server(self, index: int, gpus_per_port: int, log_file: str): """Deploy a single vLLM server using gpus_per_port many gpus starting at index*gpus_per_port.""" - import os, subprocess - + import subprocess port = 8000 + index first_gpu = index * gpus_per_port last_gpu = first_gpu + gpus_per_port - 1 devices = ",".join(str(gpu_num) for gpu_num in range(first_gpu, last_gpu + 1)) - log_file = os.path.join(log_dir, f"{port}.log") command = [ "CUDA_VISIBLE_DEVICES=" + devices, @@ -1400,19 +1409,34 @@ def deploy_server(self, index: int, gpus_per_port: int, log_dir: str): command.append(self.quantization) if self.trust_remote_code: command.append("--trust_remote_code") - #command.append(">> " + log_file + " 2>&1 &") command = " ".join(command) logging.info(f"Running command: {command}") with open(log_file, 'w') as log_writer: subprocess.run(command, shell=True, stdout=log_writer, stderr=log_writer) + @classmethod + def shutdown_servers(cls): + """Shutdown all vLLM servers deployed during this run.""" + + import re, os, signal + for log_file in cls.logs: + with open(log_file, "r") as f: + for line in f: + if "Started server process" in line: + match = re.search(r"\d+", line) + if match: + pid = int(match.group()) + logging.info(f"Shutting down server with PID {pid}.") + os.kill(pid, signal.SIGINT) + break + local_vllm_model_lock = threading.Lock() local_vllm_deployment_handlers : dict[str, _LocalVLLMDeploymentHandler] = {} @dataclass -class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): +class LocalVLLMModel(OpenAICommonRequestResponseMixIn, EndpointModel): """This class is used for vLLM servers running locally. 
In case the servers are already deployed, specify the @@ -1442,11 +1466,17 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn): top_p: float = .95 top_k: int = -1 max_tokens: int = 2000 + frequency_penalty: float = 0 + presence_penalty: float = 0 def __post_init__(self): if not self.model_name: raise ValueError("LocalVLLM model_name must be specified.") self.handler = self._get_local_vllm_deployment_handler() + + @property + def client(self): + return random.choice(self.handler.clients) def _get_local_vllm_deployment_handler(self): if self.model_name not in local_vllm_deployment_handlers: @@ -1467,42 +1497,9 @@ def _get_local_vllm_deployment_handler(self): ) return local_vllm_deployment_handlers[self.model_name] - - def _generate(self, request): - - # Similar logic as OpenAICommonRequestResponseMixIn. - # If OpenAICommonRequestResponseMixIn is adapted for threadsafety, - # I think there's a way to use it even with multiple clients. - start_time = time.time() - client = random.choice(self.handler.clients) - completion = client.chat.completions.create( - model=self.model_name, - max_tokens=self.max_tokens, - **request - ) - end_time = time.time() - raw_output = completion.model_dump() - - return { - "model_output": raw_output["choices"][0]["message"]["content"], - "response_time": end_time - start_time, - "n_output_tokens": raw_output["usage"]["completion_tokens"] - } - - def generate(self, text_prompt, query_images=None, system_message=None): - response_dict = {} - - if text_prompt: - # Format request for OpenAI API using create_request from OpenAICommonRequestResponseMixIn - request = self.create_request(text_prompt, query_images, system_message) - try: - response_dict.update(self._generate(request)) - response_dict['is_valid'] = True - except Exception as e: - logging.warning(e) - response_dict['is_valid'] = False - - return response_dict + + def handle_request_error(self, e): + return False @dataclass diff --git a/main.py b/main.py index d495b0f0..fae95a15 100755 --- a/main.py +++ b/main.py @@ -50,7 +50,7 @@ if isinstance(model_config, ModelConfig): model_config.init_args["ports"] = args.ports model_config.init_args["num_servers"] = args.num_servers if args.num_servers else 1 - init_args["model_config"] = model_config + init_args["model_config"] = model_config # Logic above is that certain deployment parameters like ports and num_servers # can be variable and so we allow them to be overridden by command line args. except AttributeError: @@ -81,3 +81,7 @@ logging.info(f"Saving experiment logs in {pipeline_config.log_dir}.") pipeline = Pipeline(pipeline_config.component_configs, pipeline_config.log_dir) pipeline.run() + + if args.local_vllm: + from eureka_ml_insights.models.models import _LocalVLLMDeploymentHandler + _LocalVLLMDeploymentHandler.shutdown_servers()
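For reference, the shutdown path added in the final patch depends on the API server writing a startup line of the form "Started server process [<pid>]" to its log. A minimal standalone sketch of that scrape-and-signal step, treating the log path and the exact log format as assumptions:

import os
import re
import signal

log_file = "logs/local_vllm_deployment_logs/8000.log"  # placeholder path
with open(log_file, "r") as f:
    for line in f:
        match = re.search(r"Started server process \[(\d+)\]", line)
        if match:
            os.kill(int(match.group(1)), signal.SIGINT)  # SIGINT lets vLLM shut down cleanly
            break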