forked from opendatahub-io/opendatahub-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
188 lines (146 loc) · 7.03 KB
/
utils.py
File metadata and controls
188 lines (146 loc) · 7.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import re
import time
import pandas as pd
import structlog
from kubernetes.client.rest import ApiException
from kubernetes.dynamic import DynamicClient
from ocp_resources.lm_eval_job import LMEvalJob
from ocp_resources.pod import Pod
from pyhelper_utils.general import tts
from timeout_sampler import TimeoutExpiredError
from utilities.constants import Timeout
from utilities.exceptions import PodLogMissMatchError, UnexpectedFailureError
LOGGER = structlog.get_logger(name=__name__)
def get_lmevaljob_pod(client: DynamicClient, lmevaljob: LMEvalJob, timeout: int = Timeout.TIMEOUT_10MIN) -> Pod:
"""
Gets the pod corresponding to a given LMEvalJob and waits for it to be ready.
Args:
client: The Kubernetes client to use
lmevaljob: The LMEvalJob that the pod is associated with
timeout: How long to wait for the pod, defaults to TIMEOUT_2MIN
Returns:
Pod resource
"""
lmeval_pod = Pod(
client=client,
namespace=lmevaljob.namespace,
name=lmevaljob.name,
)
lmeval_pod.wait(timeout=timeout)
return lmeval_pod
def get_lmeval_tasks(min_downloads: float, max_downloads: float | None = None) -> list[str]:
"""
Gets the list of supported LM-Eval tasks that have above a certain number of minimum downloads on HuggingFace.
Args:
min_downloads: The minimum number of downloads or the percentile of downloads to use as a minimum
max_downloads: The maximum number of downloads or the percentile of downloads to use as a maximum
Returns:
List of LM-Eval task names
"""
if min_downloads <= 0:
raise ValueError("Minimum downloads must be greater than 0")
lmeval_tasks = pd.read_csv(filepath_or_buffer="tests/model_explainability/lm_eval/data/new_task_list.csv")
if isinstance(min_downloads, float):
if not 0 <= min_downloads <= 1:
raise ValueError("Minimum downloads as a percentile must be between 0 and 1")
min_downloads = lmeval_tasks["HF dataset downloads"].quantile(q=min_downloads)
# filter for tasks that either exceed min_downloads OR exist on the OpenLLM leaderboard
# AND exist on LMEval AND do not include image data
filtered_df = lmeval_tasks[
lmeval_tasks["Exists"]
& (lmeval_tasks["Dataset"] != "MMMU/MMMU")
& ((lmeval_tasks["HF dataset downloads"] >= min_downloads) | (lmeval_tasks["OpenLLM leaderboard"]))
]
# if max_downloads is provided, filter for tasks that have less than
# or equal to the maximum number of downloads
if max_downloads is not None:
if max_downloads <= 0 or max_downloads > max(lmeval_tasks["HF dataset downloads"]):
raise ValueError("Maximum downloads must be greater than 0 and less than the maximum number of downloads")
if isinstance(max_downloads, float):
if not 0 <= max_downloads <= 1:
raise ValueError("Maximum downloads as a percentile must be between 0 and 1")
max_downloads = lmeval_tasks["HF dataset downloads"].quantile(q=max_downloads)
filtered_df = filtered_df[filtered_df["HF dataset downloads"] <= max_downloads]
# group tasks by dataset and extract the task with shortest name in the group
unique_tasks = filtered_df.loc[filtered_df.groupby("Dataset")["Name"].apply(lambda x: x.str.len().idxmin())]
unique_tasks = unique_tasks["Name"].tolist()
LOGGER.info(f"Number of unique LMEval tasks with more than {min_downloads} downloads: {len(unique_tasks)}")
return unique_tasks
def validate_lmeval_job_pod_and_logs(lmevaljob_pod: Pod) -> None:
"""Validate LMEval job pod success and presence of corresponding logs.
Args:
lmevaljob_pod: The LMEvalJob pod.
Returns: None
"""
pod_success_log_regex = (
r"INFO\sdriver\supdate status: job completed\s\{\"state\":\s\{\"state\""
r":\"Complete\",\"reason\":\"Succeeded\",\"message\":\"job completed\""
)
lmevaljob_pod.wait_for_status(status=lmevaljob_pod.Status.RUNNING, timeout=tts("10m"))
try:
lmevaljob_pod.wait_for_status(status=Pod.Status.SUCCEEDED, timeout=tts("1h"))
except TimeoutExpiredError as e:
raise UnexpectedFailureError("LMEval job pod failed from a running state.") from e
if not bool(re.search(pod_success_log_regex, lmevaljob_pod.log())):
raise PodLogMissMatchError("LMEval job pod failed.")
def wait_for_vllm_model_ready(
client: DynamicClient,
namespace: str,
inference_service_name: str,
max_wait_time: int = 600,
check_interval: int = 10,
stabilization_wait: int = 10,
) -> Pod:
"""Wait for vLLM model to download and be ready to serve requests.
Args:
client: Kubernetes dynamic client
namespace: Namespace where the inference service is deployed
inference_service_name: Name of the inference service
max_wait_time: Maximum time to wait in seconds
check_interval: Time between checks in seconds
stabilization_wait: Seconds to wait after model is ready for server stabilization
Returns:
The predictor pod once model is ready
Raises:
UnexpectedFailureError: If model fails to load or pod encounters errors
"""
LOGGER.info("Waiting for vLLM model to download and load...")
predictor_pods = list(
Pod.get(
dyn_client=client,
namespace=namespace,
label_selector=f"serving.kserve.io/inferenceservice={inference_service_name}",
)
)
if not predictor_pods:
raise UnexpectedFailureError("No predictor pod found for inference service")
predictor_pods = [pod for pod in predictor_pods if "predictor" in pod.name]
if not predictor_pods:
raise UnexpectedFailureError("No predictor pod found for inference service")
predictor_pod = predictor_pods[0]
LOGGER.info(f"Predictor pod: {predictor_pod.name}")
elapsed_time = 0
model_loaded = False
while elapsed_time < max_wait_time:
try:
pod_logs = predictor_pod.log(container="kserve-container")
if "Uvicorn running on" in pod_logs or "Application startup complete" in pod_logs:
LOGGER.info("vLLM server is running and ready!")
model_loaded = True
break
else:
LOGGER.info(f"Model still loading... (waited {elapsed_time}s)")
except (ApiException, OSError) as e:
LOGGER.info(f"Could not get pod logs yet: {e}")
time.sleep(check_interval)
elapsed_time += check_interval
if not model_loaded:
try:
full_logs = predictor_pod.log(container="kserve-container")
LOGGER.error(f"vLLM pod failed to start within {max_wait_time}s. Full logs:\n{full_logs}")
except (ApiException, OSError) as e:
LOGGER.error(f"Could not retrieve pod logs: {e}")
raise UnexpectedFailureError(f"vLLM model failed to load within {max_wait_time} seconds")
LOGGER.info(f"Model loaded! Waiting {stabilization_wait} more seconds for server stabilization.")
time.sleep(stabilization_wait)
return predictor_pod