1616
1717from __future__ import annotations
1818
19+ import csv
1920import logging
2021import shutil
22+ from pathlib import Path
2123from typing import ClassVar
2224
2325from cloudai .core import METRIC_ERROR , ReportGenerationStrategy
2426from cloudai .systems .slurm .slurm_system import SlurmSystem
25- from cloudai .util .lazy_imports import lazy
2627
2728
2829class AIDynamoReportGenerationStrategy (ReportGenerationStrategy ):
@@ -44,24 +45,50 @@ def can_handle_directory(self) -> bool:
4445 json_files = list (output_path .rglob ("profile_genai_perf.json" ))
4546 return len (csv_files ) > 0 and len (json_files ) > 0
4647
47- def _read_metric_from_csv (self , metric_name : str ) -> float :
48+ def _find_csv_file (self ) -> Path | None :
4849 output_path = self .test_run .output_path
49- source_csv = next (output_path .rglob ("profile_genai_perf.csv" ))
50-
51- if source_csv .stat ().st_size == 0 :
52- return METRIC_ERROR
50+ if not output_path .exists () or not output_path .is_dir ():
51+ return None
5352
54- df = lazy .pd .read_csv (source_csv )
55- metric_row = df [df ["Metric" ] == metric_name ]
53+ csv_files = list (output_path .rglob ("profile_genai_perf.csv" ))
54+ if not csv_files or csv_files [0 ].stat ().st_size == 0 :
55+ return None
56+
57+ return csv_files [0 ]
58+
59+ def _extract_metric_value (self , header : list [str ], row : list [str ], metric_idx : int ) -> float | None :
60+ if "Value" in header :
61+ value_idx = header .index ("Value" )
62+ return float (row [value_idx ].replace ("," , "" ))
63+ elif "avg" in header :
64+ avg_idx = header .index ("avg" )
65+ return float (row [avg_idx ].replace ("," , "" ))
66+ return None
67+
68+ def _find_metric_in_section (self , section : list [list [str ]], metric_name : str ) -> float | None :
69+ if not section :
70+ return None
71+
72+ header = section [0 ]
73+ if "Metric" not in header :
74+ return None
75+
76+ metric_idx = header .index ("Metric" )
77+ for row in section [1 :]:
78+ if row [metric_idx ] == metric_name :
79+ return self ._extract_metric_value (header , row , metric_idx )
80+ return None
5681
57- if metric_row .empty :
82+ def _read_metric_from_csv (self , metric_name : str ) -> float :
83+ source_csv = self ._find_csv_file ()
84+ if not source_csv :
5885 return METRIC_ERROR
5986
60- if "Value" in df . columns and not metric_row [ "Value" ]. empty :
61- return float ( metric_row [ "Value" ]. iloc [ 0 ])
62-
63- if "avg" in df . columns and not metric_row [ "avg" ]. empty :
64- return float ( metric_row [ "avg" ]. iloc [ 0 ]. replace ( "," , "" ))
87+ sections = self . _read_csv_sections ( source_csv )
88+ for section in sections :
89+ value = self . _find_metric_in_section ( section , metric_name )
90+ if value is not None :
91+ return value
6592
6693 return METRIC_ERROR
6794
@@ -85,36 +112,76 @@ def get_metric(self, metric: str) -> float:
85112
86113 return self ._read_metric_from_csv (mapped_metric )
87114
88- def generate_report (self ) -> None :
89- output_path = self .test_run .output_path
90- source_csv = next (output_path .rglob ("profile_genai_perf.csv" ))
91- target_csv = output_path / "report.csv"
92-
93- shutil .copy2 (source_csv , target_csv )
94-
115+ def _calculate_total_gpus (self ) -> int | None :
95116 gpus_per_node = None
96117 if isinstance (self .system , SlurmSystem ):
97118 gpus_per_node = self .system .gpus_per_node
98119
99120 if gpus_per_node is None :
100- logging .warning ("gpus_per_node is None, skipping Overall Output Tokens per Second per GPU calculation." )
101- return
121+ return None
102122
103123 num_frontend_nodes = 1
104124 num_prefill_nodes = self .test_run .test .test_definition .cmd_args .dynamo .prefill_worker .num_nodes
105125 num_decode_nodes = self .test_run .test .test_definition .cmd_args .dynamo .decode_worker .num_nodes
106126
107- total_gpus = (num_frontend_nodes + num_prefill_nodes + num_decode_nodes ) * gpus_per_node
127+ return (num_frontend_nodes + num_prefill_nodes + num_decode_nodes ) * gpus_per_node
128+
129+ def _read_csv_sections (self , source_csv : Path ) -> list [list [list [str ]]]:
130+ sections = []
131+ current_section = []
108132
109133 with open (source_csv , "r" ) as f :
110- lines = f .readlines ()
111- output_token_throughput_line = next (
112- (line for line in lines if "Output Token Throughput (tokens/sec)" in line ), None
113- )
114- if output_token_throughput_line :
115- output_token_throughput = float (output_token_throughput_line .split ("," )[1 ].strip ())
134+ csv_reader = csv .reader (f )
135+ for row in csv_reader :
136+ if not any (row ): # Empty row indicates section break
137+ if current_section :
138+ sections .append (current_section )
139+ current_section = []
140+ else :
141+ current_section .append (row )
142+ if current_section :
143+ sections .append (current_section )
144+
145+ return sections
146+
147+ def _write_sections_with_metric (
148+ self , target_csv : Path , sections : list [list [list [str ]]], total_gpus : int | None
149+ ) -> None :
150+ with open (target_csv , "w" , newline = "" ) as f :
151+ writer = csv .writer (f )
152+
153+ # Write first section (statistical metrics)
154+ if sections :
155+ for row in sections [0 ]:
156+ writer .writerow (row )
157+ writer .writerow ([]) # Empty row for section break
158+
159+ # Write second section with additional metric if total_gpus is available
160+ if len (sections ) > 1 :
161+ for row in sections [1 ]:
162+ writer .writerow (row )
163+ if total_gpus and row and row [0 ] == "Output Token Throughput (tokens/sec)" :
164+ throughput = float (row [1 ].replace ("," , "" ))
165+ per_gpu_throughput = throughput / total_gpus
166+ writer .writerow (["Overall Output Tokens per Second per GPU" , per_gpu_throughput ])
167+ writer .writerow ([]) # Empty row for section break
168+
169+ # Write remaining sections
170+ for section in sections [2 :]:
171+ for row in section :
172+ writer .writerow (row )
173+ writer .writerow ([]) # Empty row for section break
116174
117- overall_output_tokens_per_second_per_gpu = output_token_throughput / total_gpus
175+ def generate_report (self ) -> None :
176+ output_path = self .test_run .output_path
177+ source_csv = next (output_path .rglob ("profile_genai_perf.csv" ))
178+ target_csv = output_path / "report.csv"
179+
180+ total_gpus = self ._calculate_total_gpus ()
181+ if total_gpus is None :
182+ logging .warning ("gpus_per_node is None, skipping Overall Output Tokens per Second per GPU calculation." )
183+ shutil .copy2 (source_csv , target_csv )
184+ return
118185
119- with open ( target_csv , "a" ) as f :
120- f . write ( f"Overall Output Tokens per Second per GPU, { overall_output_tokens_per_second_per_gpu } \n " )
186+ sections = self . _read_csv_sections ( source_csv )
187+ self . _write_sections_with_metric ( target_csv , sections , total_gpus )
0 commit comments