import sys
import pandas as pd
import argparse
from pathlib import Path
import darshan
import darshan.cli
from darshan.backend.cffi_backend import accumulate_records
from typing import Any, Union, Callable
from humanize import naturalsize
import concurrent.futures
from functools import partial

from rich.console import Console
from rich.table import Table

def process_logfile(log_path, mod, filter_patterns, filter_mode):
    """
    Extract relevant file statistics from a single Darshan log file into a DataFrame.

    Parameters
    ----------
    log_path : a string, the path to a Darshan log file.
    mod : a string, the module name
    filter_patterns : regex patterns for names to exclude/include
    filter_mode : whether to "exclude" or "include" the filter patterns

    Returns
    -------
    a single DataFrame.

    """
    try:
        extra_options = {}
        if filter_patterns:
            extra_options["filter_patterns"] = filter_patterns
            extra_options["filter_mode"] = filter_mode
        report = darshan.DarshanReport(log_path, read_all=False)
        if mod not in report.modules:
            return pd.DataFrame()
        report.mod_read_all_records(mod, **extra_options)
        if len(report.records[mod]) == 0:
            return pd.DataFrame()
        recs = report.records[mod].to_df()
        if mod != 'MPI-IO':
            rec_cols = ['id', f'{mod}_BYTES_READ', f'{mod}_BYTES_WRITTEN', f'{mod}_READS', f'{mod}_WRITES']
        else:
            rec_cols = ['id', 'MPIIO_BYTES_READ', 'MPIIO_BYTES_WRITTEN', 'MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES']
        df = recs['counters'][rec_cols].copy()
        if mod == 'MPI-IO':
            # fold independent and collective operation counts into single
            # read/write counts so MPI-IO output matches the other modules
            df['MPIIO_READS'] = df['MPIIO_INDEP_READS'] + df['MPIIO_COLL_READS']
            df['MPIIO_WRITES'] = df['MPIIO_INDEP_WRITES'] + df['MPIIO_COLL_WRITES']
            df.drop(columns=['MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES'], inplace=True)
        # try to make column names more uniform (e.g. 'POSIX_BYTES_READ' -> 'bytes_read')
        new_cols = []
        for col in df.columns:
            ndx = col.find('_')
            if ndx > 0:
                new_cols.append(col[ndx+1:].lower())
            else:
                new_cols.append(col)
        df.columns = new_cols
        # map record ids to file names and tag each row with its source log
        df.insert(0, 'file', df['id'].map(report.name_records))
        df.insert(1, 'log_file', log_path)
        return df.drop('id', axis=1)  # id not needed anymore
    except Exception as e:
        print(f"Error processing {log_path}: {e}", file=sys.stderr)
        return pd.DataFrame()

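# Sketch (hypothetical log path): process_logfile can also be called directly,
# outside the CLI entry point, to get a per-log DataFrame. For the POSIX module
# the resulting columns are:
# ['file', 'log_file', 'bytes_read', 'bytes_written', 'reads', 'writes']
#
#   df = process_logfile("/path/to/app.darshan", mod="POSIX",
#                        filter_patterns=None, filter_mode=None)
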
def combine_dfs(list_dfs):
    """
    Combine the per-job DataFrames from each Darshan log into one DataFrame.

    Parameters
    ----------
    list_dfs : a list of DataFrames.

    Returns
    -------
    a single DataFrame with data from multiple Darshan logs.

    """
    combined_dfs = pd.concat(list_dfs, ignore_index=True)
    return combined_dfs

def group_by_file(combined_dfs):
    """
    Group data using the 'file' column. Additionally, calculate the
    total number of unique jobs accessing each file.

    Parameters
    ----------
    combined_dfs : a DataFrame with data from multiple Darshan logs.

    Returns
    -------
    a DataFrame with the sum of each group.

    """
    sum_cols = combined_dfs.select_dtypes('number').columns
    # group data by file name, counting the number of unique jobs (i.e., log files)
    # that access each file, as well as the sum total of the numerical columns
    df_groupby_file = combined_dfs.groupby('file', as_index=False).agg(
        **{col: (col, 'sum') for col in sum_cols},
        total_jobs=('log_file', 'nunique')
    )
    return df_groupby_file

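# Illustrative example (made-up numbers): if two logs each read 1024 bytes from
# /tmp/a.dat, group_by_file collapses them into a single row with
# bytes_read == 2048 and total_jobs == 2, since total_jobs counts distinct
# 'log_file' values per file.
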
def sort_dfs_desc(combined_dfs, order_by):
    """
    Sort data by the column name the user inputs, in descending order.

    Parameters
    ----------
    combined_dfs : a DataFrame with data from multiple Darshan logs.
    order_by : a string, the column name

    Returns
    -------
    a DataFrame sorted by one column in descending order.

    """
    combined_dfs_sort = combined_dfs.sort_values(by=[order_by], ascending=False)
    return combined_dfs_sort

def first_n_recs(df, n):
    """
    Filter the data to return only the first n records.

    Parameters
    ----------
    df : a DataFrame
    n : an int, number of rows. A negative value returns all rows.

    Returns
    -------
    a DataFrame with at most n rows.

    """
    if n >= 0:
        return df.head(n)
    else:
        return df

def rich_print(df, mod, order_by):
    """
    Pretty print the DataFrame using rich tables.

    Parameters
    ----------
    df : a DataFrame
    mod : a string, the module name
    order_by : a string, the column name of the statistical metric to sort by

    """
    # calculate totals to plug in to the table footer
    all_bytes_read = df['bytes_read'].sum()
    all_bytes_written = df['bytes_written'].sum()
    all_reads = df['reads'].sum()
    all_writes = df['writes'].sum()
    all_total_jobs = df['total_jobs'].sum()

    console = Console()
    table = Table(title=f"Darshan {mod} File Stats", show_lines=True, show_footer=True)
    table.add_column("file", f"[u i]TOTAL ({len(df)} files)", justify="center", ratio=5)
    default_kwargs = {"justify": "center", "no_wrap": True, "ratio": 1}
    table.add_column("bytes_read", f"[u i]{naturalsize(all_bytes_read, binary=True, format='%.2f')}", **default_kwargs)
    table.add_column("bytes_written", f"[u i]{naturalsize(all_bytes_written, binary=True, format='%.2f')}", **default_kwargs)
    table.add_column("reads", f"[u i]{all_reads}", **default_kwargs)
    table.add_column("writes", f"[u i]{all_writes}", **default_kwargs)
    table.add_column("total_jobs", f"[u i]{all_total_jobs}", **default_kwargs)
    # highlight the column the output is sorted by
    for column in table.columns:
        if column.header == order_by:
            column.style = column.header_style = column.footer_style = "bold cyan"
    for _, row in df.iterrows():
        table.add_row(row["file"],
                      f"{naturalsize(row['bytes_read'], binary=True, format='%.2f')}",
                      f"{naturalsize(row['bytes_written'], binary=True, format='%.2f')}",
                      f"{row['reads']}",
                      f"{row['writes']}",
                      f"{row['total_jobs']}")
    console.print(table)

def setup_parser(parser: argparse.ArgumentParser):
    """
    Configures the command line arguments on the given parser.

    Parameters
    ----------
    parser : command line argument parser.

    """
    parser.description = "Print statistics describing key metadata and I/O performance metrics for files accessed by a given list of jobs."

    parser.add_argument(
        "log_paths",
        nargs='*',
        help="specify the paths to Darshan log files"
    )
    parser.add_argument(
        "--log_paths_file",
        type=str,
        help="specify the path to a manifest file listing Darshan log files"
    )
    parser.add_argument(
        "--module", "-m",
        nargs='?', default='POSIX',
        choices=['POSIX', 'MPI-IO', 'STDIO'],
        help="specify the Darshan module to generate file stats for (default: %(default)s)"
    )
    parser.add_argument(
        "--order_by", "-o",
        nargs='?', default='bytes_read',
        choices=['bytes_read', 'bytes_written', 'reads', 'writes', 'total_jobs'],
        help="specify the I/O metric to order files by (default: %(default)s)"
    )
    parser.add_argument(
        "--limit", "-l",
        type=int,
        nargs='?', default=-1,
        help="limit output to the top LIMIT number of files according to the selected metric"
    )
    parser.add_argument(
        "--csv", "-c",
        action='store_true',
        help="output file stats in CSV format"
    )
    parser.add_argument(
        "--exclude_names", "-e",
        action='append',
        help="regex patterns for file record names to exclude from stats"
    )
    parser.add_argument(
        "--include_names", "-i",
        action='append',
        help="regex patterns for file record names to include in stats"
    )

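# Example invocations (paths are hypothetical; the exact entry point depends on
# how darshan.cli registers this subcommand, assumed here to be "file_stats"):
#
#   python -m darshan file_stats /logs/job1.darshan /logs/job2.darshan
#   python -m darshan file_stats -m MPI-IO -o bytes_written -l 10 --csv \
#       --log_paths_file logs.txt -e '^/proc/' -e '^/sys/'
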
def get_input_logs(args):
    """
    Return the list of input Darshan log paths, either from the manifest file
    given by --log_paths_file or from the positional log_paths arguments.
    """
    if args.log_paths_file:
        manifest_path = Path(args.log_paths_file)
        if not manifest_path.is_file():
            raise ValueError(f"Input manifest file {manifest_path} not found.")
        with open(manifest_path) as f:
            return [line.strip() for line in f if line.strip()]
    elif args.log_paths:
        return args.log_paths
    else:
        raise ValueError("No input Darshan logs provided.")

def main(args: Union[Any, None] = None):
    """
    Prints file statistics on a set of input Darshan logs.

    Parameters
    ----------
    args: command line arguments.

    """
    if args is None:
        parser = argparse.ArgumentParser(description="")
        setup_parser(parser)
        args = parser.parse_args()
    mod = args.module
    order_by = args.order_by
    limit = args.limit
    log_paths = get_input_logs(args)
    filter_patterns = None
    filter_mode = None
    if args.exclude_names and args.include_names:
        raise ValueError('Only one of --exclude_names and --include_names may be used.')
    elif args.exclude_names:
        filter_patterns = args.exclude_names
        filter_mode = "exclude"
    elif args.include_names:
        filter_patterns = args.include_names
        filter_mode = "include"
    # process each input log in parallel, yielding one DataFrame per log
    process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode)
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32))
    list_dfs = [df for df in results if not df.empty]
    if len(list_dfs) == 0:
        # no file records found in any input log
        sys.exit()
    combined_dfs = combine_dfs(list_dfs)
    combined_dfs_grouped = group_by_file(combined_dfs)
    combined_dfs_sorted = sort_dfs_desc(combined_dfs_grouped, order_by)
    df = first_n_recs(combined_dfs_sorted, limit)
    if args.csv:
        print(df.to_csv(index=False), end="")
    else:
        rich_print(df, mod, order_by)

if __name__ == "__main__":
    main()