1717import logging
1818import os
1919import subprocess
20- from concurrent .futures import ThreadPoolExecutor , as_completed
2120from pathlib import Path
22- from typing import Any , Dict , List
21+ from typing import Any , Dict
2322
2423from cloudai import InstallStatusResult , System
25- from cloudai .systems .slurm import SlurmNodeState
2624from cloudai .systems .slurm .strategy import SlurmInstallStrategy
2725
2826
class DatasetCheckResult:
    """
    Outcome of a dataset presence check across Slurm nodes.

    Attributes
        success (bool): Whether the datasets are present on all nodes.
        nodes_without_datasets (List[str]): Names of the nodes missing one or more datasets.
    """

    def __init__(self, success: bool, nodes_without_datasets: List[str]) -> None:
        # Both values are stored exactly as supplied; the list is not copied.
        self.nodes_without_datasets = nodes_without_datasets
        self.success = success
41-
42-
4327class NeMoLauncherSlurmInstallStrategy (SlurmInstallStrategy ):
4428 """
4529 Install strategy for NeMo-Launcher on Slurm systems.
@@ -73,21 +57,7 @@ def is_installed(self) -> InstallStatusResult:
7357 self .docker_image_url , self .SUBDIR_PATH , self .DOCKER_IMAGE_FILENAME
7458 ).success
7559
76- data_dir_path = Path (self .default_cmd_args ["data_dir" ])
77- datasets_check_result = self ._check_datasets_on_nodes (data_dir_path )
78- if not datasets_check_result .success :
79- return InstallStatusResult (
80- success = False ,
81- message = (
82- "NeMo datasets are not installed on some nodes. "
83- f"Nodes without datasets: { ', ' .join (datasets_check_result .nodes_without_datasets )} . "
84- f"Please ensure that the NeMo datasets are manually installed on each node in the specified "
85- f"data directory: { data_dir_path } . This directory should contain all necessary datasets for "
86- f"NeMo Launcher to function properly."
87- ),
88- )
89-
90- if repo_installed and docker_image_installed and datasets_check_result .success :
60+ if repo_installed and docker_image_installed :
9161 return InstallStatusResult (success = True )
9262 else :
9363 missing_components = []
@@ -99,8 +69,7 @@ def is_installed(self) -> InstallStatusResult:
9969 if not docker_image_installed :
10070 docker_image_path = subdir_path / self .DOCKER_IMAGE_FILENAME
10171 missing_components .append (f"Docker image at { docker_image_path } from URL { self .docker_image_url } " )
102- if not datasets_check_result .success :
103- missing_components .append (f"Datasets in { data_dir_path } on some nodes" )
72+
10473 return InstallStatusResult (
10574 success = False ,
10675 message = "The following components are missing:\n "
@@ -120,19 +89,6 @@ def install(self) -> InstallStatusResult:
12089 subdir_path = self .system .install_path / self .SUBDIR_PATH
12190 subdir_path .mkdir (parents = True , exist_ok = True )
12291
123- data_dir_path = Path (self .default_cmd_args ["data_dir" ])
124- datasets_check_result = self ._check_datasets_on_nodes (data_dir_path )
125- if not datasets_check_result .success :
126- return InstallStatusResult (
127- success = False ,
128- message = (
129- "Some nodes do not have the NeMo-Launcher datasets installed. "
130- f"Nodes without datasets: { ', ' .join (datasets_check_result .nodes_without_datasets )} . "
131- f"Datasets directory: { data_dir_path } . "
132- "Please ensure that datasets are installed on all nodes."
133- ),
134- )
135-
13692 try :
13793 self ._clone_repository (subdir_path )
13894 self ._install_requirements (subdir_path )
@@ -176,77 +132,6 @@ def _check_install_path_access(self):
176132 if not self .system .install_path .is_dir () or not os .access (self .system .install_path , os .W_OK ):
177133 raise PermissionError (f"No permission to write in install path { self .system .install_path } ." )
178134
def _check_datasets_on_nodes(self, data_dir_path: Path) -> DatasetCheckResult:
    """
    Check every idle node of the default partition for the required NeMo-Launcher dataset items.

    One check is submitted per idle node and all checks run concurrently in a thread pool.
    When the default partition has no idle nodes, the verification is skipped and reported
    as successful.

    Args:
        data_dir_path (Path): Directory in which the dataset files and directories are expected.

    Returns:
        DatasetCheckResult: Success flag together with the names of nodes whose check failed.
    """
    # Items that must exist under data_dir_path on each node.
    required_items = (
        "bpe",
        "my-gpt3_00_text_document.bin",
        "my-gpt3_00_text_document.idx",
    )

    default_partition = self.slurm_system.default_partition
    candidates = [
        entry.name
        for entry in self.slurm_system.get_partition_nodes(default_partition)
        if entry.state == SlurmNodeState.IDLE
    ]

    if len(candidates) == 0:
        logging.warning(
            "There are no idle nodes in the default partition to check. "
            "Skipping NeMo-Launcher dataset verification."
        )
        return DatasetCheckResult(success=True, nodes_without_datasets=[])

    with ThreadPoolExecutor(max_workers=len(candidates)) as pool:
        # Map each pending future back to the node it is checking.
        pending = {}
        for node_name in candidates:
            job = pool.submit(self._check_dataset_on_node, node_name, data_dir_path, list(required_items))
            pending[job] = node_name

        # Failures are collected in completion order, not in node order.
        failed_nodes = [pending[done] for done in as_completed(pending) if not done.result()]

    all_present = len(failed_nodes) == 0
    return DatasetCheckResult(success=all_present, nodes_without_datasets=failed_nodes)
226-
def _check_dataset_on_node(self, node: str, data_dir_path: Path, dataset_items: List[str]) -> bool:
    """
    Check if dataset files and directories exist on a single compute node.

    A short Python one-liner is executed on the node through ``srun``; it prints ``True`` only
    when every item exists under the data directory.

    Args:
        node (str): The name of the compute node.
        data_dir_path (Path): Path to the data directory.
        dataset_items (List[str]): List of dataset file and directory names to check.

    Returns:
        bool: True if all dataset files and directories exist on the node, False otherwise
            (including when the remote command fails or cannot be launched).
    """
    # repr() embeds the path and item names as valid Python literals, so quotes or spaces in
    # them cannot break the remote script. The script imports Path itself and parenthesises
    # the join, so all() receives one boolean per item.
    python_check_script = (
        "from pathlib import Path; "
        f"print(all((Path({str(data_dir_path)!r}) / item).exists() for item in {list(dataset_items)!r}))"
    )
    # An argument list (no shell) passes the script to python verbatim, with no shell quoting.
    cmd = [
        "srun",
        "--nodes=1",
        f"--nodelist={node}",
        f"--partition={self.slurm_system.default_partition}",
        "python",
        "-c",
        python_check_script,
    ]
    try:
        result = subprocess.run(cmd, check=False, capture_output=True, text=True)
    except OSError:
        # srun could not be started; report a failed check, as the shell-based call
        # did through its non-zero exit status.
        return False
    return result.returncode == 0 and result.stdout.strip() == "True"
249-
250135 def _clone_repository (self , subdir_path : Path ) -> None :
251136 """
252137 Clones NeMo-Launcher repository into specified path if it does not already exist.
0 commit comments