Skip to content

Commit edd61a4

Browse files
committed
Move more functions into a common module
1 parent 113ca8d commit edd61a4

File tree

6 files changed

+50
-51
lines changed

6 files changed

+50
-51
lines changed

src/cloudai/workloads/common/nixl.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,18 @@
1313
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
16+
from __future__ import annotations
1617

18+
import logging
19+
from functools import cache
20+
from pathlib import Path
21+
from typing import TYPE_CHECKING
1722

1823
from cloudai.systems.slurm import SlurmCommandGenStrategy
24+
from cloudai.util.lazy_imports import lazy
25+
26+
if TYPE_CHECKING:
27+
import pandas as pd
1928

2029

2130
class NIXLCmdGenBase(SlurmCommandGenStrategy):
@@ -101,3 +110,32 @@ def create_env_vars_file(self) -> None:
101110
if key == "SLURM_JOB_MASTER_NODE": # this is an sbatch-level variable, not needed per-node
102111
continue
103112
f.write(f"export {key}={value}\n")
113+
114+
115+
@cache
116+
def extract_nixlbench_data(stdout_file: Path) -> pd.DataFrame:
117+
if not stdout_file.exists():
118+
logging.debug(f"{stdout_file} not found")
119+
return lazy.pd.DataFrame()
120+
121+
header_present, data = False, []
122+
for line in stdout_file.read_text().splitlines():
123+
if not header_present and (
124+
"Block Size (B) Batch Size " in line and "Avg Lat. (us)" in line and "B/W (GB/Sec)" in line
125+
):
126+
header_present = True
127+
continue
128+
parts = line.split()
129+
if header_present and (len(parts) == 6 or len(parts) == 10):
130+
if len(parts) == 6:
131+
data.append([parts[0], parts[1], parts[2], parts[-1]])
132+
else:
133+
data.append([parts[0], parts[1], parts[3], parts[2]])
134+
135+
df = lazy.pd.DataFrame(data, columns=["block_size", "batch_size", "avg_lat", "bw_gb_sec"])
136+
df["block_size"] = df["block_size"].astype(int)
137+
df["batch_size"] = df["batch_size"].astype(int)
138+
df["avg_lat"] = df["avg_lat"].astype(float)
139+
df["bw_gb_sec"] = df["bw_gb_sec"].astype(float)
140+
141+
return df

src/cloudai/workloads/nixl_bench/nixl_bench.py

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,9 @@
1616

1717
from __future__ import annotations
1818

19-
import logging
20-
from functools import cache
21-
from pathlib import Path
22-
from typing import TYPE_CHECKING, Optional
23-
2419
from cloudai.core import DockerImage, Installable, JobStatusResult, TestRun
2520
from cloudai.models.workload import CmdArgs, TestDefinition
26-
from cloudai.util.lazy_imports import lazy
27-
28-
if TYPE_CHECKING:
29-
import pandas as pd
21+
from cloudai.workloads.common.nixl import extract_nixlbench_data
3022

3123

3224
class NIXLBenchCmdArgs(CmdArgs):
@@ -42,7 +34,7 @@ class NIXLBenchTestDefinition(TestDefinition):
4234
"""Test definition for a NIXL Bench test."""
4335

4436
cmd_args: NIXLBenchCmdArgs
45-
_nixl_image: Optional[DockerImage] = None
37+
_nixl_image: DockerImage | None = None
4638

4739
@property
4840
def docker_image(self) -> DockerImage:
@@ -59,37 +51,8 @@ def cmd_args_dict(self) -> dict[str, str | list[str]]:
5951
return self.cmd_args.model_dump(exclude={"docker_image_url", "path_to_benchmark", "cmd_args", "etcd_path"})
6052

6153
def was_run_successful(self, tr: TestRun) -> JobStatusResult:
62-
df = extract_nixl_data(tr.output_path / "stdout.txt")
54+
df = extract_nixlbench_data(tr.output_path / "stdout.txt")
6355
if df.empty:
6456
return JobStatusResult(is_successful=False, error_message=f"NIXLBench data not found in {tr.output_path}.")
6557

6658
return JobStatusResult(is_successful=True)
67-
68-
69-
@cache
70-
def extract_nixl_data(stdout_file: Path) -> pd.DataFrame:
71-
if not stdout_file.exists():
72-
logging.debug(f"{stdout_file} not found")
73-
return lazy.pd.DataFrame()
74-
75-
header_present, data = False, []
76-
for line in stdout_file.read_text().splitlines():
77-
if not header_present and (
78-
"Block Size (B) Batch Size " in line and "Avg Lat. (us)" in line and "B/W (GB/Sec)" in line
79-
):
80-
header_present = True
81-
continue
82-
parts = line.split()
83-
if header_present and (len(parts) == 6 or len(parts) == 10):
84-
if len(parts) == 6:
85-
data.append([parts[0], parts[1], parts[2], parts[-1]])
86-
else:
87-
data.append([parts[0], parts[1], parts[3], parts[2]])
88-
89-
df = lazy.pd.DataFrame(data, columns=["block_size", "batch_size", "avg_lat", "bw_gb_sec"])
90-
df["block_size"] = df["block_size"].astype(int)
91-
df["batch_size"] = df["batch_size"].astype(int)
92-
df["avg_lat"] = df["avg_lat"].astype(float)
93-
df["bw_gb_sec"] = df["bw_gb_sec"].astype(float)
94-
95-
return df

src/cloudai/workloads/nixl_bench/report_generation_strategy.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from cloudai.core import METRIC_ERROR, ReportGenerationStrategy
2424
from cloudai.report_generator.tool.bokeh_report_tool import BokehReportTool
2525
from cloudai.util.lazy_imports import lazy
26-
from cloudai.workloads.nixl_bench.nixl_bench import extract_nixl_data
26+
from cloudai.workloads.common.nixl import extract_nixlbench_data
2727

2828

2929
class NIXLBenchReportGenerationStrategy(ReportGenerationStrategy):
@@ -36,27 +36,27 @@ def results_file(self) -> Path:
3636
return self.test_run.output_path / "stdout.txt"
3737

3838
def can_handle_directory(self) -> bool:
39-
df = extract_nixl_data(self.results_file)
39+
df = extract_nixlbench_data(self.results_file)
4040
return not df.empty
4141

4242
def generate_report(self) -> None:
4343
if not self.can_handle_directory():
4444
return
4545

4646
self.generate_bokeh_report()
47-
df = extract_nixl_data(self.results_file)
47+
df = extract_nixlbench_data(self.results_file)
4848
df.to_csv(self.test_run.output_path / "nixlbench.csv", index=False)
4949

5050
def get_metric(self, metric: str) -> float:
5151
logging.debug(f"Getting metric {metric} from {self.results_file.absolute()}")
52-
df = extract_nixl_data(self.results_file)
52+
df = extract_nixlbench_data(self.results_file)
5353
if df.empty or metric not in {"default", "latency"}:
5454
return METRIC_ERROR
5555

5656
return float(lazy.np.mean(df["avg_lat"]))
5757

5858
def generate_bokeh_report(self) -> None:
59-
df = extract_nixl_data(self.results_file)
59+
df = extract_nixlbench_data(self.results_file)
6060

6161
report_tool = BokehReportTool(self.test_run.output_path)
6262
p = report_tool.add_log_x_linear_y_multi_line_plot(

src/cloudai/workloads/nixl_kvbench/nixl_kvbench.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@
1919
from typing import Literal
2020

2121
from cloudai.core import CmdArgs, DockerImage, Installable, JobStatusResult, TestDefinition, TestRun
22-
23-
from ..nixl_bench.nixl_bench import extract_nixl_data
22+
from cloudai.workloads.common.nixl import extract_nixlbench_data
2423

2524

2625
class NIXLKVBenchCmdArgs(CmdArgs):
@@ -67,7 +66,7 @@ def cmd_args_dict(self) -> dict[str, str | list[str]]:
6766
)
6867

6968
def was_run_successful(self, tr: TestRun) -> JobStatusResult:
70-
df = extract_nixl_data(tr.output_path / "stdout.txt")
69+
df = extract_nixlbench_data(tr.output_path / "stdout.txt")
7170
if df.empty:
7271
return JobStatusResult(is_successful=False, error_message=f"NIXLBench data not found in {tr.output_path}.")
7372

tests/ref_data/nixl-perftest.sbatch

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ srun --export=ALL --mpi=pmix --container-image=url.com/docker:tag --container-mo
1616

1717
srun --export=ALL --mpi=pmix --container-image=url.com/docker:tag --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --ntasks=1 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh
1818

19-
echo SLURM_JOB_MASTER_NODE=$SLURM_JOB_MASTER_NODE
2019
srun --export=ALL --mpi=pmix --container-image=url.com/docker:tag --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --ntasks-per-node=1 --ntasks=1 -N1 bash -c "/workspace/nixl/.venv/bin/python /workspace/nixl/benchmark/kvbench/test/inference_workload_matgen.py generate --num-user-requests=2 --batch-size=1 --num-prefill-nodes=1 --num-decode-nodes=1 --results-dir=__OUTPUT_DIR__/output/matrices --prefill-tp=1 --prefill-pp=1 --prefill-cp=1 --decode-tp=1 --decode-pp=1 --decode-cp=1 --model=model-name"
2120
srun --export=ALL --mpi=pmix --container-image=url.com/docker:tag --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__OUTPUT_DIR__/install:/cloudai_install,__OUTPUT_DIR__/output --output=__OUTPUT_DIR__/output/etcd.log --overlap --ntasks-per-node=1 --ntasks=1 --nodelist=$SLURM_JOB_MASTER_NODE -N1 etcd --listen-client-urls=http://0.0.0.0:2379 --advertise-client-urls=http://$SLURM_JOB_MASTER_NODE:2379 --listen-peer-urls=http://0.0.0.0:2380 --initial-advertise-peer-urls=http://$SLURM_JOB_MASTER_NODE:2380 --initial-cluster="default=http://$SLURM_JOB_MASTER_NODE:2380" --initial-cluster-state=new &
2221
etcd_pid=$!

tests/report_generation_strategy/test_nixl_bench_report.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
from cloudai.core import Test, TestRun, TestTemplate
2222
from cloudai.systems.slurm import SlurmSystem
23+
from cloudai.workloads.common.nixl import extract_nixlbench_data
2324
from cloudai.workloads.nixl_bench import NIXLBenchCmdArgs, NIXLBenchTestDefinition
24-
from cloudai.workloads.nixl_bench.nixl_bench import extract_nixl_data
2525

2626
LEGACY_FORMAT = """
2727
Block Size (B) Batch Size Avg Lat. (us) B/W (MiB/Sec) B/W (GiB/Sec) B/W (GB/Sec)
@@ -67,7 +67,7 @@ def nixl_tr(tmp_path: Path, slurm_system: SlurmSystem) -> TestRun:
6767
)
6868
def test_nixl_bench_report_parsing(tmp_path: Path, sample: str, exp_latency: list[float], exp_bw: list[float]):
6969
(tmp_path / "nixl_bench.log").write_text(sample)
70-
df = extract_nixl_data(tmp_path / "nixl_bench.log")
70+
df = extract_nixlbench_data(tmp_path / "nixl_bench.log")
7171
assert df.shape == (4, 4)
7272
assert df["block_size"].tolist() == [4096, 8192, 33554432, 67108864]
7373
assert df["batch_size"].tolist() == [1, 1, 1, 1]

0 commit comments

Comments
 (0)