LocalVLLMModel and deployment handler #102

Merged: 14 commits on Mar 17, 2025
77 changes: 77 additions & 0 deletions deploy_vllm_and_run_eval.sh
@@ -0,0 +1,77 @@
#!/bin/bash

export PYTHONPATH="$(pwd):$PYTHONPATH"
model_name="microsoft/phi-4"
exp_config="IFEval_PIPELINE"
current_datetime=$(date +"%Y-%m-%d-%H:%M:%S")
log_dir="logs/deploy_vllm_and_run_eval/$current_datetime"
mkdir -p $log_dir

# vLLM args
num_servers=4
tensor_parallel_size=1
pipeline_parallel_size=1
base_port=8000
gpus_per_port=$((tensor_parallel_size * pipeline_parallel_size))

# Add any additional args accepted by vLLM serve here
VLLM_ARGS="\
--tensor-parallel-size=${tensor_parallel_size} \
--pipeline-parallel-size=${pipeline_parallel_size} \
--gpu-memory-utilization=0.9 \
"

# Start servers
echo "Spinning up servers..."
for (( i = 0; i < $num_servers; i++ )); do
port=$((base_port + i))
first_gpu=$((i * gpus_per_port))
last_gpu=$((first_gpu + gpus_per_port - 1))
devices=$(seq -s, $first_gpu $last_gpu)
CUDA_VISIBLE_DEVICES=${devices} vllm serve ${model_name} "$@" --port ${port} ${VLLM_ARGS} >> $log_dir/${port}.log 2>&1 &
done

# Wait for servers to come online
while true; do

servers_online=0
    for (( i = 0; i < $num_servers; i++ )); do
port=$((base_port + i))
url="http://0.0.0.0:${port}/health"
response=$(curl -s -o /dev/null -w "%{http_code}" "$url")

if [ "$response" -eq 200 ]; then
servers_online=$((servers_online + 1))
fi
done

if [ $servers_online -eq $num_servers ]; then
echo "All servers are online."
break
else
echo "Waiting for $((num_servers - servers_online)) more servers to come online..."
fi

sleep 10
done

# Call Eureka to initiate evals
ports=$(seq -s ' ' $base_port $((base_port + num_servers - 1)))
EUREKA_ARGS="\
--model_config=${model_name} \
--exp_config=${exp_config} \
--local_vllm \
--ports ${ports} \
"
echo "Starting evals..."
python main.py ${EUREKA_ARGS} >> $log_dir/out.log 2>&1

# Shut down servers
echo "Shutting down vLLM servers..."
for (( i = 0; i < $num_servers; i++ )); do
port=$((base_port + i))
logfile="$log_dir/${port}.log"
pid=$(grep "Started server process" $logfile | grep -o '[0-9]\+')
echo "Shutting down server on port ${port} (PID ${pid})"
kill -INT $pid
done
24 changes: 23 additions & 1 deletion eureka_ml_insights/configs/model_configs.py
@@ -11,6 +11,7 @@
LlamaServerlessAzureRestEndpointModel,
LLaVAHuggingFaceModel,
LLaVAModel,
LocalVLLMModel,
Phi4HFModel,
MistralServerlessAzureRestEndpointModel,
DeepseekR1ServerlessAzureRestEndpointModel,
@@ -283,6 +284,27 @@
},
)

# Local vLLM Models
# Adapt these to your local deployments, or provide enough information for the handler to deploy vLLM servers for you.
PHI4_LOCAL_CONFIG = ModelConfig(
LocalVLLMModel,
{
# this name must match the vllm deployment name/path
"model_name": "microsoft/phi-4",
# specify ports in case the model is already deployed
"ports": ["8002", "8003"],
},
)
QWQ32B_LOCAL_CONFIG = ModelConfig(
LocalVLLMModel,
{
# this name must match the vllm deployment name/path
"model_name": "Qwen/QwQ-32B",
# certain args will get passed to the vllm serve command
"tensor_parallel_size": 2,
},
)
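
For reference, a minimal illustrative sketch of what these two configs amount to, assuming the init_args dict in ModelConfig is ultimately passed to the model class as keyword arguments:

# PHI4_LOCAL_CONFIG: reuse servers already listening on the given ports.
model = LocalVLLMModel(model_name="microsoft/phi-4", ports=["8002", "8003"])
# QWQ32B_LOCAL_CONFIG: no ports given, so instantiation deploys vLLM servers locally,
# forwarding tensor_parallel_size=2 to the vllm serve command.
model = LocalVLLMModel(model_name="Qwen/QwQ-32B", tensor_parallel_size=2)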

# DeepSeek R1 Endpoints on Azure
DEEPSEEK_R1_CONFIG = ModelConfig(
DeepseekR1ServerlessAzureRestEndpointModel,
@@ -297,4 +319,4 @@
# the timeout parameter is passed to urllib.request.urlopen(request, timeout=self.timeout) in ServerlessAzureRestEndpointModel
"timeout": 600,
},
)
)
2 changes: 2 additions & 0 deletions eureka_ml_insights/models/__init__.py
@@ -10,6 +10,7 @@
LlamaServerlessAzureRestEndpointModel,
LLaVAHuggingFaceModel,
LLaVAModel,
LocalVLLMModel,
MistralServerlessAzureRestEndpointModel,
DeepseekR1ServerlessAzureRestEndpointModel,
Phi3HFModel,
@@ -36,6 +37,7 @@
LlamaServerlessAzureRestEndpointModel,
DeepseekR1ServerlessAzureRestEndpointModel,
LLaVAModel,
LocalVLLMModel,
RestEndpointModel,
TestModel,
VLLMModel,
237 changes: 237 additions & 0 deletions eureka_ml_insights/models/models.py
@@ -2,6 +2,9 @@

import json
import logging
import random
import requests
import threading
import time
import urllib.request
from abc import ABC, abstractmethod
@@ -1207,6 +1210,240 @@ def create_request(self, text_prompt, system_message=None):
]
else:
return [{"role": "user", "content": text_prompt}]


class _LocalVLLMDeploymentHandler:
"""This class is used to handle the deployment of vLLM servers."""

    # We chose not to use a dataclass here so that we keep the option of accepting kwargs
    # and passing them through to the vLLM serve command.
def __init__(
self,
model_name: str = None,
num_servers: int = 1,
trust_remote_code: bool = False,
tensor_parallel_size: int = 1,
pipeline_parallel_size: int = 1,
dtype: str = "auto",
quantization: str = None,
seed: int = 0,
gpu_memory_utilization: float = 0.9,
cpu_offload_gb: float = 0,
ports: list = None,
):
if not model_name:
raise ValueError("LocalVLLM model_name must be specified.")
self.model_name = model_name
self.num_servers = num_servers
self.trust_remote_code = trust_remote_code
self.tensor_parallel_size = tensor_parallel_size
self.pipeline_parallel_size = pipeline_parallel_size
self.dtype = dtype
self.quantization = quantization
self.seed = seed
self.gpu_memory_utilization = gpu_memory_utilization
self.cpu_offload_gb = cpu_offload_gb

self.ports = ports
self.session = requests.Session()
self.clients = self._get_clients()

def _get_clients(self):
        """Get clients for the vLLM servers, checking for already running servers and deploying new ones if necessary."""
from openai import OpenAI as OpenAIClient

# If the user passes ports, check if the servers are running and populate clients accordingly.
if self.ports:
healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()]
if len(healthy_server_urls) > 0:
logging.info(f"Found {len(healthy_server_urls)} healthy servers.")
return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls]

# Even if the user doesn't pass ports, we can check if there happen to be deployed servers.
# There is no guarantee that the servers are hosting the correct model.
self.ports = [str(8000 + i) for i in range(self.num_servers)]
healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()]
if len(healthy_server_urls) == self.num_servers:
logging.info(f"Found {len(healthy_server_urls)} healthy servers.")
return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls]

# If that didn't work, let's deploy and wait for servers to come online.
self.deploy_servers()
server_start_time = time.time()
while time.time() - server_start_time < 600:
time.sleep(10)
healthy_ports = self.get_healthy_ports()
if len(healthy_ports) == self.num_servers:
logging.info(f"All {self.num_servers} servers are online.")
healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in healthy_ports]
return [OpenAIClient(base_url=url, api_key = 'none') for url in healthy_server_urls]
else:
logging.info(f"Waiting for {self.num_servers - len(healthy_ports)} more servers to come online.")

        # If we reach this point, the servers did not all come online within the timeout.
        raise RuntimeError(f"Failed to bring all {self.num_servers} vLLM servers online.")

def get_healthy_ports(self) -> list[str]:
"""Check if servers are running."""

healthy_ports = []
for port in self.ports:
try:
self.session.get('http://0.0.0.0:' + port +'/health')
healthy_ports.append(port)
except:
pass
return healthy_ports

def deploy_servers(self):
"""Deploy vLLM servers in background threads using the specified parameters."""

logging.info(f"No vLLM servers are running. Starting {self.num_servers} new servers at {self.ports}.")
import os, datetime

gpus_per_port = self.pipeline_parallel_size * self.tensor_parallel_size
date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S.%f")
log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}")
os.makedirs(log_dir)

        for index in range(self.num_servers):
            # Pass index via args (not a lambda) so each thread gets its own copy;
            # a lambda would close over the loop variable and could race with the loop.
            background_thread = threading.Thread(
                target=self.deploy_server, args=(index, gpus_per_port, log_dir)
            )
            background_thread.daemon = True
            background_thread.start()

def deploy_server(self, index: int, gpus_per_port: int, log_dir: str):
"""Deploy a single vLLM server using gpus_per_port many gpus starting at index*gpus_per_port."""

import os, subprocess

port = 8000 + index
first_gpu = index * gpus_per_port
last_gpu = first_gpu + gpus_per_port - 1
devices = ",".join(str(gpu_num) for gpu_num in range(first_gpu, last_gpu + 1))
log_file = os.path.join(log_dir, f"{port}.log")

command = [
"CUDA_VISIBLE_DEVICES=" + devices,
"vllm serve",
self.model_name,
"--port", str(port),
"--tensor_parallel_size", str(self.tensor_parallel_size),
"--pipeline_parallel_size", str(self.pipeline_parallel_size),
"--dtype", self.dtype,
"--seed", str(self.seed),
"--gpu_memory_utilization", str(self.gpu_memory_utilization),
"--cpu_offload_gb", str(self.cpu_offload_gb)
]
if self.quantization:
command.append("--quantization")
command.append(self.quantization)
if self.trust_remote_code:
command.append("--trust_remote_code")
#command.append(">> " + log_file + " 2>&1 &")
command = " ".join(command)
logging.info(f"Running command: {command}")
with open(log_file, 'w') as log_writer:
subprocess.run(command, shell=True, stdout=log_writer, stderr=log_writer)
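
For concreteness, with the QWQ32B_LOCAL_CONFIG shown earlier (tensor_parallel_size=2, everything else at its default) and index=1, the handler assembles roughly the following command; the values below are derived from the code above rather than any additional options:

# index=1, gpus_per_port = 2 * 1 = 2  ->  first_gpu=2, last_gpu=3, devices="2,3", port=8001
# CUDA_VISIBLE_DEVICES=2,3 vllm serve Qwen/QwQ-32B --port 8001 \
#     --tensor_parallel_size 2 --pipeline_parallel_size 1 --dtype auto --seed 0 \
#     --gpu_memory_utilization 0.9 --cpu_offload_gb 0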


local_vllm_model_lock = threading.Lock()
local_vllm_deployment_handlers: dict[str, _LocalVLLMDeploymentHandler] = {}


@dataclass
class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn):
"""This class is used for vLLM servers running locally.

In case the servers are already deployed, specify the
model_name and the ports at which the servers are hosted.
Otherwise instantiating will initiate a deployment with
any deployment parameters specified."""

model_name: str = None

# Deployment parameters
num_servers: int = 1
trust_remote_code: bool = False
tensor_parallel_size: int = 1
pipeline_parallel_size: int = 1
dtype: str = "auto"
quantization: str = None
seed: int = 0
gpu_memory_utilization: float = 0.9
cpu_offload_gb: float = 0

# Deployment handler
ports: list = None
handler: _LocalVLLMDeploymentHandler = None

# Inference parameters
temperature: float = 0.01
top_p: float = .95
top_k: int = -1
max_tokens: int = 2000

def __post_init__(self):
if not self.model_name:
raise ValueError("LocalVLLM model_name must be specified.")
self.handler = self._get_local_vllm_deployment_handler()

def _get_local_vllm_deployment_handler(self):
if self.model_name not in local_vllm_deployment_handlers:
with local_vllm_model_lock:
if self.model_name not in local_vllm_deployment_handlers:
local_vllm_deployment_handlers[self.model_name] = _LocalVLLMDeploymentHandler(
model_name=self.model_name,
num_servers=self.num_servers,
trust_remote_code=self.trust_remote_code,
pipeline_parallel_size=self.pipeline_parallel_size,
tensor_parallel_size=self.tensor_parallel_size,
dtype=self.dtype,
quantization=self.quantization,
seed=self.seed,
gpu_memory_utilization=self.gpu_memory_utilization,
cpu_offload_gb=self.cpu_offload_gb,
ports=self.ports,
)

return local_vllm_deployment_handlers[self.model_name]

def _generate(self, request):

        # Similar logic to OpenAICommonRequestResponseMixIn.
        # If that mixin is adapted for thread safety, there may be a way to reuse it here
        # even with multiple clients.
        start_time = time.time()
        client = random.choice(self.handler.clients)
        completion = client.chat.completions.create(
            model=self.model_name,
            temperature=self.temperature,
            top_p=self.top_p,
            max_tokens=self.max_tokens,
            # top_k is not a standard OpenAI parameter; vLLM's OpenAI-compatible server reads it from the extra body.
            extra_body={"top_k": self.top_k},
            **request,
        )
end_time = time.time()
raw_output = completion.model_dump()

return {
"model_output": raw_output["choices"][0]["message"]["content"],
"response_time": end_time - start_time,
"n_output_tokens": raw_output["usage"]["completion_tokens"]
}

def generate(self, text_prompt, query_images=None, system_message=None):
response_dict = {}

if text_prompt:
# Format request for OpenAI API using create_request from OpenAICommonRequestResponseMixIn
request = self.create_request(text_prompt, query_images, system_message)
try:
response_dict.update(self._generate(request))
response_dict['is_valid'] = True
except Exception as e:
logging.warning(e)
response_dict['is_valid'] = False

return response_dict
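
For reference, a minimal usage sketch of the class above (the prompt is illustrative, and the ports assume microsoft/phi-4 servers are already reachable there; without ports, instantiation falls back to deploying servers locally):

model = LocalVLLMModel(model_name="microsoft/phi-4", ports=["8002", "8003"])
result = model.generate("Summarize the benefits of serving models with vLLM in one sentence.")
if result["is_valid"]:
    print(result["model_output"], result["n_output_tokens"], result["response_time"])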


@dataclass