Skip to content

Commit b52f2ae

Browse files
authored
Detect low thread environments and adjust task limits (#529)
1 parent 9f83d84 commit b52f2ae

File tree

8 files changed

+86
-129
lines changed

8 files changed

+86
-129
lines changed

src/cloudai/_core/base_installer.py

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,21 @@
1515
# limitations under the License.
1616

1717
import logging
18+
import os
1819
import shutil
20+
import subprocess
1921
from abc import ABC, abstractmethod
2022
from concurrent.futures import ThreadPoolExecutor, as_completed
21-
from typing import Iterable, final
23+
from typing import Iterable, Optional, final
2224

2325
from cloudai.util import prepare_output_dir
2426

2527
from .install_status_result import InstallStatusResult
2628
from .installables import Installable
2729
from .system import System
2830

31+
TASK_LIMIT_THRESHOLD = 256
32+
2933

3034
class BaseInstaller(ABC):
3135
"""
@@ -46,8 +50,54 @@ def __init__(self, system: System):
4650
system (System): The system schema object.
4751
"""
4852
self.system = system
53+
self._low_thread_env = None
4954
logging.debug(f"BaseInstaller initialized for {self.system.scheduler}.")
5055

56+
@property
57+
def is_low_thread_environment(self, threshold: int = TASK_LIMIT_THRESHOLD) -> bool:
58+
"""
59+
Check if the current environment has a limit on the number of threads that is below the threshold.
60+
61+
Args:
62+
threshold (int, optional): The threshold to consider "low thread". Defaults to TASK_LIMIT_THRESHOLD.
63+
64+
Returns:
65+
bool: True if the environment has a low thread limit, False otherwise.
66+
"""
67+
if self._low_thread_env is None:
68+
self._low_thread_env = self._check_low_thread_environment(threshold)
69+
return self._low_thread_env
70+
71+
def _check_low_thread_environment(self, threshold: int = TASK_LIMIT_THRESHOLD) -> bool:
72+
try:
73+
result = subprocess.run(
74+
["systemctl", "show", f"user-{os.getuid()}.slice", "--property=TasksMax"],
75+
capture_output=True,
76+
text=True,
77+
check=True,
78+
)
79+
_, value = result.stdout.strip().split("=", 1)
80+
value = value.strip()
81+
if value.lower() == "infinity":
82+
return False
83+
is_low_thread = int(value) < threshold
84+
if is_low_thread:
85+
logging.info("Low thread environment detected.")
86+
return is_low_thread
87+
except Exception as e:
88+
logging.debug(f"Could not determine TasksMax from systemd: {e}")
89+
return False
90+
91+
@property
92+
def num_workers(self) -> Optional[int]:
93+
"""
94+
Get the appropriate number of worker threads based on the environment.
95+
96+
Returns:
97+
Optional[int]: 1 for low thread environments, None otherwise (allowing ThreadPoolExecutor to choose).
98+
"""
99+
return 1 if self.is_low_thread_environment else None
100+
51101
def _is_binary_installed(self, binary_name: str) -> bool:
52102
"""
53103
Check if a given binary is installed on the system.
@@ -128,7 +178,7 @@ def install(self, items: Iterable[Installable]) -> InstallStatusResult:
128178
logging.info(f"Going to install {len(set(items))} item(s)")
129179

130180
install_results = {}
131-
with ThreadPoolExecutor() as executor:
181+
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
132182
futures = {executor.submit(self.install_one, item): item for item in self.all_items(items)}
133183
total, done = len(futures), 0
134184
for future in as_completed(futures):
@@ -168,8 +218,9 @@ def uninstall(self, items: Iterable[Installable]) -> InstallStatusResult:
168218
"""
169219
logging.debug(f"Going to uninstall {len(set(items))} uniq items (total {len(list(items))}).")
170220
logging.info(f"Going to uninstall {len(set(items))} items.")
221+
171222
uninstall_results = {}
172-
with ThreadPoolExecutor() as executor:
223+
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
173224
futures = {executor.submit(self.uninstall_one, item): item for item in self.all_items(items)}
174225
for future in as_completed(futures):
175226
item = futures[future]

src/cloudai/cli/handlers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def handle_install_and_uninstall(args: argparse.Namespace) -> int:
7373
logging.info("Not all components are ready")
7474
result = installer.install(installables)
7575
if result.success:
76-
logging.info(f"CloudAI is successful installed into '{system.install_path.absolute()}'.")
76+
logging.info(f"CloudAI is successfully installed into '{system.install_path.absolute()}'.")
7777
else:
7878
logging.error(result.message)
7979
rc = 1

src/cloudai/installer/kubernetes_installer.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,7 @@
2323

2424

2525
class KubernetesInstaller(BaseInstaller):
26-
"""
27-
Installer for systems that use Kubernetes.
28-
29-
Handles the installation of benchmarks or test templates for Kubernetes-based systems.
30-
"""
26+
"""Installer for Kubernetes systems."""
3127

3228
def _check_prerequisites(self) -> InstallStatusResult:
3329
"""

src/cloudai/installer/lsf_installer.py

Lines changed: 10 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -16,42 +16,30 @@
1616

1717
import logging
1818

19-
from cloudai import BaseInstaller, DockerImage, File, GitRepo, Installable, InstallStatusResult, PythonExecutable
19+
from cloudai import (
20+
BaseInstaller,
21+
DockerImage,
22+
File,
23+
GitRepo,
24+
Installable,
25+
InstallStatusResult,
26+
PythonExecutable,
27+
)
2028
from cloudai.installer.slurm_installer import SlurmInstaller
2129
from cloudai.systems import LSFSystem
2230

2331

2432
class LSFInstaller(BaseInstaller):
25-
"""
26-
Installer for systems that use the LSF scheduler.
27-
28-
Handles the installation of benchmarks or test templates for LSF-managed systems.
29-
30-
Attributes:
31-
PREREQUISITES (List[str]): A list of required binaries for the installer.
32-
install_path (Path): Path where the benchmarks are to be installed.
33-
"""
33+
"""Installer for LSF systems."""
3434

3535
PREREQUISITES = ("bsub", "bjobs", "bhosts", "lsid", "lsload")
3636

3737
def __init__(self, system: LSFSystem):
38-
"""
39-
Initialize the LSFInstaller with a system object.
40-
41-
Args:
42-
system (LSFSystem): The system schema object.
43-
"""
4438
super().__init__(system)
4539
self.system = system
4640

4741
@property
4842
def slurm_installer(self) -> SlurmInstaller:
49-
"""
50-
Lazily initialize and return a SlurmInstaller instance.
51-
52-
Returns:
53-
SlurmInstaller: The SlurmInstaller instance.
54-
"""
5543
if not hasattr(self, "_slurm_installer"):
5644
from cloudai.systems import SlurmSystem
5745

@@ -61,12 +49,6 @@ def slurm_installer(self) -> SlurmInstaller:
6149
return self._slurm_installer
6250

6351
def _check_prerequisites(self) -> InstallStatusResult:
64-
"""
65-
Check for the presence of required binaries, raising an error if any are missing.
66-
67-
Returns:
68-
InstallStatusResult: Result containing the status and any error message.
69-
"""
7052
base_prerequisites_result = super()._check_prerequisites()
7153
if not base_prerequisites_result.success:
7254
return InstallStatusResult(False, base_prerequisites_result.message)
@@ -78,21 +60,11 @@ def _check_prerequisites(self) -> InstallStatusResult:
7860
return InstallStatusResult(False, str(e))
7961

8062
def _check_required_binaries(self) -> None:
81-
"""Check for the presence of required binaries, raising an error if any are missing."""
8263
for binary in self.PREREQUISITES:
8364
if not self._is_binary_installed(binary):
8465
raise EnvironmentError(f"Required binary '{binary}' is not installed.")
8566

8667
def install_one(self, item: Installable) -> InstallStatusResult:
87-
"""
88-
Install a single item.
89-
90-
Args:
91-
item (Installable): The item to install.
92-
93-
Returns:
94-
InstallStatusResult: Result containing the installation status and error message if any.
95-
"""
9668
logging.debug(f"Attempt to install {item}")
9769

9870
if isinstance(item, DockerImage):
@@ -108,15 +80,6 @@ def install_one(self, item: Installable) -> InstallStatusResult:
10880
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
10981

11082
def uninstall_one(self, item: Installable) -> InstallStatusResult:
111-
"""
112-
Uninstall a single item.
113-
114-
Args:
115-
item (Installable): The item to uninstall.
116-
117-
Returns:
118-
InstallStatusResult: Result containing the uninstallation status and error message if any.
119-
"""
12083
logging.debug(f"Attempt to uninstall {item!r}")
12184
if isinstance(item, PythonExecutable):
12285
return self.slurm_installer._uninstall_python_executable(item)
@@ -128,15 +91,6 @@ def uninstall_one(self, item: Installable) -> InstallStatusResult:
12891
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
12992

13093
def is_installed_one(self, item: Installable) -> InstallStatusResult:
131-
"""
132-
Check if a single item is installed.
133-
134-
Args:
135-
item (Installable): The item to check.
136-
137-
Returns:
138-
InstallStatusResult: Result containing the installation status and error message if any.
139-
"""
14094
if isinstance(item, DockerImage):
14195
logging.info(f"Skipping installation check for Docker image {item} in LSF system.")
14296
return InstallStatusResult(True, "Docker image installation skipped for LSF system.")
@@ -150,15 +104,5 @@ def is_installed_one(self, item: Installable) -> InstallStatusResult:
150104
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
151105

152106
def mark_as_installed_one(self, item: Installable) -> InstallStatusResult:
153-
"""
154-
Mark a single item as installed.
155-
156-
Args:
157-
item (Installable): The item to mark as installed.
158-
159-
Returns:
160-
InstallStatusResult: Result containing the status and error message if any.
161-
"""
162107
logging.debug(f"Marking {item!r} as installed.")
163-
164108
return self.slurm_installer.mark_as_installed_one(item)

src/cloudai/installer/runai_installer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ class RunAIInstaller(BaseInstaller):
2424
"""Installer for RunAI systems."""
2525

2626
def __init__(self, system: RunAISystem):
27-
"""Initialize the RunAIInstaller with a system object."""
2827
super().__init__(system)
2928

3029
def _check_prerequisites(self) -> InstallStatusResult:

src/cloudai/installer/slurm_installer.py

Lines changed: 18 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -20,23 +20,21 @@
2020
from pathlib import Path
2121
from shutil import rmtree
2222

23-
from cloudai import BaseInstaller, DockerImage, File, GitRepo, Installable, InstallStatusResult, PythonExecutable
23+
from cloudai import (
24+
BaseInstaller,
25+
DockerImage,
26+
File,
27+
GitRepo,
28+
Installable,
29+
InstallStatusResult,
30+
PythonExecutable,
31+
)
2432
from cloudai.systems import SlurmSystem
2533
from cloudai.util.docker_image_cache_manager import DockerImageCacheManager, DockerImageCacheResult
2634

2735

2836
class SlurmInstaller(BaseInstaller):
29-
"""
30-
Installer for systems that use the Slurm scheduler.
31-
32-
Handles the installation of benchmarks or test templates for Slurm-managed systems.
33-
34-
Attributes
35-
PREREQUISITES (List[str]): A list of required binaries for the installer.
36-
REQUIRED_SRUN_OPTIONS (List[str]): A list of required srun options to check.
37-
install_path (Path): Path where the benchmarks are to be installed. This is optional since uninstallation does
38-
not require it.
39-
"""
37+
"""Installer for Slurm systems."""
4038

4139
PREREQUISITES = ("git", "sbatch", "sinfo", "squeue", "srun", "scancel", "sacct")
4240
REQUIRED_SRUN_OPTIONS = (
@@ -48,26 +46,11 @@ class SlurmInstaller(BaseInstaller):
4846
)
4947

5048
def __init__(self, system: SlurmSystem):
51-
"""
52-
Initialize the SlurmInstaller with a system object and an optional installation path.
53-
54-
Args:
55-
system (SlurmSystem): The system schema object.
56-
"""
5749
super().__init__(system)
5850
self.system = system
5951
self.docker_image_cache_manager = DockerImageCacheManager(system)
6052

6153
def _check_prerequisites(self) -> InstallStatusResult:
62-
"""
63-
Check for the presence of required binaries and specific srun options, raising an error if any are missing.
64-
65-
This ensures the system environment is properly set up before proceeding with the installation or uninstallation
66-
processes.
67-
68-
Returns
69-
InstallStatusResult: Result containing the status and any error message.
70-
"""
7154
base_prerequisites_result = super()._check_prerequisites()
7255
if not base_prerequisites_result.success:
7356
return InstallStatusResult(False, base_prerequisites_result.message)
@@ -80,7 +63,6 @@ def _check_prerequisites(self) -> InstallStatusResult:
8063
return InstallStatusResult(False, str(e))
8164

8265
def _check_required_binaries(self) -> None:
83-
"""Check for the presence of required binaries, raising an error if any are missing."""
8466
for binary in self.PREREQUISITES:
8567
if not self._is_binary_installed(binary):
8668
raise EnvironmentError(f"Required binary '{binary}' is not installed.")
@@ -103,15 +85,6 @@ def _check_srun_options(self) -> None:
10385
raise EnvironmentError(f"Required srun options missing: {missing_options_str}")
10486

10587
def install_one(self, item: Installable) -> InstallStatusResult:
106-
"""
107-
Install a single item.
108-
109-
Args:
110-
item (Installable): The item to install.
111-
112-
Returns:
113-
InstallStatusResult: Result containing the installation status and error message if any.
114-
"""
11588
logging.debug(f"Attempt to install {item}")
11689
if isinstance(item, DockerImage):
11790
res = self._install_docker_image(item)
@@ -128,15 +101,6 @@ def install_one(self, item: Installable) -> InstallStatusResult:
128101
return InstallStatusResult(False, f"Unsupported item type: {type(item)}")
129102

130103
def uninstall_one(self, item: Installable) -> InstallStatusResult:
131-
"""
132-
Uninstall a single item.
133-
134-
Args:
135-
item (Installable): The item to uninstall.
136-
137-
Returns:
138-
InstallStatusResult: Result containing the uninstallation status and error message if any.
139-
"""
140104
logging.debug(f"Attempt to uninstall {item!r}")
141105
if isinstance(item, DockerImage):
142106
res = self._uninstall_docker_image(item)
@@ -266,7 +230,14 @@ def _install_python_executable(self, item: PythonExecutable) -> InstallStatusRes
266230

267231
def _clone_repository(self, git_url: str, path: Path) -> InstallStatusResult:
268232
logging.debug(f"Cloning repository {git_url} into {path}")
269-
clone_cmd = ["git", "clone", git_url, str(path)]
233+
clone_cmd = ["git", "clone"]
234+
235+
if self.is_low_thread_environment:
236+
clone_cmd.extend(["-c", "pack.threads=4"])
237+
238+
clone_cmd.extend([git_url, str(path)])
239+
240+
logging.debug(f"Running git clone command: {' '.join(clone_cmd)}")
270241
result = subprocess.run(clone_cmd, capture_output=True, text=True)
271242
if result.returncode != 0:
272243
return InstallStatusResult(False, f"Failed to clone repository: {result.stderr}")

src/cloudai/installer/standalone_installer.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,11 @@
1818

1919

2020
class StandaloneInstaller(BaseInstaller):
21-
"""
22-
Installer for systems that do not use a scheduler (standalone systems).
23-
24-
Handles the installation of benchmarks or test templates for standalone systems.
25-
"""
21+
"""Installer for standalone systems."""
2622

2723
PREREQUISITES = ("ps", "kill")
2824

2925
def _check_prerequisites(self) -> InstallStatusResult:
30-
"""Check for the presence of required binaries, returning an error status if any are missing."""
3126
super()._check_prerequisites() # TODO: if fails, print out missing prerequisites
3227
missing_binaries = []
3328
for binary in self.PREREQUISITES:

0 commit comments

Comments
 (0)