Commit 1254085

Use mock data by default in NeMo launcher (#224)
* Use mock data by default in NeMo
* Validate NeMo launcher arguments
* Make ruff happy
* Remove data_dir argument when data_impl==mock
1 parent a7b4e0f commit 1254085
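To make the new behavior concrete, the sketch below mirrors the validation this commit adds (see _validate_data_config in the first file's diff). The standalone function, the data_impl value "mmap", and the example paths are illustrative assumptions, not part of the commit.

from typing import Dict


def validate_data_config(final_cmd_args: Dict[str, str]) -> None:
    # Mirror of _validate_data_config: with mock data (the new default) no
    # dataset settings are required; otherwise data_dir and data_prefix must
    # be filled in with real values.
    if final_cmd_args.get("training.model.data.data_impl") != "mock":
        data_dir = final_cmd_args.get("data_dir")
        if not data_dir or data_dir == "~":
            raise ValueError("'data_dir' must point to an actual dataset location.")
        if final_cmd_args.get("training.model.data.data_prefix") == "[]":
            raise ValueError("'data_prefix' must list the training datasets.")


# Mock data, the new default: passes with no dataset paths at all.
validate_data_config({"training.model.data.data_impl": "mock"})

# Non-mock data: both fields must be set (values below are placeholders).
validate_data_config(
    {
        "training.model.data.data_impl": "mmap",
        "data_dir": "/datasets/gpt3",
        "training.model.data.data_prefix": "[1.0,gpt3_00_text_document]",
    }
)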

3 files changed: +204, -89 lines

src/cloudai/schema/test_template/nemo_launcher/slurm_command_gen_strategy.py

Lines changed: 138 additions & 71 deletions
@@ -34,105 +34,154 @@ def gen_exec_command(
         num_nodes: int,
         nodes: List[str],
     ) -> str:
-        final_env_vars = self._override_env_vars(self.system.global_env_vars, extra_env_vars)
+        self._prepare_environment(cmd_args, extra_env_vars, output_path)
 
-        launcher_path = (
-            self.system.install_path
-            / NeMoLauncherSlurmInstallStrategy.SUBDIR_PATH
-            / NeMoLauncherSlurmInstallStrategy.REPOSITORY_NAME
-        ).absolute()
+        nodes = self.slurm_system.parse_nodes(nodes)
+        self._set_node_config(nodes, num_nodes)
+
+        self.final_cmd_args["container"] = self.docker_image_cache_manager.ensure_docker_image(
+            self.docker_image_url,
+            NeMoLauncherSlurmInstallStrategy.SUBDIR_PATH,
+            NeMoLauncherSlurmInstallStrategy.DOCKER_IMAGE_FILENAME,
+        ).docker_image_path
+
+        for key in ("repository_url", "repository_commit_hash", "docker_image_url"):
+            self.final_cmd_args.pop(key, None)
+
+        if self.slurm_system.account:
+            self.final_cmd_args.update(
+                {
+                    "cluster.account": self.slurm_system.account,
+                    "cluster.job_name_prefix": f"{self.slurm_system.account}-cloudai.nemo:",
+                }
+            )
+        self.final_cmd_args["cluster.gpus_per_node"] = self.slurm_system.gpus_per_node or "null"
+
+        self._validate_data_config()
+
+        if self.final_cmd_args.get("training.model.data.data_impl") == "mock":
+            self.final_cmd_args.pop("data_dir", None)
+
+        cmd_args_str = self._generate_cmd_args_str(self.final_cmd_args, nodes)
+
+        full_cmd = f"python {self._launcher_scripts_path()}/launcher_scripts/main.py {cmd_args_str}"
+
+        if extra_cmd_args:
+            full_cmd = self._handle_extra_cmd_args(full_cmd, extra_cmd_args)
+
+        env_vars_str = " ".join(f"{key}={value}" for key, value in self.final_env_vars.items())
+        full_cmd = f"{env_vars_str} {full_cmd}" if env_vars_str else full_cmd
+
+        return full_cmd.strip()
+
+    def _prepare_environment(self, cmd_args: Dict[str, str], extra_env_vars: Dict[str, str], output_path: Path) -> None:
+        """
+        Prepare the environment variables and command arguments.
+
+        Args:
+            cmd_args (Dict[str, str]): Command-line arguments for the launcher.
+            extra_env_vars (Dict[str, str]): Additional environment variables.
+            output_path (Path): Path to the output directory.
+        """
+        self.final_env_vars = self._override_env_vars(self.system.global_env_vars, extra_env_vars)
+
+        launcher_path = self._launcher_scripts_path()
         output_path_abs = output_path.absolute()
         overriden_cmd_args = self._override_cmd_args(self.default_cmd_args, cmd_args)
         self.final_cmd_args = {
             k: self._handle_special_keys(k, v, str(launcher_path), str(output_path_abs))
             for k, v in overriden_cmd_args.items()
         }
-        self.final_cmd_args["base_results_dir"] = str(output_path_abs)
-        self.final_cmd_args["training.model.data.index_mapping_dir"] = str(output_path_abs)
-        self.final_cmd_args["launcher_scripts_path"] = str(launcher_path / "launcher_scripts")
+        self.final_cmd_args.update(
+            {
+                "base_results_dir": str(output_path_abs),
+                "training.model.data.index_mapping_dir": str(output_path_abs),
+                "launcher_scripts_path": str(launcher_path / "launcher_scripts"),
+            }
+        )
 
-        for key, value in final_env_vars.items():
+        for key, value in self.final_env_vars.items():
             self.final_cmd_args[f"env_vars.{key}"] = value
 
         if "training.values" in self.final_cmd_args:
             self.final_cmd_args["training"] = self.final_cmd_args.pop("training.values")
 
         self.final_cmd_args["cluster.partition"] = self.slurm_system.default_partition
+        self._handle_reservation()
+
+    def _handle_reservation(self) -> None:
+        """Handle Slurm reservation if provided."""
         reservation_key = "--reservation "
         if self.slurm_system.extra_srun_args and reservation_key in self.slurm_system.extra_srun_args:
             reservation = self.slurm_system.extra_srun_args.split(reservation_key, 1)[1].split(" ", 1)[0]
             self.final_cmd_args["+cluster.reservation"] = reservation
 
-        nodes = self.slurm_system.parse_nodes(nodes)
-        if nodes:
-            self.final_cmd_args["training.trainer.num_nodes"] = str(len(nodes))
-        else:
-            self.final_cmd_args["training.trainer.num_nodes"] = num_nodes
-
-        self.final_cmd_args["container"] = self.docker_image_cache_manager.ensure_docker_image(
-            self.docker_image_url,
-            NeMoLauncherSlurmInstallStrategy.SUBDIR_PATH,
-            NeMoLauncherSlurmInstallStrategy.DOCKER_IMAGE_FILENAME,
-        ).docker_image_path
-
-        del self.final_cmd_args["repository_url"]
-        del self.final_cmd_args["repository_commit_hash"]
-        del self.final_cmd_args["docker_image_url"]
-
-        if self.slurm_system.account is not None:
-            self.final_cmd_args["cluster.account"] = self.slurm_system.account
-            self.final_cmd_args["cluster.job_name_prefix"] = f"{self.slurm_system.account}-cloudai.nemo:"
-        self.final_cmd_args["cluster.gpus_per_node"] = (
-            self.slurm_system.gpus_per_node if self.slurm_system.gpus_per_node is not None else "null"
-        )
-
-        if ("data_dir" in self.final_cmd_args) and (self.final_cmd_args["data_dir"] == "DATA_DIR"):
-            raise ValueError(
-                "The 'data_dir' field of the NeMo launcher test contains the placeholder 'DATA_DIR'. "
-                "Please update the test schema TOML file with a valid path to the dataset. "
-                "The 'data_dir' field must point to an actual dataset location, not a placeholder."
-            )
-
-        cmd_args_str = self._generate_cmd_args_str(self.final_cmd_args, nodes)
-
-        full_cmd = f"python {launcher_path}/launcher_scripts/main.py {cmd_args_str}"
+    def _launcher_scripts_path(self) -> Path:
+        """
+        Return the launcher scripts path.
 
-        if extra_cmd_args:
-            full_cmd += f" {extra_cmd_args}"
-            tokenizer_key = "training.model.tokenizer.model="
-            if tokenizer_key in extra_cmd_args:
-                tokenizer_path = extra_cmd_args.split(tokenizer_key, 1)[1].split(" ", 1)[0]
-                if not Path(tokenizer_path).is_file():
-                    raise ValueError(
-                        f"The provided tokenizer path '{tokenizer_path}' is not valid. "
-                        "Please review the test schema file to ensure the tokenizer path is correct. "
-                        "If it contains a placeholder value, refer to USER_GUIDE.md to download the tokenizer "
-                        "and update the schema file accordingly."
-                    )
-                full_cmd += f" container_mounts=[{tokenizer_path}:{tokenizer_path}]"
-
-        env_vars_str = " ".join(f"{key}={value}" for key, value in final_env_vars.items())
-        full_cmd = f"{env_vars_str} {full_cmd}" if env_vars_str else full_cmd
+        Returns
+            Path: Absolute path to the NeMo launcher scripts directory.
+        """
+        return (
+            self.system.install_path
+            / NeMoLauncherSlurmInstallStrategy.SUBDIR_PATH
+            / NeMoLauncherSlurmInstallStrategy.REPOSITORY_NAME
+        ).absolute()
 
-        return full_cmd.strip()
+    def _set_node_config(self, nodes: List[str], num_nodes: int) -> None:
+        """
+        Set the number of nodes configuration.
 
-    def _handle_special_keys(self, key: str, value: Any, launcher_path: str, output_path: str) -> Any:
+        Args:
+            nodes (List[str]): List of nodes where the test will run.
+            num_nodes (int): Number of nodes to allocate if no specific node list is provided.
         """
-        Handle special formatting for specific keys.
+        self.final_cmd_args["training.trainer.num_nodes"] = str(len(nodes)) if nodes else num_nodes
+
+    def _validate_data_config(self) -> None:
+        """Validate the data directory and prefix configuration for non-mock environments."""
+        if self.final_cmd_args.get("training.model.data.data_impl") != "mock":
+            data_dir = self.final_cmd_args.get("data_dir")
+            data_prefix = self.final_cmd_args.get("training.model.data.data_prefix")
+
+            if not data_dir or data_dir == "~":
+                raise ValueError(
+                    "The 'data_dir' field of the NeMo launcher test contains an invalid placeholder '~'. "
+                    "Please provide a valid path to the dataset in the test schema TOML file. "
+                    "The 'data_dir' field must point to an actual dataset location."
+                )
+
+            if data_prefix == "[]":
+                raise ValueError(
+                    "The 'data_prefix' field of the NeMo launcher test is missing or empty. "
+                    "Please update the test schema TOML file with a valid prefix for the training datasets."
+                )
+
+    def _handle_extra_cmd_args(self, full_cmd: str, extra_cmd_args: str) -> str:
+        """
+        Handle additional command arguments such as the tokenizer path.
 
         Args:
-            key (str): The argument key.
-            value (Any): The argument value.
-            launcher_path (str): The base path for NeMo Megatron launcher.
-            output_path (str): Path to the output directory.
+            full_cmd (str): The full command string generated so far.
+            extra_cmd_args (str): Additional command-line arguments to append.
 
         Returns:
-            Any: The specially formatted value, if applicable.
+            str: Updated command string with the additional arguments.
         """
-        if key == "training.model.data.data_prefix":
-            return value.replace("\\", "")
-
-        return value
+        full_cmd += f" {extra_cmd_args}"
+        tokenizer_key = "training.model.tokenizer.model="
+        if tokenizer_key in extra_cmd_args:
+            tokenizer_path = extra_cmd_args.split(tokenizer_key, 1)[1].split(" ", 1)[0]
+            if not Path(tokenizer_path).is_file():
+                raise ValueError(
+                    f"The provided tokenizer path '{tokenizer_path}' is not valid. "
+                    "Please review the test schema file to ensure the tokenizer path is correct. "
+                    "If it contains a placeholder value, refer to USER_GUIDE.md to download the tokenizer "
+                    "and update the schema file accordingly."
+                )
+            full_cmd += f" container_mounts=[{tokenizer_path}:{tokenizer_path}]"
+        return full_cmd
 
     def _generate_cmd_args_str(self, args: Dict[str, str], nodes: List[str]) -> str:
         """
@@ -164,3 +213,21 @@ def _generate_cmd_args_str(self, args: Dict[str, str], nodes: List[str]) -> str:
             cmd_arg_str_parts.append(f"+cluster.nodelist=\\'{nodes_str}\\'")
 
         return " ".join(cmd_arg_str_parts + env_var_str_parts)
+
+    def _handle_special_keys(self, key: str, value: Any, launcher_path: str, output_path: str) -> Any:
+        """
+        Handle special formatting for specific keys.
+
+        Args:
+            key (str): The argument key.
+            value (Any): The argument value.
+            launcher_path (str): The base path for NeMo Megatron launcher.
+            output_path (str): Path to the output directory.
+
+        Returns:
+            Any: The specially formatted value, if applicable.
+        """
+        if key == "training.model.data.data_prefix":
+            return value.replace("\\", "")
+
+        return value
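As a usage note on the extracted _handle_extra_cmd_args helper: when the extra arguments carry a tokenizer model path, the helper validates that the file exists and bind-mounts it into the container. The standalone mirror and the temporary file below are illustrative only; the real method is the one shown in the diff above.

import tempfile
from pathlib import Path


def handle_extra_cmd_args(full_cmd: str, extra_cmd_args: str) -> str:
    # Standalone mirror of the helper above: append extra arguments and, if a
    # tokenizer model is given, mount its path into the container.
    full_cmd += f" {extra_cmd_args}"
    tokenizer_key = "training.model.tokenizer.model="
    if tokenizer_key in extra_cmd_args:
        tokenizer_path = extra_cmd_args.split(tokenizer_key, 1)[1].split(" ", 1)[0]
        if not Path(tokenizer_path).is_file():
            raise ValueError(f"The provided tokenizer path '{tokenizer_path}' is not valid.")
        full_cmd += f" container_mounts=[{tokenizer_path}:{tokenizer_path}]"
    return full_cmd


with tempfile.NamedTemporaryFile(suffix=".model") as tok:
    # The generated command carries both the tokenizer argument and a matching
    # container_mounts entry for its path.
    print(handle_extra_cmd_args("python launcher_scripts/main.py", f"training.model.tokenizer.model={tok.name}"))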

src/cloudai/test_definitions/nemo_launcher.py

Lines changed: 3 additions & 2 deletions
@@ -56,7 +56,8 @@ class TrainingModelData(BaseModel):
     """Training model data configuration."""
 
     model_config = ConfigDict(extra="forbid")
-    data_prefix: str = "[\"1.0\",'${data_dir}/my-gpt3_00_text_document']"
+    data_prefix: str = "[]"
+    data_impl: str = "mock"
 
 
 class TrainingModel(BaseModel):
@@ -96,7 +97,7 @@ class NeMoLauncherCmdArgs(CmdArgs):
     repository_commit_hash: str = "cf411a9ede3b466677df8ee672bcc6c396e71e1a"
     docker_image_url: str = "nvcr.io/nvidian/nemofw-training:24.01.01"
     stages: str = '["training"]'
-    data_dir: str = "DATA_DIR"
+    data_dir: str = "~"
     numa_mapping: NumaMapping = Field(default_factory=NumaMapping)
     cluster: Cluster = Field(default_factory=Cluster)
     training: Training = Field(default_factory=Training)
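A minimal sketch of what the new defaults mean for a test definition that leaves the data section untouched; the field names, defaults, and pydantic imports come from the diff above, while the standalone model copy and the printed usage are illustrative only.

from pydantic import BaseModel, ConfigDict


class TrainingModelData(BaseModel):
    """Standalone copy of the model above, with the new defaults."""

    model_config = ConfigDict(extra="forbid")
    data_prefix: str = "[]"
    data_impl: str = "mock"


data = TrainingModelData()
# With data_impl left at "mock", the launcher drops data_dir from the generated
# command and skips the data_dir/data_prefix checks; any non-mock data_impl
# requires a real data_dir (not the "~" placeholder) and a non-empty data_prefix.
print(data.data_impl, data.data_prefix)  # mock []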
