 import json
 import logging
+import random
+import re
 import requests
 import time
 import urllib.request
 from abc import ABC, abstractmethod
+from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
-import random

 import anthropic
 import tiktoken
@@ -1157,7 +1159,7 @@ def model_template_fn(self, text_prompt, system_message=None):
         raise NotImplementedError


-class LocalVLLMDeploymentHandler:
+class _LocalVLLMDeploymentHandler:
     """This class is used to handle the deployment of vLLM servers."""

     # Chose against dataclass here so we have the option to accept kwargs
@@ -1241,22 +1243,35 @@ def get_healthy_ports(self) -> list[str]:
         return healthy_ports

     def deploy_servers(self):
+        """Deploy vLLM servers in background threads using the specified parameters."""
+
         logging.info(f"No vLLM servers are running. Starting {self.num_servers} new servers at {self.ports}.")
-        import os, subprocess, sys, datetime
-
-        env = os.environ.copy()
-        env['NUM_SERVERS'] = str(self.num_servers)
-        env['CURRENT_PYTHON_EXEC'] = sys.executable
-        env['GPU_SKIP'] = str(self.pipeline_parallel_size * self.tensor_parallel_size)
+        import os, datetime

+        gpus_per_port = self.pipeline_parallel_size * self.tensor_parallel_size
         date = datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S.%f")
         log_dir = os.path.join("logs", "local_vllm_deployment_logs", f"{date}")
         os.makedirs(log_dir)
-        env['LOCAL_VLLM_LOG_DIR'] = log_dir
+
+        executor = ThreadPoolExecutor(max_workers=self.num_servers)
+        futures = [executor.submit(lambda index: self.deploy_server(index, gpus_per_port, log_dir), i) for i in range(self.num_servers)]
+
+    def deploy_server(self, index: int, gpus_per_port: int, log_dir: str):
+        """Deploy a single vLLM server using gpus_per_port many gpus starting at index*gpus_per_port."""
+
+        import os, subprocess
+
+        port = 8000 + index
+        first_gpu = index * gpus_per_port
+        last_gpu = first_gpu + gpus_per_port - 1
+        devices = ",".join(str(gpu_num) for gpu_num in range(first_gpu, last_gpu + 1))
+        log_file = os.path.join(log_dir, f"{port}.log")

         command = [
-            os.path.dirname(os.path.abspath(__file__)) + "/vllm_deployment_script.sh",
-            "--model", self.model_name,
+            "CUDA_VISIBLE_DEVICES=" + devices,
+            "vllm serve",
+            self.model_name,
+            "--port", str(port),
             "--tensor_parallel_size", str(self.tensor_parallel_size),
             "--pipeline_parallel_size", str(self.pipeline_parallel_size),
             "--dtype", self.dtype,
@@ -1269,17 +1284,11 @@ def deploy_servers(self):
             command.append(self.quantization)
         if self.trust_remote_code:
             command.append("--trust_remote_code")
+        #command.append(">> " + log_file + " 2>&1 &")
+        command = " ".join(command)
         logging.info(f"Running command: {command}")
-        response = subprocess.run(command, text=True, env=env)
-        return response
-
-    @classmethod
-    def shutdown_servers(cls):
-        # Consider whether this is appropriate since it will probably kill all vLLM servers.
-        import subprocess
-        logging.info(f"Shutting down vLLM servers.")
-        command = [f'pgrep -f "vllm.entrypoints.openai.api_server --model" | xargs kill -INT']
-        subprocess.run(command, shell=True)
+        with open(log_file, 'w') as log_writer:
+            subprocess.run(command, shell=True, stdout=log_writer, stderr=log_writer)


 @dataclass
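For a concrete picture of what reaches subprocess.run(..., shell=True) by the end of deploy_server above, a small sketch that joins the same pieces for server index 0; the model name, parallelism values, and dtype are placeholders, and the flag spellings are reproduced exactly as the diff writes them.

# Sketch: reconstruct the shell string for server index 0, mirroring the list
# built in deploy_server above.
model_name = "org/model"  # placeholder
devices = "0,1"           # index 0 with gpus_per_port = 2
port = 8000

command = [
    "CUDA_VISIBLE_DEVICES=" + devices,
    "vllm serve",
    model_name,
    "--port", str(port),
    "--tensor_parallel_size", "2",
    "--pipeline_parallel_size", "1",
    "--dtype", "auto",
]
print(" ".join(command))
# CUDA_VISIBLE_DEVICES=0,1 vllm serve org/model --port 8000 --tensor_parallel_size 2 --pipeline_parallel_size 1 --dtype auto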
@@ -1301,7 +1310,7 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn):

     # Deployment handler
     ports: list = None
-    handler: LocalVLLMDeploymentHandler = None
+    handler: _LocalVLLMDeploymentHandler = None

     # Inference parameters
     temperature: float = 0.01
@@ -1312,7 +1321,7 @@ class LocalVLLMModel(Model, OpenAICommonRequestResponseMixIn):
     def __post_init__(self):
         if not self.model_name:
             raise ValueError("LocalVLLM model_name must be specified.")
-        self.handler = LocalVLLMDeploymentHandler(
+        self.handler = _LocalVLLMDeploymentHandler(
             model_name=self.model_name,
             num_servers=self.num_servers,
             trust_remote_code=self.trust_remote_code,
@@ -1351,7 +1360,7 @@ def generate(self, text_prompt, query_images=None, system_message=None):
         response_dict = {}

         if text_prompt:
-            # Format request for OpenAI API using create_request from OpenAIRequestResponseMixIn
+            # Format request for OpenAI API using create_request from OpenAICommonRequestResponseMixIn
             request = self.create_request(text_prompt, query_images, system_message)
             try:
                 response_dict.update(self._generate(request))