Skip to content

Commit 0724f19

Browse files
authored
Enable AI Dynamo w/ K8S SPCx (Alpha) - Rel 2B (#667)
1 parent 9650abe commit 0724f19

File tree

10 files changed

+587
-211
lines changed

10 files changed

+587
-211
lines changed

conf/common/system/kubernetes_cluster.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
2-
# Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
33
# SPDX-License-Identifier: Apache-2.0
44
#
55
# Licensed under the Apache License, Version 2.0 (the "License");

src/cloudai/registration.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def register_all():
5555
from cloudai.systems.slurm import SlurmInstaller, SlurmRunner, SlurmSystem
5656
from cloudai.systems.standalone import StandaloneInstaller, StandaloneRunner, StandaloneSystem
5757
from cloudai.workloads.ai_dynamo import (
58+
AIDynamoKubernetesJsonGenStrategy,
5859
AIDynamoReportGenerationStrategy,
5960
AIDynamoSlurmCommandGenStrategy,
6061
AIDynamoTestDefinition,
@@ -183,6 +184,9 @@ def register_all():
183184
Registry().add_command_gen_strategy(SlurmSystem, NixlPerftestTestDefinition, NixlPerftestSlurmCommandGenStrategy)
184185

185186
Registry().add_command_gen_strategy(SlurmSystem, AIDynamoTestDefinition, AIDynamoSlurmCommandGenStrategy)
187+
Registry().add_strategy(
188+
JsonGenStrategy, [KubernetesSystem], [AIDynamoTestDefinition], AIDynamoKubernetesJsonGenStrategy
189+
)
186190
Registry().add_command_gen_strategy(SlurmSystem, BashCmdTestDefinition, BashCmdCommandGenStrategy)
187191
Registry().add_command_gen_strategy(SlurmSystem, NIXLKVBenchTestDefinition, NIXLKVBenchSlurmCommandGenStrategy)
188192

src/cloudai/systems/kubernetes/kubernetes_installer.py

Lines changed: 148 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020
import shutil
2121
import subprocess
2222
from pathlib import Path
23+
from shutil import rmtree
2324

24-
from cloudai.core import BaseInstaller, DockerImage, GitRepo, Installable, InstallStatusResult
25+
from cloudai.core import BaseInstaller, DockerImage, File, GitRepo, Installable, InstallStatusResult, PythonExecutable
2526
from cloudai.util.lazy_imports import lazy
2627

2728

@@ -55,83 +56,36 @@ def _check_prerequisites(self) -> InstallStatusResult:
5556
logging.error(message)
5657
return InstallStatusResult(False, message)
5758

58-
# Check MPIJob-related prerequisites
59-
mpi_job_result = self._check_mpi_job_prerequisites()
60-
if not mpi_job_result.success:
61-
return mpi_job_result
62-
6359
logging.info("All prerequisites are met. Proceeding with installation.")
6460
return InstallStatusResult(True)
6561

66-
def _check_mpi_job_prerequisites(self) -> InstallStatusResult:
67-
"""
68-
Check if the MPIJob CRD is installed and if MPIJob kind is supported in the Kubernetes cluster.
69-
70-
This ensures that the system is ready for MPI-based operations.
71-
72-
Returns
73-
InstallStatusResult: Result containing the status of the MPIJob prerequisite check and any error message.
74-
"""
75-
# Check if MPIJob CRD is installed
76-
try:
77-
custom_api = lazy.k8s.client.CustomObjectsApi()
78-
custom_api.get_cluster_custom_object(group="kubeflow.org", version="v1", plural="mpijobs", name="mpijobs")
79-
except lazy.k8s.client.ApiException as e:
80-
if e.status == 404:
81-
message = (
82-
"Installation failed during prerequisite checking stage because MPIJob CRD is not installed on "
83-
"this Kubernetes cluster. Please ensure that the MPI Operator is installed and MPIJob kind is "
84-
"supported. You can follow the instructions in the MPI Operator repository to install it: "
85-
"https://github.com/kubeflow/mpi-operator"
86-
)
87-
logging.error(message)
88-
return InstallStatusResult(False, message)
89-
else:
90-
message = (
91-
f"Installation failed during prerequisite checking stage due to an error while checking for MPIJob "
92-
f"CRD. Original error: {e!r}. Please ensure that the Kubernetes cluster is accessible and the "
93-
f"MPI Operator is correctly installed."
94-
)
95-
logging.error(message)
96-
return InstallStatusResult(False, message)
97-
98-
# Check if MPIJob kind is supported
99-
try:
100-
api_resources = lazy.k8s.client.ApiextensionsV1Api().list_custom_resource_definition()
101-
mpi_job_supported = any(item.metadata.name == "mpijobs.kubeflow.org" for item in api_resources.items)
102-
except lazy.k8s.client.ApiException as e:
103-
message = (
104-
f"Installation failed during prerequisite checking stage due to an error while checking for MPIJob "
105-
f"kind support. Original error: {e!r}. Please ensure that the Kubernetes cluster is accessible and "
106-
f"the MPI Operator is correctly installed."
107-
)
108-
logging.error(message)
109-
return InstallStatusResult(False, message)
110-
111-
if not mpi_job_supported:
112-
message = (
113-
"Installation failed during prerequisite checking stage because MPIJob kind is not supported on this "
114-
"Kubernetes cluster. Please ensure that the MPI Operator is installed and MPIJob kind is supported. "
115-
"You can follow the instructions in the MPI Operator repository to install it: "
116-
"https://github.com/kubeflow/mpi-operator"
117-
)
118-
logging.error(message)
119-
return InstallStatusResult(False, message)
120-
121-
return InstallStatusResult(True)
122-
12362
def install_one(self, item: Installable) -> InstallStatusResult:
    """
    Install a single item, dispatching on its concrete type.

    Docker images are reported as installed without any local action, git
    repositories and Python executables are delegated to their dedicated
    helpers, and plain files are copied into the system install path.

    Args:
        item: The installable to set up.

    Returns:
        InstallStatusResult: Outcome of the installation; a failed result is
            returned for item types this installer does not handle.
    """
    if isinstance(item, DockerImage):
        return InstallStatusResult(True, f"Docker image {item} installed")
    if isinstance(item, GitRepo):
        return self._install_one_git_repo(item)
    if isinstance(item, PythonExecutable):
        return self._install_python_executable(item)
    if not isinstance(item, File):
        return InstallStatusResult(False, f"Unsupported item type: {type(item)}")

    # Plain file: the copy keeps the source file name and lives directly under the install path.
    item.installed_path = self.system.install_path / item.src.name
    shutil.copyfile(item.src, item.installed_path, follow_symlinks=False)
    return InstallStatusResult(True)
12974

13075
def uninstall_one(self, item: Installable) -> InstallStatusResult:
    """
    Uninstall a single item, dispatching on its concrete type.

    Docker images are reported as uninstalled without any local action, git
    repositories and Python executables are delegated to their dedicated
    helpers, and installed copies of plain files are deleted.

    Args:
        item: The installable to remove.

    Returns:
        InstallStatusResult: Outcome of the removal; a failed result is
            returned for item types this installer does not handle.
    """
    if isinstance(item, DockerImage):
        return InstallStatusResult(True, f"Docker image {item} uninstalled")
    elif isinstance(item, GitRepo):
        return self._uninstall_git_repo(item)
    elif isinstance(item, PythonExecutable):
        return self._uninstall_python_executable(item)
    elif isinstance(item, File):
        # Never delete the source itself: only remove a path that differs from `item.src`.
        # NOTE(review): presumably `installed_path` falls back to `src` when nothing was
        # installed -- confirm against the File class.
        if item.installed_path != item.src:
            # The copy may already have been removed (e.g. manual cleanup or a repeated
            # uninstall); treat that as success instead of raising FileNotFoundError.
            item.installed_path.unlink(missing_ok=True)
            item._installed_path = None
            return InstallStatusResult(True)
        logging.debug(f"File {item.installed_path} does not exist.")
        return InstallStatusResult(True)
    return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
13690

13791
def is_installed_one(self, item: Installable) -> InstallStatusResult:
@@ -143,6 +97,8 @@ def is_installed_one(self, item: Installable) -> InstallStatusResult:
14397
item.installed_path = repo_path
14498
return InstallStatusResult(True)
14599
return InstallStatusResult(False, f"Git repository {item.url} not cloned")
100+
elif isinstance(item, PythonExecutable):
101+
return self._is_python_executable_installed(item)
146102
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
147103

148104
def mark_as_installed_one(self, item: Installable) -> InstallStatusResult:
@@ -151,6 +107,10 @@ def mark_as_installed_one(self, item: Installable) -> InstallStatusResult:
151107
elif isinstance(item, GitRepo):
152108
item.installed_path = self.system.install_path / item.repo_name
153109
return InstallStatusResult(True)
110+
elif isinstance(item, PythonExecutable):
111+
item.git_repo.installed_path = self.system.install_path / item.git_repo.repo_name
112+
item.venv_path = self.system.install_path / item.venv_name
113+
return InstallStatusResult(True)
154114
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
155115

156116
def _install_one_git_repo(self, item: GitRepo) -> InstallStatusResult:
@@ -172,6 +132,45 @@ def _install_one_git_repo(self, item: GitRepo) -> InstallStatusResult:
172132
item.installed_path = repo_path
173133
return InstallStatusResult(True)
174134

135+
def _install_python_executable(self, item: PythonExecutable) -> InstallStatusResult:
    """
    Install a Python executable: clone its git repository, then build its virtual environment.

    Args:
        item: The Python executable to install.

    Returns:
        InstallStatusResult: The first failing step's result, or a plain success result
            when both steps succeed.
    """
    repo_status = self._install_one_git_repo(item.git_repo)
    if repo_status.success:
        venv_status = self._create_venv(item)
        if venv_status.success:
            return InstallStatusResult(True)
        return venv_status
    return repo_status
145+
146+
def _install_dependencies(self, item: PythonExecutable) -> InstallStatusResult:
    """
    Install the project's dependencies into the item's virtual environment.

    The project directory is the cloned repository, optionally narrowed by
    `item.project_subpath`. When both `pyproject.toml` and `requirements.txt`
    are present, `item.dependencies_from_pyproject` selects which one is used;
    otherwise whichever file exists is used.

    Args:
        item: The Python executable whose dependencies should be installed.

    Returns:
        InstallStatusResult: Result of the selected installation step, or a failed
            result when the repository is not installed or no dependency file exists.
    """
    venv_path = self.system.install_path / item.venv_name

    repo_root = item.git_repo.installed_path
    if not repo_root:
        return InstallStatusResult(False, "Git repository must be installed before creating virtual environment.")

    project_dir = repo_root / item.project_subpath if item.project_subpath else repo_root
    pyproject_toml = project_dir / "pyproject.toml"
    requirements_txt = project_dir / "requirements.txt"

    has_pyproject = pyproject_toml.exists()
    has_requirements = requirements_txt.exists()
    if not has_pyproject and not has_requirements:
        return InstallStatusResult(False, "No pyproject.toml or requirements.txt found for installation.")

    # The item's preference only matters when both dependency sources are available.
    if has_pyproject and has_requirements:
        use_pyproject = bool(item.dependencies_from_pyproject)
    else:
        use_pyproject = has_pyproject

    if use_pyproject:
        return self._install_pyproject(venv_path, project_dir)
    return self._install_requirements(venv_path, requirements_txt)
173+
175174
def _clone_repository(self, git_url: str, path: Path) -> InstallStatusResult:
176175
logging.debug(f"Cloning repository {git_url} into {path}")
177176
clone_cmd = ["git", "clone"]
@@ -195,6 +194,58 @@ def _checkout_commit(self, commit_hash: str, path: Path) -> InstallStatusResult:
195194
return InstallStatusResult(False, f"Failed to checkout commit: {result.stderr}")
196195
return InstallStatusResult(True)
197196

197+
def _create_venv(self, item: PythonExecutable) -> InstallStatusResult:
    """
    Create the item's virtual environment and install its dependencies into it.

    An already existing environment is reused as-is. If either the environment
    creation or the dependency installation fails, the (partial) environment
    directory is removed so that a later attempt starts from scratch.

    Args:
        item: The Python executable that needs a virtual environment.

    Returns:
        InstallStatusResult: Success when the environment exists or was created and
            populated; otherwise the failure of the step that went wrong.
    """
    # Local import keeps this block self-contained; `sys` is standard library.
    import sys

    venv_path = self.system.install_path / item.venv_name
    logging.debug(f"Creating virtual environment in {venv_path}")
    if venv_path.exists():
        msg = f"Virtual environment already exists at {venv_path}."
        logging.debug(msg)
        # Record the path on the item as well, as the installed-check does for an existing venv.
        item.venv_path = venv_path
        return InstallStatusResult(True, msg)

    # Use the interpreter running this process rather than whatever "python" resolves to on
    # PATH, which may be absent or a different version. `sys.executable` can be empty in
    # unusual embeddings, hence the fallback to the previous behaviour.
    interpreter = sys.executable or "python"
    cmd = [interpreter, "-m", "venv", str(venv_path)]
    logging.debug(f"Creating venv using cmd: {' '.join(cmd)}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    logging.debug(f"venv creation STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}")
    if result.returncode != 0:
        # Do not leave a half-created environment behind.
        if venv_path.exists():
            rmtree(venv_path)
        return InstallStatusResult(
            False, f"Failed to create venv:\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
        )

    res = self._install_dependencies(item)
    if not res.success:
        # An environment without its dependencies would later be mistaken for a valid install.
        if venv_path.exists():
            rmtree(venv_path)
        return res

    item.venv_path = venv_path

    return InstallStatusResult(True)
225+
226+
def _install_pyproject(self, venv_dir: Path, project_dir: Path) -> InstallStatusResult:
    """
    Install the project located at `project_dir` with the virtual environment's pip.

    Args:
        venv_dir: Root directory of the virtual environment.
        project_dir: Directory of the project to install.

    Returns:
        InstallStatusResult: Success when pip exits with code 0, otherwise a failure
            carrying pip's stderr.
    """
    venv_python = venv_dir / "bin" / "python"
    pip_cmd = [str(venv_python), "-m", "pip", "install", str(project_dir)]
    logging.debug(f"Installing dependencies using: {' '.join(pip_cmd)}")
    completed = subprocess.run(pip_cmd, capture_output=True, text=True)

    if completed.returncode == 0:
        return InstallStatusResult(True)
    return InstallStatusResult(False, f"Failed to install {project_dir} using pip: {completed.stderr}")
235+
236+
def _install_requirements(self, venv_dir: Path, requirements_txt: Path) -> InstallStatusResult:
    """
    Install the packages listed in `requirements_txt` with the virtual environment's pip.

    Args:
        venv_dir: Root directory of the virtual environment.
        requirements_txt: Path to the requirements file.

    Returns:
        InstallStatusResult: Success when pip exits with code 0; a failure when the
            requirements file is not a regular file or pip reports an error.
    """
    if not requirements_txt.is_file():
        return InstallStatusResult(False, f"Requirements file is invalid or does not exist: {requirements_txt}")

    venv_python = venv_dir / "bin" / "python"
    pip_cmd = [str(venv_python), "-m", "pip", "install", "-r", str(requirements_txt)]
    logging.debug(f"Installing dependencies using: {' '.join(pip_cmd)}")
    completed = subprocess.run(pip_cmd, capture_output=True, text=True)

    if completed.returncode == 0:
        return InstallStatusResult(True)
    return InstallStatusResult(False, f"Failed to install dependencies from requirements.txt: {completed.stderr}")
248+
198249
def _uninstall_git_repo(self, item: GitRepo) -> InstallStatusResult:
199250
logging.debug(f"Uninstalling git repository at {item.installed_path=}")
200251
repo_path = item.installed_path if item.installed_path else self.system.install_path / item.repo_name
@@ -207,3 +258,37 @@ def _uninstall_git_repo(self, item: GitRepo) -> InstallStatusResult:
207258
item.installed_path = None
208259

209260
return InstallStatusResult(True)
261+
262+
def _uninstall_python_executable(self, item: PythonExecutable) -> InstallStatusResult:
    """
    Remove a Python executable: its cloned git repository first, then its virtual environment.

    Args:
        item: The Python executable to uninstall.

    Returns:
        InstallStatusResult: The repository removal failure if that step fails; otherwise
            success, with an explanatory message when no virtual environment was found.
    """
    repo_status = self._uninstall_git_repo(item.git_repo)
    if not repo_status.success:
        return repo_status

    logging.debug(f"Uninstalling virtual environment at {item.venv_path=}")
    # Fall back to the default location when the item does not carry a venv path.
    venv_dir = item.venv_path or self.system.install_path / item.venv_name
    if venv_dir.exists():
        logging.debug(f"Removing folder {venv_dir}")
        rmtree(venv_dir)
        item.venv_path = None
        return InstallStatusResult(True)

    return InstallStatusResult(True, f"Virtual environment {item.venv_name} is not created.")
278+
279+
def _is_python_executable_installed(self, item: PythonExecutable) -> InstallStatusResult:
    """
    Check that both the git repository and the virtual environment of the item exist on disk.

    Each path that is found is written back onto the item (`git_repo.installed_path`
    and `venv_path`).

    Args:
        item: The Python executable to check.

    Returns:
        InstallStatusResult: Success when both directories exist; otherwise a failure
            naming the missing part.
    """
    # Prefer the paths already recorded on the item; otherwise look in the default locations.
    repo_dir = item.git_repo.installed_path or self.system.install_path / item.git_repo.repo_name
    if not repo_dir.exists():
        return InstallStatusResult(False, f"Git repository {item.git_repo.url} not cloned")
    item.git_repo.installed_path = repo_dir

    venv_dir = item.venv_path or self.system.install_path / item.venv_name
    if not venv_dir.exists():
        return InstallStatusResult(False, f"Virtual environment not created for {item.git_repo.url}")
    item.venv_path = venv_dir

    return InstallStatusResult(True, "Python executable installed")

src/cloudai/systems/kubernetes/kubernetes_runner.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,18 @@ def _submit_test(self, tr: TestRun) -> KubernetesJob:
3838
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)
3939
job_name = k8s_system.create_job(job_spec)
4040

41-
return KubernetesJob(tr, id=job_name, name=job_name, kind=job_kind)
41+
job = KubernetesJob(tr, id=job_name, name=job_name, kind=job_kind)
42+
43+
return job
4244

4345
def on_job_completion(self, job: BaseJob) -> None:
4446
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)
4547
k_job = cast(KubernetesJob, job)
46-
k8s_system.store_logs_for_job(k_job.name, k_job.test_run.output_path)
47-
k8s_system.delete_job(k_job.name, k_job.kind)
48+
if k_job.kind == "dynamographdeployment":
49+
k8s_system._delete_dynamo_graph_deployment(k_job.name)
50+
else:
51+
k8s_system.store_logs_for_job(k_job.name, k_job.test_run.output_path)
52+
k8s_system.delete_job(k_job.name, k_job.kind)
4853

4954
def kill_job(self, job: BaseJob) -> None:
5055
k8s_system: KubernetesSystem = cast(KubernetesSystem, self.system)

0 commit comments

Comments
 (0)