Commit cd5ada8

Merge pull request #685 from NVIDIA/am/nixl
Update NIXL workloads
2 parents deb90dc + edd61a4 commit cd5ada8

File tree

15 files changed: +247, -314 lines changed

src/cloudai/workloads/common/nixl.py (new file)

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

import logging
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING

from cloudai.systems.slurm import SlurmCommandGenStrategy
from cloudai.util.lazy_imports import lazy

if TYPE_CHECKING:
    import pandas as pd


class NIXLCmdGenBase(SlurmCommandGenStrategy):
    """Base command generation strategy for NIXL-based workloads."""

    @property
    def final_env_vars(self) -> dict[str, str | list[str]]:
        env_vars = super().final_env_vars
        env_vars["NIXL_ETCD_NAMESPACE"] = "/nixl/kvbench/$(uuidgen)"
        env_vars["NIXL_ETCD_ENDPOINTS"] = '"$SLURM_JOB_MASTER_NODE:2379"'
        env_vars["SLURM_JOB_MASTER_NODE"] = "$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1)"
        return env_vars

    @final_env_vars.setter
    def final_env_vars(self, value: dict[str, str | list[str]]) -> None:
        super().final_env_vars = value

    def gen_etcd_srun_command(self, etcd_path: str) -> list[str]:
        etcd_cmd = [
            etcd_path,
            "--listen-client-urls=http://0.0.0.0:2379",
            "--advertise-client-urls=http://$SLURM_JOB_MASTER_NODE:2379",
            "--listen-peer-urls=http://0.0.0.0:2380",
            "--initial-advertise-peer-urls=http://$SLURM_JOB_MASTER_NODE:2380",
            '--initial-cluster="default=http://$SLURM_JOB_MASTER_NODE:2380"',
            "--initial-cluster-state=new",
        ]
        cmd = [
            *self.gen_srun_prefix(),
            f"--output={self.test_run.output_path.absolute() / 'etcd.log'}",
            "--overlap",
            "--ntasks-per-node=1",
            "--ntasks=1",
            "--nodelist=$SLURM_JOB_MASTER_NODE",
            "-N1",
            *etcd_cmd,
            " &",
        ]
        return cmd

    def gen_wait_for_etcd_command(self, timeout: int = 60) -> list[str]:
        cmd = [
            "timeout",
            str(timeout),
            "bash",
            "-c",
            '"until curl -s $NIXL_ETCD_ENDPOINTS/health > /dev/null 2>&1; do sleep 1; done" || {\n',
            f' echo "ETCD ($NIXL_ETCD_ENDPOINTS) was unreachable after {timeout} seconds";\n',
            " exit 1\n",
            "}",
        ]
        return cmd

    def gen_nixlbench_srun_commands(self, test_cmd: list[str], backend: str) -> list[list[str]]:
        prefix_part = self.gen_srun_prefix()
        bash_part = [
            "bash",
            "-c",
            f'"source {(self.test_run.output_path / "env_vars.sh").absolute()}; {" ".join(test_cmd)}"',
        ]
        tpn_part = ["--ntasks-per-node=1", "--ntasks=1", "-N1"]

        cmds = [
            [*prefix_part, "--overlap", "--nodelist=$SLURM_JOB_MASTER_NODE", *tpn_part, *bash_part],
        ]

        if backend.upper() == "UCX":
            nnodes, _ = self.get_cached_nodes_spec()
            if nnodes > 1:
                cmds = [
                    [*prefix_part, "--overlap", f"--relative={idx}", *tpn_part, *bash_part] for idx in range(nnodes)
                ]
            else:
                cmds *= max(2, nnodes)

        return cmds

    def create_env_vars_file(self) -> None:
        with (self.test_run.output_path / "env_vars.sh").open("w") as f:
            for key, value in self.final_env_vars.items():
                if key in {"NIXL_ETCD_ENDPOINTS", "NIXL_ETCD_NAMESPACE"}:
                    continue
                if key == "SLURM_JOB_MASTER_NODE":  # this is an sbatch-level variable, not needed per-node
                    continue
                f.write(f"export {key}={value}\n")


@cache
def extract_nixlbench_data(stdout_file: Path) -> pd.DataFrame:
    if not stdout_file.exists():
        logging.debug(f"{stdout_file} not found")
        return lazy.pd.DataFrame()

    header_present, data = False, []
    for line in stdout_file.read_text().splitlines():
        if not header_present and (
            "Block Size (B) Batch Size " in line and "Avg Lat. (us)" in line and "B/W (GB/Sec)" in line
        ):
            header_present = True
            continue
        parts = line.split()
        if header_present and (len(parts) == 6 or len(parts) == 10):
            if len(parts) == 6:
                data.append([parts[0], parts[1], parts[2], parts[-1]])
            else:
                data.append([parts[0], parts[1], parts[3], parts[2]])

    df = lazy.pd.DataFrame(data, columns=["block_size", "batch_size", "avg_lat", "bw_gb_sec"])
    df["block_size"] = df["block_size"].astype(int)
    df["batch_size"] = df["batch_size"].astype(int)
    df["avg_lat"] = df["avg_lat"].astype(float)
    df["bw_gb_sec"] = df["bw_gb_sec"].astype(float)

    return df

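The new shared module centralizes etcd bootstrap, environment-file creation, and nixlbench stdout parsing for NIXL workloads. A minimal usage sketch of the parser follows; it assumes cloudai is installed, and the results path below is hypothetical:

from pathlib import Path

from cloudai.workloads.common.nixl import extract_nixlbench_data

# Parse a nixlbench stdout capture into a DataFrame with columns
# block_size, batch_size, avg_lat, bw_gb_sec; empty if no table was found.
df = extract_nixlbench_data(Path("results/nixl_bench/stdout.txt"))  # hypothetical path
if df.empty:
    print("no NIXLBench table found in stdout")
else:
    print(df.groupby("block_size")["bw_gb_sec"].mean())
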
src/cloudai/workloads/nixl_bench/nixl_bench.py

Lines changed: 9 additions & 49 deletions
@@ -16,34 +16,25 @@
 
 from __future__ import annotations
 
-import logging
-from functools import cache
-from pathlib import Path
-from typing import TYPE_CHECKING, Optional
-
 from cloudai.core import DockerImage, Installable, JobStatusResult, TestRun
 from cloudai.models.workload import CmdArgs, TestDefinition
-from cloudai.util.lazy_imports import lazy
-
-if TYPE_CHECKING:
-    import pandas as pd
+from cloudai.workloads.common.nixl import extract_nixlbench_data
 
 
 class NIXLBenchCmdArgs(CmdArgs):
     """Command line arguments for a NIXL Bench test."""
 
     docker_image_url: str
-    etcd_endpoint: str
     path_to_benchmark: str
+    etcd_path: str = "etcd"
+    etcd_endpoints: str = "http://$NIXL_ETCD_ENDPOINTS"
 
 
 class NIXLBenchTestDefinition(TestDefinition):
     """Test definition for a NIXL Bench test."""
 
     cmd_args: NIXLBenchCmdArgs
-    etcd_image_url: str
-    _nixl_image: Optional[DockerImage] = None
-    _etcd_image: Optional[DockerImage] = None
+    _nixl_image: DockerImage | None = None
 
     @property
     def docker_image(self) -> DockerImage:
@@ -52,47 +43,16 @@ def docker_image(self) -> DockerImage:
         return self._nixl_image
 
     @property
-    def etcd_image(self) -> DockerImage:
-        if not self._etcd_image:
-            self._etcd_image = DockerImage(url=self.etcd_image_url)
-        return self._etcd_image
+    def installables(self) -> list[Installable]:
+        return [self.docker_image, *self.git_repos]
 
     @property
-    def installables(self) -> list[Installable]:
-        return [self.docker_image, *self.git_repos, self.etcd_image]
+    def cmd_args_dict(self) -> dict[str, str | list[str]]:
+        return self.cmd_args.model_dump(exclude={"docker_image_url", "path_to_benchmark", "cmd_args", "etcd_path"})
 
     def was_run_successful(self, tr: TestRun) -> JobStatusResult:
-        df = extract_nixl_data(tr.output_path / "stdout.txt")
+        df = extract_nixlbench_data(tr.output_path / "stdout.txt")
         if df.empty:
             return JobStatusResult(is_successful=False, error_message=f"NIXLBench data not found in {tr.output_path}.")
 
         return JobStatusResult(is_successful=True)
-
-
-@cache
-def extract_nixl_data(stdout_file: Path) -> pd.DataFrame:
-    if not stdout_file.exists():
-        logging.debug(f"{stdout_file} not found")
-        return lazy.pd.DataFrame()
-
-    header_present, data = False, []
-    for line in stdout_file.read_text().splitlines():
-        if not header_present and (
-            "Block Size (B) Batch Size " in line and "Avg Lat. (us)" in line and "B/W (GB/Sec)" in line
-        ):
-            header_present = True
-            continue
-        parts = line.split()
-        if header_present and (len(parts) == 6 or len(parts) == 10):
-            if len(parts) == 6:
-                data.append([parts[0], parts[1], parts[2], parts[-1]])
-            else:
-                data.append([parts[0], parts[1], parts[3], parts[2]])
-
-    df = lazy.pd.DataFrame(data, columns=["block_size", "batch_size", "avg_lat", "bw_gb_sec"])
-    df["block_size"] = df["block_size"].astype(int)
-    df["batch_size"] = df["batch_size"].astype(int)
-    df["avg_lat"] = df["avg_lat"].astype(float)
-    df["bw_gb_sec"] = df["bw_gb_sec"].astype(float)
-
-    return df

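With this change the dedicated etcd container and its explicit endpoint argument disappear from the test definition: etcd is started from a path inside the benchmark image, and the endpoint defaults to the sbatch-level NIXL_ETCD_ENDPOINTS variable. A rough sketch of the new defaults, assuming the pydantic-style CmdArgs base accepts these fields directly (the image URL and benchmark path below are hypothetical):

from cloudai.workloads.nixl_bench.nixl_bench import NIXLBenchCmdArgs

# Hypothetical values; only docker_image_url and path_to_benchmark are supplied here.
args = NIXLBenchCmdArgs(
    docker_image_url="nvcr.io/example/nixlbench:latest",  # hypothetical image
    path_to_benchmark="/usr/local/bin/nixlbench",  # hypothetical path
)
print(args.etcd_path)       # "etcd" by default
print(args.etcd_endpoints)  # "http://$NIXL_ETCD_ENDPOINTS"
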
src/cloudai/workloads/nixl_bench/report_generation_strategy.py

Lines changed: 5 additions & 5 deletions
@@ -23,7 +23,7 @@
 from cloudai.core import METRIC_ERROR, ReportGenerationStrategy
 from cloudai.report_generator.tool.bokeh_report_tool import BokehReportTool
 from cloudai.util.lazy_imports import lazy
-from cloudai.workloads.nixl_bench.nixl_bench import extract_nixl_data
+from cloudai.workloads.common.nixl import extract_nixlbench_data
 
 
 class NIXLBenchReportGenerationStrategy(ReportGenerationStrategy):
@@ -36,27 +36,27 @@ def results_file(self) -> Path:
         return self.test_run.output_path / "stdout.txt"
 
     def can_handle_directory(self) -> bool:
-        df = extract_nixl_data(self.results_file)
+        df = extract_nixlbench_data(self.results_file)
         return not df.empty
 
     def generate_report(self) -> None:
         if not self.can_handle_directory():
             return
 
         self.generate_bokeh_report()
-        df = extract_nixl_data(self.results_file)
+        df = extract_nixlbench_data(self.results_file)
         df.to_csv(self.test_run.output_path / "nixlbench.csv", index=False)
 
     def get_metric(self, metric: str) -> float:
         logging.debug(f"Getting metric {metric} from {self.results_file.absolute()}")
-        df = extract_nixl_data(self.results_file)
+        df = extract_nixlbench_data(self.results_file)
         if df.empty or metric not in {"default", "latency"}:
             return METRIC_ERROR
 
         return float(lazy.np.mean(df["avg_lat"]))
 
     def generate_bokeh_report(self) -> None:
-        df = extract_nixl_data(self.results_file)
+        df = extract_nixlbench_data(self.results_file)
 
         report_tool = BokehReportTool(self.test_run.output_path)
         p = report_tool.add_log_x_linear_y_multi_line_plot(

src/cloudai/workloads/nixl_bench/slurm_command_gen_strategy.py

Lines changed: 19 additions & 69 deletions
@@ -17,12 +17,13 @@
 from typing import cast
 
 from cloudai.core import TestRun
-from cloudai.systems.slurm import SlurmCommandGenStrategy, SlurmSystem
+from cloudai.systems.slurm import SlurmSystem
+from cloudai.workloads.common.nixl import NIXLCmdGenBase
 
 from .nixl_bench import NIXLBenchTestDefinition
 
 
-class NIXLBenchSlurmCommandGenStrategy(SlurmCommandGenStrategy):
+class NIXLBenchSlurmCommandGenStrategy(NIXLCmdGenBase):
     """Command generation strategy for NIXL Bench tests."""
 
     def __init__(self, system: SlurmSystem, test_run: TestRun) -> None:
@@ -36,88 +37,37 @@ def image_path(self) -> str | None:
     def _container_mounts(self) -> list[str]:
         return []
 
+    @property
+    def tdef(self) -> NIXLBenchTestDefinition:
+        return cast(NIXLBenchTestDefinition, self.test_run.test.test_definition)
+
     def _gen_srun_command(self) -> str:
-        with (self.test_run.output_path / "env_vars.sh").open("w") as f:
-            for key, value in self.final_env_vars.items():
-                if key == "SLURM_JOB_MASTER_NODE":  # this is an sbatch-level variable, not needed per-node
-                    continue
-                f.write(f"export {key}={value}\n")
+        self.create_env_vars_file()
 
-        etcd_command: list[str] = self.gen_etcd_srun_command()
-        nixl_commands = self.gen_nixl_srun_commands()
+        self._current_image_url = str(self.tdef.docker_image.installed_path)
+        etcd_command: list[str] = self.gen_etcd_srun_command(self.tdef.cmd_args.etcd_path)
+        nixl_commands = self.gen_nixlbench_srun_commands(
+            self.gen_nixlbench_command(), str(self.tdef.cmd_args_dict.get("backend", "unset"))
+        )
+        self._current_image_url = None
 
         commands: list[str] = [
             " ".join(etcd_command),
             "etcd_pid=$!",
-            "sleep 5",
+            " ".join(self.gen_wait_for_etcd_command()),
             *[" ".join(cmd) + " &\nsleep 15" for cmd in nixl_commands[:-1]],
             " ".join(nixl_commands[-1]),
             "kill -9 $etcd_pid",
         ]
         return "\n".join(commands)
 
-    def gen_etcd_srun_command(self) -> list[str]:
-        tdef: NIXLBenchTestDefinition = cast(NIXLBenchTestDefinition, self.test_run.test.test_definition)
-        self._current_image_url = str(tdef.etcd_image.installed_path)
-        etcd_cmd = [
-            "/usr/local/bin/etcd",
-            "--listen-client-urls",
-            "http://0.0.0.0:2379",
-            "--advertise-client-urls",
-            "http://$(hostname -I | awk '{print $1}'):2379",
-        ]
-        cmd = [
-            *self.gen_srun_prefix(),
-            "--overlap",
-            "--ntasks-per-node=1",
-            "--ntasks=1",
-            "--nodelist=$SLURM_JOB_MASTER_NODE",
-            "-N1",
-            "bash",
-            "-c",
-            f'"{" ".join(etcd_cmd)}" &',
-        ]
-        self._current_image_url = None
-        return cmd
-
     def gen_nixlbench_command(self) -> list[str]:
         tdef: NIXLBenchTestDefinition = cast(NIXLBenchTestDefinition, self.test_run.test.test_definition)
-        cmd = [tdef.cmd_args.path_to_benchmark, f"--etcd-endpoints {tdef.cmd_args.etcd_endpoint}"]
+        cmd = [tdef.cmd_args.path_to_benchmark]
 
-        other_args = tdef.cmd_args.model_dump(
-            exclude={"docker_image_url", "etcd_endpoint", "path_to_benchmark", "cmd_args"}
-        )
-        for k, v in other_args.items():
+        for k, v in tdef.cmd_args_dict.items():
+            if k == "etcd_endpoints":
+                k = "etcd-endpoints"
             cmd.append(f"--{k} {v}")
 
         return cmd
-
-    def gen_nixl_srun_commands(self) -> list[list[str]]:
-        tdef: NIXLBenchTestDefinition = cast(NIXLBenchTestDefinition, self.test_run.test.test_definition)
-        self._current_image_url = str(tdef.docker_image.installed_path)
-        prefix_part = self.gen_srun_prefix()
-        self._current_image_url = None
-
-        bash_part = [
-            "bash",
-            "-c",
-            f'"source {(self.test_run.output_path / "env_vars.sh").absolute()}; '
-            f'{" ".join(self.gen_nixlbench_command())}"',
-        ]
-        tpn_part = ["--ntasks-per-node=1", "--ntasks=1", "-N1"]
-
-        cmds = [
-            [*prefix_part, "--overlap", "--nodelist=$SLURM_JOB_MASTER_NODE", *tpn_part, *bash_part],
-        ]
-
-        backend = str(tdef.cmd_args_dict.get("backend", "unset")).upper()
-        if backend == "UCX":
-            nnodes, _ = self.get_cached_nodes_spec()
-            if nnodes > 1:
-                cmds = [
-                    [*prefix_part, "--overlap", f"--relative={idx}", *tpn_part, *bash_part] for idx in range(nnodes)
-                ]
-            else:
-                cmds *= max(2, nnodes)
-
-        return cmds

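For reference, the flag rendering in gen_nixlbench_command boils down to the loop below. This standalone restatement is illustrative only (the benchmark path and argument values are hypothetical); it shows how the etcd_endpoints field is renamed to the --etcd-endpoints flag that nixlbench expects:

def render_nixlbench_flags(path_to_benchmark: str, cmd_args: dict[str, str]) -> list[str]:
    # Mirror of gen_nixlbench_command: each remaining cmd_arg becomes a "--key value" token.
    cmd = [path_to_benchmark]
    for k, v in cmd_args.items():
        if k == "etcd_endpoints":
            k = "etcd-endpoints"  # dashed flag name on the nixlbench command line
        cmd.append(f"--{k} {v}")
    return cmd


# Hypothetical arguments:
print(render_nixlbench_flags(
    "/usr/local/bin/nixlbench",
    {"backend": "UCX", "etcd_endpoints": "http://$NIXL_ETCD_ENDPOINTS"},
))
# ['/usr/local/bin/nixlbench', '--backend UCX', '--etcd-endpoints http://$NIXL_ETCD_ENDPOINTS']
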