LocalVLLMModel and deployment handler #102

Merged
14 commits merged on Mar 17, 2025
61 changes: 61 additions & 0 deletions deploy_and_run.sh
@@ -0,0 +1,61 @@
#!/bin/bash

export PYTHONPATH="$(pwd):$PYTHONPATH"
model_name="microsoft/phi-4"
current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
log_dir="logs/${current_datetime}/deploy_and_run"

mkdir -p "$log_dir"

# vllm args
num_servers=4
tensor_parallel_size=1
pipeline_parallel_size=1
gpu_skip=$((tensor_parallel_size * pipeline_parallel_size))
base_port=8000
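# Example: with the defaults above (tensor_parallel_size=1, pipeline_parallel_size=1),
# gpu_skip is 1, so server i gets GPU i and listens on port 8000+i.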
VLLM_ARGS="\
--tensor-parallel-size=${tensor_parallel_size} \
--pipeline-parallel-size=${pipeline_parallel_size} \
--gpu-memory-utilization=0.9 \
"
echo "Spinning up servers..."
for (( i = 0; i < $num_servers; i++ )); do
    port=$((base_port + i))
    first_gpu=$((i * gpu_skip))
    last_gpu=$((first_gpu + gpu_skip - 1))
    devices=$(seq -s, $first_gpu $last_gpu)
    CUDA_VISIBLE_DEVICES=${devices} vllm serve ${model_name} "$@" --port ${port} ${VLLM_ARGS} >> "$log_dir/${port}.log" 2>&1 &
done

# Health check: poll the first server's /health endpoint until it responds.
url="http://0.0.0.0:${base_port}/health"

while true; do
    # Send the GET request and store the response code
    response=$(curl -s -o /dev/null -w "%{http_code}" "$url")

    if [ "$response" -eq 200 ]; then
        echo "Servers online..."
        break
    else
        echo "Waiting for servers to come online..."
    fi

    sleep 10
done

sleep 10

# Now call eureka to initiate evals.
ports=$(seq -s ' ' $base_port $((base_port + num_servers - 1)))
EUREKA_ARGS="\
--model_config=${model_name} \
--exp_config=IFEval_PIPELINE \
--local_vllm \
--ports ${ports} \
"
echo "Starting evals..."
python main.py ${EUREKA_ARGS} >> "$log_dir/phi4.log" 2>&1

echo "Shutting down vllm servers..."
pgrep -f "vllm serve" | xargs kill -INT
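
Any extra arguments passed to this script are forwarded verbatim to vllm serve via "$@". A hedged usage sketch, with illustrative (not required) vllm serve flags:

# Optional flags are passed straight through to each vllm serve invocation.
bash deploy_and_run.sh --max-model-len 16384 --trust-remote-code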
22 changes: 22 additions & 0 deletions eureka_ml_insights/configs/model_configs.py
@@ -11,6 +11,7 @@
LlamaServerlessAzureRestEndpointModel,
LLaVAHuggingFaceModel,
LLaVAModel,
LocalVLLMModel,
Phi4HFModel,
MistralServerlessAzureRestEndpointModel,
RestEndpointModel,
@@ -256,3 +257,24 @@
"model_name": "Mistral-large-2407",
},
)

# Local vLLM models
# Adapt these to your local deployments, or provide enough information for a vLLM deployment.
PHI4_LOCAL_CONFIG = ModelConfig(
    LocalVLLMModel,
    {
        # this name must match the vllm deployment name/path
        "model_name": "microsoft/phi-4",
        # specify ports in case the model is already deployed
        "ports": ["8002", "8003"],
    },
)
QWENVL_LOCAL_CONFIG = ModelConfig(
    LocalVLLMModel,
    {
        # this name must match the vllm deployment name/path
        "model_name": "Qwen/Qwen2.5-VL-7B-Instruct",
        # certain args will get passed to the vllm serve command
        "tensor_parallel_size": 2,
    },
)
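
Any of LocalVLLMModel's deployment fields can be supplied through the config dict. A hypothetical additional entry, assuming a locally deployable Llama checkpoint, could look like this:

# Hypothetical config: the keys mirror LocalVLLMModel's deployment fields.
LLAMA3_LOCAL_CONFIG = ModelConfig(
    LocalVLLMModel,
    {
        "model_name": "meta-llama/Llama-3.1-8B-Instruct",  # assumed model path
        "num_servers": 2,
        "tensor_parallel_size": 1,
    },
)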
2 changes: 2 additions & 0 deletions eureka_ml_insights/models/__init__.py
@@ -10,6 +10,7 @@
LlamaServerlessAzureRestEndpointModel,
LLaVAHuggingFaceModel,
LLaVAModel,
LocalVLLMModel,
MistralServerlessAzureRestEndpointModel,
Phi3HFModel,
Phi4HFModel,
@@ -34,6 +35,7 @@
MistralServerlessAzureRestEndpointModel,
LlamaServerlessAzureRestEndpointModel,
LLaVAModel,
LocalVLLMModel,
RestEndpointModel,
TestModel,
vLLMModel,
208 changes: 208 additions & 0 deletions eureka_ml_insights/models/models.py
@@ -2,10 +2,12 @@

import json
import logging
import requests
import time
import urllib.request
from abc import ABC, abstractmethod
from dataclasses import dataclass
import random

import anthropic
import tiktoken
@@ -1153,6 +1155,212 @@ def generate(self, text_prompt, query_images=None, system_message=None):

    def model_template_fn(self, text_prompt, system_message=None):
        raise NotImplementedError


class LocalVLLMDeploymentHandler:
    """This class is used to handle the deployment of vLLM servers."""

    # Chose against a dataclass here so we have the option to accept kwargs
    # and pass them to the vLLM deployment script.
    def __init__(
        self,
        model_name: str = None,
        num_servers: int = 1,
        trust_remote_code: bool = False,
        tensor_parallel_size: int = 1,
        pipeline_parallel_size: int = 1,
        dtype: str = "auto",
        quantization: str = None,
        seed: int = 0,
        gpu_memory_utilization: float = 0.9,
        cpu_offload_gb: float = 0,
        ports: list = None,
    ):
        if not model_name:
            raise ValueError("LocalVLLM model_name must be specified.")
        self.model_name = model_name
        self.num_servers = num_servers
        self.trust_remote_code = trust_remote_code
        self.tensor_parallel_size = tensor_parallel_size
        self.pipeline_parallel_size = pipeline_parallel_size
        self.dtype = dtype
        self.quantization = quantization
        self.seed = seed
        self.gpu_memory_utilization = gpu_memory_utilization
        self.cpu_offload_gb = cpu_offload_gb

        self.ports = ports
        self.session = requests.Session()
        self.clients = self._get_clients()

    def _get_clients(self):
        """Get clients to access vLLM servers, checking for running servers and deploying them if necessary."""
        from openai import OpenAI as OpenAIClient

        # If the user passes ports, check if the servers are running and populate clients accordingly.
        if self.ports:
            healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()]
            if len(healthy_server_urls) > 0:
                logging.info(f"Found {len(healthy_server_urls)} healthy servers.")
                return [OpenAIClient(base_url=url, api_key='none') for url in healthy_server_urls]

        # Even if the user doesn't pass ports, we can check if there happen to be deployed servers.
        # There is no guarantee that the servers are hosting the correct model.
        self.ports = [str(8000 + i) for i in range(self.num_servers)]
        healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in self.get_healthy_ports()]
        if len(healthy_server_urls) == self.num_servers:
            logging.info(f"Found {len(healthy_server_urls)} healthy servers.")
            return [OpenAIClient(base_url=url, api_key='none') for url in healthy_server_urls]

        # If that didn't work, let's deploy and wait for servers to come online.
        self.deploy_servers()
        server_start_time = time.time()
        while time.time() - server_start_time < 600:
            time.sleep(10)
            healthy_ports = self.get_healthy_ports()
            if len(healthy_ports) == self.num_servers:
                logging.info(f"All {self.num_servers} servers are online.")
                healthy_server_urls = ['http://0.0.0.0:' + port + '/v1' for port in healthy_ports]
                return [OpenAIClient(base_url=url, api_key='none') for url in healthy_server_urls]
            else:
                logging.info(f"Waiting for {self.num_servers - len(healthy_ports)} more servers to come online.")

        # If we get here, the servers did not all come online within the timeout.
        raise RuntimeError("Failed to start all vLLM servers within the timeout.")

    def get_healthy_ports(self) -> list[str]:
        """Check which servers are running and return the healthy ports."""

        healthy_ports = []
        for port in self.ports:
            try:
                self.session.get('http://0.0.0.0:' + port + '/health')
                healthy_ports.append(port)
            except requests.exceptions.RequestException:
                pass
        return healthy_ports

    def deploy_servers(self):
        logging.info(f"No vLLM servers are running. Starting {self.num_servers} new servers at {self.ports}.")
        import os, subprocess, sys, datetime

        env = os.environ.copy()
        env['NUM_SERVERS'] = str(self.num_servers)
        env['CURRENT_PYTHON_EXEC'] = sys.executable
        env['GPU_SKIP'] = str(self.pipeline_parallel_size * self.tensor_parallel_size)

        date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S.%f")
        log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}")
        os.makedirs(log_dir)
        env['LOCAL_VLLM_LOG_DIR'] = log_dir

        command = [
            os.path.join(os.path.dirname(os.path.abspath(__file__)), "vllm_deployment_script.sh"),
            "--model", self.model_name,
            "--tensor_parallel_size", str(self.tensor_parallel_size),
            "--pipeline_parallel_size", str(self.pipeline_parallel_size),
            "--dtype", self.dtype,
            "--seed", str(self.seed),
            "--gpu_memory_utilization", str(self.gpu_memory_utilization),
            "--cpu_offload_gb", str(self.cpu_offload_gb),
        ]
        if self.quantization:
            command.append("--quantization")
            command.append(self.quantization)
        if self.trust_remote_code:
            command.append("--trust_remote_code")
        logging.info(f"Running command: {command}")
        response = subprocess.run(command, text=True, env=env)
        return response

    @classmethod
    def shutdown_servers(cls):
        # Note: this will probably kill all local vLLM servers, not just the ones started by this handler.
        import subprocess

        logging.info("Shutting down vLLM servers.")
        command = 'pgrep -f "vllm.entrypoints.openai.api_server --model" | xargs kill -INT'
        subprocess.run(command, shell=True)


@dataclass
class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn):
    """This class is used when you have multiple vLLM servers running locally."""

    model_name: str = None

    # Deployment parameters
    num_servers: int = 1
    trust_remote_code: bool = False
    tensor_parallel_size: int = 1
    pipeline_parallel_size: int = 1
    dtype: str = "auto"
    quantization: str = None
    seed: int = 0
    gpu_memory_utilization: float = 0.9
    cpu_offload_gb: float = 0

    # Deployment handler
    ports: list = None
    handler: LocalVLLMDeploymentHandler = None

    # Inference parameters
    temperature: float = 0.01
    top_p: float = 0.95
    top_k: int = -1
    max_tokens: int = 2000

    def __post_init__(self):
        if not self.model_name:
            raise ValueError("LocalVLLM model_name must be specified.")
        self.handler = LocalVLLMDeploymentHandler(
            model_name=self.model_name,
            num_servers=self.num_servers,
            trust_remote_code=self.trust_remote_code,
            pipeline_parallel_size=self.pipeline_parallel_size,
            tensor_parallel_size=self.tensor_parallel_size,
            dtype=self.dtype,
            quantization=self.quantization,
            seed=self.seed,
            gpu_memory_utilization=self.gpu_memory_utilization,
            cpu_offload_gb=self.cpu_offload_gb,
            ports=self.ports,
        )

    def _generate(self, request):
        # Similar logic to OpenAICommonRequestResponseMixIn. If that mixin is adapted
        # for thread safety, it could be reused here even with multiple clients.
        start_time = time.time()
        client = random.choice(self.handler.clients)
        completion = client.chat.completions.create(
            model=self.model_name,
            max_tokens=self.max_tokens,
            **request
        )
        end_time = time.time()
        raw_output = completion.model_dump()

        return {
            "model_output": raw_output["choices"][0]["message"]["content"],
            "response_time": end_time - start_time,
            "n_output_tokens": raw_output["usage"]["completion_tokens"]
        }

    def generate(self, text_prompt, query_images=None, system_message=None):
        response_dict = {}

        if text_prompt:
            # Format request for OpenAI API using create_request from OpenAIRequestResponseMixIn
            request = self.create_request(text_prompt, query_images, system_message)
            try:
                response_dict.update(self._generate(request))
                response_dict['is_valid'] = True
            except Exception as e:
                logging.warning(e)
                response_dict['is_valid'] = False

        return response_dict


@dataclass
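
A minimal usage sketch for the new LocalVLLMModel class, assuming vLLM servers hosting microsoft/phi-4 are already listening on the given ports (otherwise the handler will try to deploy them itself):

from eureka_ml_insights.models import LocalVLLMModel

model = LocalVLLMModel(model_name="microsoft/phi-4", ports=["8002", "8003"])
result = model.generate("Explain tensor parallelism in two sentences.")
if result["is_valid"]:
    print(result["model_output"])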
11 changes: 11 additions & 0 deletions eureka_ml_insights/models/vllm_deployment_script.sh
@@ -0,0 +1,11 @@
#!/bin/bash
set -e -x

for (( i = 0; i < $NUM_SERVERS; i++ )); do
    port=$((8000 + i))
    # Here GPU_SKIP is set as tensor_parallel_size*pipeline_parallel_size
    first_gpu=$((i * GPU_SKIP))
    last_gpu=$((first_gpu + GPU_SKIP - 1))
    devices=$(seq -s, $first_gpu $last_gpu)
    CUDA_VISIBLE_DEVICES=${devices} "$CURRENT_PYTHON_EXEC" -m vllm.entrypoints.openai.api_server "$@" --port ${port} >> "${LOCAL_VLLM_LOG_DIR}/${port}.log" 2>&1 &
done
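
The script expects NUM_SERVERS, GPU_SKIP, CURRENT_PYTHON_EXEC, and LOCAL_VLLM_LOG_DIR in its environment; LocalVLLMDeploymentHandler.deploy_servers sets these before invoking it. A hypothetical manual invocation for two single-GPU servers:

mkdir -p logs/manual
NUM_SERVERS=2 GPU_SKIP=1 \
CURRENT_PYTHON_EXEC="$(command -v python)" \
LOCAL_VLLM_LOG_DIR=logs/manual \
bash eureka_ml_insights/models/vllm_deployment_script.sh --model microsoft/phi-4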