diff --git a/darshan-util/pydarshan/darshan/cli/file_stats.py b/darshan-util/pydarshan/darshan/cli/file_stats.py
new file mode 100644
index 000000000..421035ad5
--- /dev/null
+++ b/darshan-util/pydarshan/darshan/cli/file_stats.py
@@ -0,0 +1,293 @@
+import sys
+import pandas as pd
+import argparse
+from pathlib import Path
+import darshan
+import darshan.cli
+from darshan.backend.cffi_backend import accumulate_records
+from typing import Any, Union, Callable
+from humanize import naturalsize
+import concurrent.futures
+from functools import partial
+
+from rich.console import Console
+from rich.table import Table
+
+def process_logfile(log_path, mod, filter_patterns, filter_mode):
+    """
+    Save relevant file statistics from a single Darshan log file to a DataFrame.
+
+    Parameters
+    ----------
+    log_path : a string, the path to a Darshan log file.
+    mod : a string, the module name
+    filter_patterns: regex patterns for names to exclude/include
+    filter_mode: whether to "exclude" or "include" the filter patterns
+
+    Returns
+    -------
+    a single DataFrame of per-file statistics.
+
+    """
+    try:
+        extra_options = {}
+        if filter_patterns:
+            extra_options["filter_patterns"] = filter_patterns
+            extra_options["filter_mode"] = filter_mode
+        report = darshan.DarshanReport(log_path, read_all=False)
+        if mod not in report.modules:
+            return pd.DataFrame()
+        report.mod_read_all_records(mod, **extra_options)
+        if len(report.records[mod]) == 0:
+            return pd.DataFrame()
+        recs = report.records[mod].to_df()
+        if mod != 'MPI-IO':
+            rec_cols = ['id', f'{mod}_BYTES_READ', f'{mod}_BYTES_WRITTEN', f'{mod}_READS', f'{mod}_WRITES']
+        else:
+            rec_cols = ['id', 'MPIIO_BYTES_READ', 'MPIIO_BYTES_WRITTEN', 'MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES']
+        df = recs['counters'][rec_cols].copy()
+        if mod == 'MPI-IO':
+            df['MPIIO_READS'] = df['MPIIO_INDEP_READS'] + df['MPIIO_COLL_READS']
+            df['MPIIO_WRITES'] = df['MPIIO_INDEP_WRITES'] + df['MPIIO_COLL_WRITES']
+            df.drop(columns=['MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES'], inplace=True)
+        # try to make column names more uniform
+        new_cols = []
+        for col in df.columns:
+            ndx = col.find('_')
+            if ndx > 0:
+                new_cols.append(col[ndx+1:].lower())
+            else:
+                new_cols.append(col)
+        df.columns = new_cols
+        df.insert(0, 'file', df['id'].map(report.name_records))
+        df.insert(1, 'log_file', log_path)
+        return df.drop('id', axis=1)  # id not needed anymore
+    except Exception as e:
+        print(f"Error processing {log_path}: {e}", file=sys.stderr)
+        return pd.DataFrame()
+
+def combine_dfs(list_dfs):
+    """
+    Combine per-job DataFrames of each Darshan log into one DataFrame.
+
+    Parameters
+    ----------
+    list_dfs : a list of DataFrames.
+
+    Returns
+    -------
+    a single DataFrame with data from multiple Darshan logs.
+
+    """
+    combined_dfs = pd.concat(list_dfs, ignore_index=True)
+    return combined_dfs
+
+def group_by_file(combined_dfs):
+    """
+    Group data using the 'file' column. Additionally, calculate the
+    total number of unique jobs accessing each file.
+
+    Parameters
+    ----------
+    combined_dfs : a DataFrame with data from multiple Darshan logs.
+
+    Returns
+    -------
+    a DataFrame with the sums for each group, plus a 'total_jobs' column
+    counting the unique jobs that access each file.
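+
+    Example (a minimal sketch with made-up values -- two rows for the same
+    file coming from different logs collapse into a single row):
+
+    >>> df = pd.DataFrame({'file': ['a', 'a'], 'log_file': ['l1', 'l2'],
+    ...                    'bytes_read': [1, 2]})
+    >>> grouped = group_by_file(df)  # one row: bytes_read == 3
+    >>> int(grouped['total_jobs'][0])
+    2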
+ + """ + sum_cols = combined_dfs.select_dtypes('number').columns + # group data by file name, counting number of unique jobs (i.e., log files) + # that access each file, as well as sum total of numerical columns + df_groupby_file = combined_dfs.groupby('file', as_index=False).agg( + **{col: (col, 'sum') for col in sum_cols}, + total_jobs=('log_file', 'nunique') + ) + return df_groupby_file + +def sort_dfs_desc(combined_dfs, order_by): + """ + Sort data by the column name the user inputs in a descending order. + + Parameters + ---------- + combined_dfs : a DataFrame with data from multiple DataFrames. + order_by : a string, the column name + + Returns + ------- + a DataFrame with a descending order of one column. + + """ + combined_dfs_sort = combined_dfs.sort_values(by=[order_by], ascending=False) + return combined_dfs_sort + +def first_n_recs(df, n): + """ + Filter the data to return only the first n records. + + Parameters + ---------- + df : a dataframe + n : an int, number of rows. + + Returns + ------- + a DataFrame with n rows. + + """ + if n >= 0: + return df.head(n) + else: + return df + +def rich_print(df, mod, order_by): + """ + Pretty print the DataFrame using rich tables. + + Parameters + ---------- + df : a dataframe + mod : a string, the module name + order_by : a string, the column name of the statistical metric to sort by + + """ + # calculate totals to plug in to table footer + all_bytes_read = df['bytes_read'].sum() + all_bytes_written = df['bytes_written'].sum() + all_reads = df['reads'].sum() + all_writes = df['writes'].sum() + all_total_jobs = df['total_jobs'].sum() + + console = Console() + table = Table(title=f"Darshan {mod} File Stats", show_lines=True, show_footer=True) + table.add_column("file", f"[u i]TOTAL ({len(df)} files)", justify="center", ratio=5) + default_kwargs = {"justify": "center", "no_wrap": True, "ratio": 1} + table.add_column("bytes_read", f"[u i]{naturalsize(all_bytes_read, binary=True, format='%.2f')}", **default_kwargs) + table.add_column("bytes_written", f"[u i]{naturalsize(all_bytes_written, binary=True, format='%.2f')}", **default_kwargs) + table.add_column("reads", f"[u i]{all_reads}", **default_kwargs) + table.add_column("writes", f"[u i]{all_writes}", **default_kwargs) + table.add_column("total_jobs", f"[u i]{all_total_jobs}", **default_kwargs) + for column in table.columns: + if column.header == order_by: + column.style = column.header_style = column.footer_style = "bold cyan" + for _, row in df.iterrows(): + table.add_row(row["file"], + f"{naturalsize(row['bytes_read'], binary=True, format='%.2f')}", + f"{naturalsize(row['bytes_written'], binary=True, format='%.2f')}", + f"{row['reads']}", + f"{row['writes']}", + f"{row['total_jobs']}") + console.print(table) + +def setup_parser(parser: argparse.ArgumentParser): + """ + Parses the command line arguments. + + Parameters + ---------- + parser : command line argument parser. + + """ + parser.description = "Print statistics describing key metadata and I/O performance metrics for files accessed by a given list of jobs." 
+    parser.add_argument(
+        "log_paths",
+        nargs='*',
+        help="specify the paths to Darshan log files"
+    )
+    parser.add_argument(
+        "--log_paths_file",
+        type=str,
+        help="specify the path to a manifest file listing Darshan log files"
+    )
+    parser.add_argument(
+        "--module", "-m",
+        nargs='?', default='POSIX',
+        choices=['POSIX', 'MPI-IO', 'STDIO'],
+        help="specify the Darshan module to generate file stats for (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--order_by", "-o",
+        nargs='?', default='bytes_read',
+        choices=['bytes_read', 'bytes_written', 'reads', 'writes', 'total_jobs'],
+        help="specify the I/O metric to order files by (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--limit", "-l",
+        type=int,
+        nargs='?', default=-1,
+        help="limit output to the top LIMIT number of files according to the selected metric"
+    )
+    parser.add_argument(
+        "--csv", "-c",
+        action='store_true',
+        help="output file stats in CSV format"
+    )
+    parser.add_argument(
+        "--exclude_names", "-e",
+        action='append',
+        help="regex patterns for file record names to exclude in stats"
+    )
+    parser.add_argument(
+        "--include_names", "-i",
+        action='append',
+        help="regex patterns for file record names to include in stats"
+    )
+
+def get_input_logs(args):
+    if args.log_paths_file:
+        manifest_path = Path(args.log_paths_file)
+        if not manifest_path.is_file():
+            raise ValueError(f"Input manifest file {manifest_path} not found.")
+        with open(manifest_path) as f:
+            return [line.strip() for line in f if line.strip()]
+    elif args.log_paths:
+        return args.log_paths
+    else:
+        raise ValueError("No input Darshan logs provided.")
+
+def main(args: Union[Any, None] = None):
+    """
+    Prints file statistics for a set of input Darshan logs.
+
+    Parameters
+    ----------
+    args: command line arguments.
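+
+    Example (a hypothetical invocation, assuming the standard
+    "python -m darshan" CLI dispatcher; the log file names are
+    illustrative)::
+
+        python -m darshan file_stats --module=POSIX --order_by=bytes_written --limit=10 job1.darshan job2.darshan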
+ + """ + if args is None: + parser = argparse.ArgumentParser(description="") + setup_parser(parser) + args = parser.parse_args() + mod = args.module + order_by = args.order_by + limit = args.limit + log_paths = get_input_logs(args) + filter_patterns=None + filter_mode=None + if args.exclude_names and args.include_names: + raise ValueError('Only one of --exclude_names and --include_names may be used.') + elif args.exclude_names: + filter_patterns = args.exclude_names + filter_mode = "exclude" + elif args.include_names: + filter_patterns = args.include_names + filter_mode = "include" + process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode) + with concurrent.futures.ProcessPoolExecutor() as executor: + results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32)) + list_dfs = [df for df in results if not df.empty] + if len(list_dfs) == 0: + sys.exit() + combined_dfs = combine_dfs(list_dfs) + combined_dfs_grouped = group_by_file(combined_dfs) + combined_dfs_sorted = sort_dfs_desc(combined_dfs_grouped, order_by) + df = first_n_recs(combined_dfs_sorted, limit) + if args.csv: + print(df.to_csv(index=False), end="") + else: + rich_print(df, mod, order_by) + +if __name__ == "__main__": + main() diff --git a/darshan-util/pydarshan/darshan/cli/job_stats.py b/darshan-util/pydarshan/darshan/cli/job_stats.py new file mode 100644 index 000000000..6aa127377 --- /dev/null +++ b/darshan-util/pydarshan/darshan/cli/job_stats.py @@ -0,0 +1,273 @@ +import sys +import pandas as pd +import argparse +from pathlib import Path +import darshan +import darshan.cli +from darshan.backend.cffi_backend import accumulate_records +from typing import Any, Union, Callable +from datetime import datetime +from humanize import naturalsize +import concurrent.futures +from functools import partial + +from rich.console import Console +from rich.table import Table + +def process_logfile(log_path, mod, filter_patterns, filter_mode): + """ + Save the statistical data from a single Darshan log file to a DataFrame. + + Parameters + ---------- + log_path : a string, the path to a Darshan log file. + mod : a string, the Darshan module name + filter_patterns: regex patterns for names to exclude/include + filter_mode: whether to "exclude" or "include" the filter patterns + + Returns + ------- + a single DataFrame of job statistics. 
+ + """ + try: + extra_options = {} + if filter_patterns: + extra_options["filter_patterns"] = filter_patterns + extra_options["filter_mode"] = filter_mode + report = darshan.DarshanReport(log_path, read_all=False) + if mod not in report.modules: + return pd.DataFrame() + report.mod_read_all_records(mod, **extra_options) + if len(report.records[mod]) == 0: + return pd.DataFrame() + recs = report.records[mod].to_df() + acc_rec = accumulate_records(recs, mod, report.metadata['job']['nprocs']) + dict_acc_rec = {} + dict_acc_rec['log_file'] = log_path.split('/')[-1] + dict_acc_rec['exe'] = report.metadata['exe'] + dict_acc_rec['job_id'] = report.metadata['job']['jobid'] + dict_acc_rec['uid'] = report.metadata['job']['uid'] + dict_acc_rec['nprocs'] = report.metadata['job']['nprocs'] + dict_acc_rec['start_time'] = report.metadata['job']['start_time_sec'] + dict_acc_rec['end_time'] = report.metadata['job']['end_time_sec'] + dict_acc_rec['run_time'] = report.metadata['job']['run_time'] + dict_acc_rec['perf_by_slowest'] = acc_rec.derived_metrics.agg_perf_by_slowest * 1024**2 + dict_acc_rec['time_by_slowest'] = acc_rec.derived_metrics.agg_time_by_slowest + dict_acc_rec['total_bytes'] = acc_rec.derived_metrics.total_bytes + dict_acc_rec['total_files'] = acc_rec.derived_metrics.category_counters[0].count + dict_acc_rec['partial_flag'] = report.modules[mod]['partial_flag'] + df = pd.DataFrame.from_dict([dict_acc_rec]) + return df + except Exception as e: + print(f"Error processing {log_path}: {e}", file=sys.stderr) + return pd.DataFrame() + +def combine_dfs(list_dfs): + """ + Combine per-job DataFrames of each Darshan log into one DataFrame. + + Parameters + ---------- + list_dfs : a list of DataFrames. + + Returns + ------- + a single DataFrame with data from multiple Darshan logs. + + """ + combined_dfs = pd.concat(list_dfs, ignore_index=True) + return combined_dfs + +def sort_dfs_desc(combined_dfs, order_by): + """ + Sort data by the column name the user inputs in a descending order. + + Parameters + ---------- + combined_dfs : a DataFrame with data from multiple Darshan logs. + order_by : a string, the column name of the statistical metric to sort by. + + Returns + ------- + a DataFrame sorted in descending order by a given column. + + """ + combined_dfs_sorted = combined_dfs.sort_values(by=[order_by], ascending=False) + return combined_dfs_sorted + +def first_n_recs(df, n): + """ + Filter the data to return only the first n records. + + Parameters + ---------- + df : a dataframe + n : an int, number of rows. + + Returns + ------- + a DataFrame with n rows. + + """ + if n >= 0: + return df.head(n) + else: + return df + +def rich_print(df, mod, order_by): + """ + Pretty print the DataFrame using rich tables. 
+
+    Parameters
+    ----------
+    df : a DataFrame
+    mod : a string, the Darshan module name
+    order_by : a string, the column name of the statistical metric to sort by
+
+    """
+    # calculate totals to plug in to the table footer
+    all_time_by_slowest = df['time_by_slowest'].sum()
+    all_total_bytes = df['total_bytes'].sum()
+    all_total_files = df['total_files'].sum()
+    all_perf_by_slowest = all_total_bytes / all_time_by_slowest
+
+    # instantiate a rich table and pretty print the dataframe
+    console = Console()
+    table = Table(title=f"Darshan {mod} Job Stats", show_lines=True, show_footer=True)
+    table.add_column("job", f"[u i]TOTAL ({len(df)} jobs)", justify="center", ratio=4)
+    default_kwargs = {"justify": "center", "no_wrap": True, "ratio": 1}
+    table.add_column("perf_by_slowest", f"[u i]{naturalsize(all_perf_by_slowest, binary=True, format='%.2f')}/s", **default_kwargs)
+    table.add_column("time_by_slowest", f"[u i]{all_time_by_slowest:.2f} s", **default_kwargs)
+    table.add_column("total_bytes", f"[u i]{naturalsize(all_total_bytes, binary=True, format='%.2f')}", **default_kwargs)
+    table.add_column("total_files", f"[u i]{all_total_files}", **default_kwargs)
+    for column in table.columns:
+        if column.header == order_by:
+            column.style = column.header_style = column.footer_style = "bold cyan"
+    for _, row in df.iterrows():
+        job_str = f"[bold]job id[/bold]: {row['job_id']}\n"
+        job_str += f"[bold]uid[/bold]: {row['uid']}\n"
+        job_str += f"[bold]nprocs[/bold]: {row['nprocs']}\n"
+        job_str += f"[bold]start time[/bold]: {datetime.fromtimestamp(row['start_time']).strftime('%m/%d/%Y %H:%M:%S')}\n"
+        job_str += f"[bold]end time[/bold]: {datetime.fromtimestamp(row['end_time']).strftime('%m/%d/%Y %H:%M:%S')}\n"
+        job_str += f"[bold]runtime[/bold]: {row['run_time']:.2f} s\n"
+        job_str += f"[bold]exe[/bold]: {row['exe']}\n"
+        job_str += f"[bold]log file[/bold]: {row['log_file']}"
+        table.add_row(job_str,
+                      f"{naturalsize(row['perf_by_slowest'], binary=True, format='%.2f')}/s",
+                      f"{row['time_by_slowest']:.2f} s",
+                      f"{naturalsize(row['total_bytes'], binary=True, format='%.2f')}",
+                      f"{row['total_files']}")
+    console.print(table)
+
+def setup_parser(parser: argparse.ArgumentParser):
+    """
+    Configures the command line argument parser.
+
+    Parameters
+    ----------
+    parser : command line argument parser.
+
+    """
+    parser.description = "Print statistics describing key metadata and I/O performance metrics for a given list of jobs."
+    parser.add_argument(
+        "log_paths",
+        nargs='*',
+        help="specify the paths to Darshan log files"
+    )
+    parser.add_argument(
+        "--log_paths_file",
+        type=str,
+        help="specify the path to a manifest file listing Darshan log files"
+    )
+    parser.add_argument(
+        "--module", "-m",
+        nargs='?', default='POSIX',
+        choices=['POSIX', 'MPI-IO', 'STDIO'],
+        help="specify the Darshan module to generate job stats for (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--order_by", "-o",
+        nargs='?', default='total_bytes',
+        choices=['perf_by_slowest', 'time_by_slowest', 'total_bytes', 'total_files'],
+        help="specify the I/O metric to order jobs by (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--limit", "-l",
+        type=int,
+        nargs='?', default=-1,
+        help="limit output to the top LIMIT number of jobs according to the selected metric"
+    )
+    parser.add_argument(
+        "--csv", "-c",
+        action='store_true',
+        help="output job stats in CSV format"
+    )
+    parser.add_argument(
+        "--exclude_names", "-e",
+        action='append',
+        help="regex patterns for file record names to exclude in stats"
+    )
+    parser.add_argument(
+        "--include_names", "-i",
+        action='append',
+        help="regex patterns for file record names to include in stats"
+    )
+
+def get_input_logs(args):
+    if args.log_paths_file:
+        manifest_path = Path(args.log_paths_file)
+        if not manifest_path.is_file():
+            raise ValueError(f"Input manifest file {manifest_path} not found.")
+        with open(manifest_path) as f:
+            return [line.strip() for line in f if line.strip()]
+    elif args.log_paths:
+        return args.log_paths
+    else:
+        raise ValueError("No input Darshan logs provided.")
+
+def main(args: Union[Any, None] = None):
+    """
+    Prints job statistics for a set of input Darshan logs.
+
+    Parameters
+    ----------
+    args: command line arguments.
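+
+    Example (a hypothetical invocation, assuming the standard
+    "python -m darshan" CLI dispatcher; "logs.txt" is an illustrative
+    manifest file listing one Darshan log path per line)::
+
+        python -m darshan job_stats --module=POSIX --order_by=perf_by_slowest --log_paths_file=logs.txt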
+ + """ + if args is None: + parser = argparse.ArgumentParser(description="") + setup_parser(parser) + args = parser.parse_args() + mod = args.module + order_by = args.order_by + limit = args.limit + log_paths = get_input_logs(args) + filter_patterns=None + filter_mode=None + if args.exclude_names and args.include_names: + raise ValueError('Only one of --exclude_names and --include_names may be used.') + elif args.exclude_names: + filter_patterns = args.exclude_names + filter_mode = "exclude" + elif args.include_names: + filter_patterns = args.include_names + filter_mode = "include" + process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode) + with concurrent.futures.ProcessPoolExecutor() as executor: + results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32)) + list_dfs = [df for df in results if not df.empty] + if len(list_dfs) == 0: + sys.exit() + combined_dfs = combine_dfs(list_dfs) + combined_dfs_sorted = sort_dfs_desc(combined_dfs, order_by) + df = first_n_recs(combined_dfs_sorted, limit) + if args.csv: + df = df.drop("exe", axis=1) + print(df.to_csv(index=False), end="") + else: + rich_print(df, mod, order_by) + +if __name__ == "__main__": + main() diff --git a/darshan-util/pydarshan/darshan/cli/summary.py b/darshan-util/pydarshan/darshan/cli/summary.py index d70a61045..a7f8ed2f0 100644 --- a/darshan-util/pydarshan/darshan/cli/summary.py +++ b/darshan-util/pydarshan/darshan/cli/summary.py @@ -7,7 +7,7 @@ from collections import OrderedDict import importlib.resources as importlib_resources -from typing import Any, Union, Callable +from typing import Any, Union, Callable, List, Optional import pandas as pd from mako.template import Template @@ -124,7 +124,7 @@ def generate_fig(self): elif isinstance(fig, plot_common_access_table.DarshanReportTable): # retrieve html table from `DarshanReportTable` self.fig_html = fig.html - else: + elif fig is not None: err_msg = f"Figure of type {type(fig)} not supported." raise NotImplementedError(err_msg) @@ -137,21 +137,24 @@ class ReportData: ---------- log_path: path to a darshan log file. 
     enable_dxt_heatmap: flag indicating whether DXT heatmaps should be enabled
+    filter_patterns: regex patterns for names to exclude/include
+    filter_mode: whether to "exclude" or "include" the filter patterns
 
     """
-    def __init__(self, log_path: str, enable_dxt_heatmap: bool = False):
+    def __init__(self, log_path: str, enable_dxt_heatmap: bool = False,
+                 filter_patterns: Optional[List[str]] = None, filter_mode: str = "exclude"):
         # store the log path and use it to generate the report
         self.log_path = log_path
         self.enable_dxt_heatmap = enable_dxt_heatmap
         # store the report
         self.report = darshan.DarshanReport(log_path, read_all=False)
         # read only generic module data and heatmap data by default
-        self.report.read_all_generic_records()
+        self.report.read_all_generic_records(filter_patterns=filter_patterns, filter_mode=filter_mode)
         if "HEATMAP" in self.report.data['modules']:
             self.report.read_all_heatmap_records()
         # if DXT heatmaps requested, additionally read-in DXT data
         if self.enable_dxt_heatmap:
-            self.report.read_all_dxt_records()
+            self.report.read_all_dxt_records(filter_patterns=filter_patterns, filter_mode=filter_mode)
         # create the header/footer
         self.get_header()
         self.get_footer()
@@ -496,7 +499,11 @@ def register_figures(self):
         elif "PNETCDF_FILE" in self.report.modules:
             opcounts_mods.append("PNETCDF_FILE")
 
-        for mod in self.report.modules:
+        for mod in self.report.records:
+            # skip over modules with no records -- this likely means
+            # records in the log were filtered out via name exclusions
+            if len(self.report.records[mod]) == 0:
+                continue
 
             if "H5" in mod:
                 sect_title = "Per-Module Statistics: HDF5"
@@ -633,6 +640,12 @@ def build_sections(self):
         """
         self.sections = {}
         for fig in self.figures:
+            # skip empty figures that can be generated by the report sections
+            # "Data Access by Category" and "Cross-Module Comparisons"
+            if (fig.fig_html is None and
+                (fig.section_title == "Data Access by Category" or
+                 fig.section_title == "Cross-Module Comparisons")):
+                continue
             # if a section title is not already in sections, add
             # the section title and a corresponding empty list
             # to store its figures
@@ -669,6 +682,16 @@ def setup_parser(parser: argparse.ArgumentParser):
         action="store_true",
         help="Enable DXT-based versions of I/O activity heatmaps."
     )
+    parser.add_argument(
+        "--exclude_names",
+        action='append',
+        help="regex patterns for file record names to exclude in summary report"
+    )
+    parser.add_argument(
+        "--include_names",
+        action='append',
+        help="regex patterns for file record names to include in summary report"
+    )
 
 
 def main(args: Union[Any, None] = None):
@@ -687,6 +710,17 @@ def main(args: Union[Any, None] = None):
 
     log_path = args.log_path
     enable_dxt_heatmap = args.enable_dxt_heatmap
+    filter_patterns = None
+    filter_mode = "exclude"
+    if args.exclude_names and args.include_names:
+        print('Error: only one of --exclude_names and --include_names may be used.')
+        sys.exit(1)
+    elif args.exclude_names:
+        filter_patterns = args.exclude_names
+        filter_mode = "exclude"
+    elif args.include_names:
+        filter_patterns = args.include_names
+        filter_mode = "include"
 
     if args.output is None:
         # if no output is provided, use the log file
@@ -699,7 +733,9 @@
     # collect the report data to feed into the template
     report_data = ReportData(
         log_path=log_path,
-        enable_dxt_heatmap=enable_dxt_heatmap
+        enable_dxt_heatmap=enable_dxt_heatmap,
+        filter_patterns=filter_patterns,
+        filter_mode=filter_mode
     )
 
     with importlib_resources.path(darshan.cli, "base.html") as base_path:
diff --git a/darshan-util/pydarshan/darshan/experimental/aggregators/agg_ioops.py b/darshan-util/pydarshan/darshan/experimental/aggregators/agg_ioops.py
index 90bf765d4..6496f89d6 100644
--- a/darshan-util/pydarshan/darshan/experimental/aggregators/agg_ioops.py
+++ b/darshan-util/pydarshan/darshan/experimental/aggregators/agg_ioops.py
@@ -28,7 +28,7 @@ def agg_ioops(self, mode='append'):
         for mod in mods:
 
             # check records for module are present
-            if mod not in recs:
+            if mod not in recs or len(recs[mod]) == 0:
                 continue
 
             agg = None
diff --git a/darshan-util/pydarshan/darshan/experimental/plots/data_access_by_filesystem.py b/darshan-util/pydarshan/darshan/experimental/plots/data_access_by_filesystem.py
index c60c666b1..0bd61e8ae 100644
--- a/darshan-util/pydarshan/darshan/experimental/plots/data_access_by_filesystem.py
+++ b/darshan-util/pydarshan/darshan/experimental/plots/data_access_by_filesystem.py
@@ -208,11 +208,11 @@ def rec_to_rw_counter_dfs(report: Any,
     rec_counters = pd.DataFrame()
     df_reads = pd.DataFrame()
     df_writes = pd.DataFrame()
-    if "POSIX" in report.modules:
+    if "POSIX" in report.modules and len(report.records["POSIX"]) > 0:
         rec_counters = pd.concat(objs=(rec_counters, report.records["POSIX"].to_df()['counters']))
         df_reads = pd.concat(objs=(df_reads, rec_counters.loc[rec_counters[f'POSIX_BYTES_READ'] >= 1]))
         df_writes = pd.concat(objs=(df_writes, rec_counters.loc[rec_counters[f'POSIX_BYTES_WRITTEN'] >= 1]))
-    if "STDIO" in report.modules:
+    if "STDIO" in report.modules and len(report.records["STDIO"]) > 0:
         rec_counters = pd.concat(objs=(rec_counters, report.records["STDIO"].to_df()['counters']))
         df_reads = pd.concat(objs=(df_reads, rec_counters.loc[rec_counters[f'STDIO_BYTES_READ'] >= 1]))
         df_writes = pd.concat(objs=(df_writes, rec_counters.loc[rec_counters[f'STDIO_BYTES_WRITTEN'] >= 1]))
@@ -632,7 +632,7 @@ def plot_with_report(report: darshan.DarshanReport,
 
     Returns
     -------
-    fig: matplotlib figure object
+    fig: matplotlib figure object or None if no data to plot
     """
     fig = plt.figure()
     file_id_dict = report.data["name_records"]
@@ -648,6 +648,10 @@
     for ident in allowed_ids:
         allowed_file_id_dict[ident] = file_id_dict[ident]
 
+    if len(allowed_file_id_dict) == 0:
+        # no data, likely because all records have been filtered out
+        return None
+
     filesystem_roots = identify_filesystems(file_id_dict=allowed_file_id_dict, verbose=verbose)
     # NOTE: this is a bit ugly, STDIO and POSIX are both combined
diff --git a/darshan-util/pydarshan/darshan/experimental/plots/plot_io_cost.py b/darshan-util/pydarshan/darshan/experimental/plots/plot_io_cost.py
index 27a1ca960..9f8d1961e 100644
--- a/darshan-util/pydarshan/darshan/experimental/plots/plot_io_cost.py
+++ b/darshan-util/pydarshan/darshan/experimental/plots/plot_io_cost.py
@@ -111,7 +111,7 @@ def get_io_cost_df(report: darshan.DarshanReport) -> Any:
     io_cost_dict = {}
     supported_modules = ["POSIX", "MPI-IO", "STDIO", "H5F", "H5D", "PNETCDF_FILE", "PNETCDF_VAR"]
     for mod_key in report.modules:
-        if mod_key in supported_modules:
+        if mod_key in supported_modules and len(report.records[mod_key]) > 0:
             # collect the records in dataframe form
             recs = report.records[mod_key].to_df(attach=None)
             # correct the MPI module key
@@ -150,13 +150,18 @@ def plot_io_cost(report: darshan.DarshanReport) -> Any:
     Returns
     -------
     io_cost_fig: a ``matplotlib.pyplot.figure`` object containing a
-    stacked bar graph of the average read, write, and metadata times.
+    stacked bar graph of the average read, write, and metadata times --
+    or None when there is no data to plot
 
     """
     # get the run time from the report metadata
     runtime = report.metadata["job"]["run_time"]
     # get the I/O cost dataframe
     io_cost_df = get_io_cost_df(report=report)
+    if io_cost_df.empty:
+        # return None if there's no data to plot; this typically occurs
+        # when all module records have been filtered out
+        return None
     # generate a figure with 2 y axes
     io_cost_fig = plt.figure(figsize=(4.5, 4))
     ax_raw = io_cost_fig.add_subplot(111)
diff --git a/darshan-util/pydarshan/darshan/report.py b/darshan-util/pydarshan/darshan/report.py
index c0f7c8a9b..0df68efe1 100644
--- a/darshan-util/pydarshan/darshan/report.py
+++ b/darshan-util/pydarshan/darshan/report.py
@@ -309,14 +309,16 @@ def __init__(self,
                  filename=None, dtype='numpy',
                  start_time=None, end_time=None,
                  automatic_summary=False,
-                 read_all=True, lookup_name_records=True):
+                 read_all=True,
+                 filter_patterns=None, filter_mode="exclude"):
         """
         Args:
             filename (str): filename to open (optional)
            dtype (str): default dtype for internal structures
            automatic_summary (bool): automatically generate summary after loading
            read_all (bool): whether to read all records for log
-           lookup_name_records (bool): lookup and update name_records as records are loaded
+           filter_patterns (list of strings): list of Python regex strings to match against
+           filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
@@ -326,14 +328,13 @@
         self.log = None
 
         # Behavioral Options
-        self.dtype = dtype # default dtype to return when viewing records
+        self.dtype = dtype  # default dtype to return when viewing records
+        self.name_records_read = False  # True if name records have been read from the log
         self.automatic_summary = automatic_summary
-        self.lookup_name_records = lookup_name_records
 
         # State dependent book-keeping
         self.converted_records = False  # true if convert_records() was called (unnumpyfy)
-
         # Report Metadata
         #
         # Start/End + Timebase are
@@ -354,7 +355,6 @@
         self.summary_revision = 0  # counter to check if summary needs update (see data_revision)
         self.summary = {}
 
-
        # legacy references (deprecate before 1.0?)
        self.data_revision = 0  # counter for consistency checks
        self.data = {'version': 1}
@@ -365,16 +365,14 @@
         self.data['counters'] = self.counters
         self.data['name_records'] = self.name_records
-
         # when using report algebra this log allows to untangle potentially
         # unfair aggregations (e.g., double accounting)
         self.provenance_enabled = True
         self.provenance_graph = []
         self.provenance_reports = {}
-
         if filename:
-            self.open(filename, read_all=read_all)
+            self.open(filename, read_all=read_all, filter_patterns=filter_patterns, filter_mode=filter_mode)
 
 
     @property
@@ -408,13 +406,15 @@ def heatmaps(self):
     #
 
-    def open(self, filename, read_all=False):
+    def open(self, filename, read_all=False, filter_patterns=None, filter_mode="exclude"):
         """
         Open log file via CFFI backend.
 
         Args:
            filename (str): filename to open (optional)
            read_all (bool): whether to read all records for log
+           filter_patterns (list of strings): list of Python regex strings to match against
+           filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
@@ -428,10 +428,10 @@ def open(self, filename, read_all=False):
         if not bool(self.log['handle']):
             raise RuntimeError("Failed to open file.")
 
-        self.read_metadata(read_all=read_all)
+        self.read_metadata()
 
         if read_all:
-            self.read_all()
+            self.read_all(filter_patterns=filter_patterns, filter_mode=filter_mode)
 
 
     def __add__(self, other):
@@ -464,8 +464,7 @@ def __deepcopy__(self, memo):
 
     # TODO: might consider treating self.log as list of open logs to not deactivate load functions?
 
-
-    def read_metadata(self, read_all=False):
+    def read_metadata(self):
         """
         Read metadata such as the job, the executables and available modules.
 
@@ -488,60 +487,55 @@ def read_metadata(self, read_all=False):
         self.data['modules'] = backend.log_get_modules(self.log)
         self._modules = self.data['modules']
 
-        if read_all == True:
-            self.data["name_records"] = backend.log_get_name_records(self.log)
-            self.name_records = self.data['name_records']
-
-    def update_name_records(self, mod=None):
+    def read_name_records(self, filter_patterns=None, filter_mode="exclude"):
         """
-        Update (and prune unused) name records from resolve table.
-
-        First reindexes all used name record identifiers and then queries
-        darshan-utils library to compile filtered list of name records.
+        Read all name records (record ID -> record name map) from the
+        Darshan log file using the darshan-utils library. If filter patterns
+        are provided, either filter those records out (exclude) or in (include).
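+
+        For example, filter_patterns=[r"\.h5$"] with filter_mode="exclude"
+        drops every record whose name ends in ".h5", while the same
+        patterns with filter_mode="include" keep only those records.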
 
         Args:
-            None
+            filter_patterns (list of strs): regex patterns for names to exclude/include
+            filter_mode (str): whether to "exclude" or "include" the filter patterns
 
         Return:
            None
 
         """
-        # sanitize inputs
-        mods = mod
-        if mods is None:
-            mods = self.records
-        else:
-            mods = [mod]
-
-
-        # state
-        ids = set()
-
-        for mod in mods:
-            logger.debug(f" Refreshing name_records for mod={mod}")
-            for rec in self.records[mod]:
-                ids.add(rec['id'])
-
-
-        self.name_records.update(backend.log_lookup_name_records(self.log, ids))
-
-
-    def read_all(self, dtype=None):
+        if filter_patterns and filter_mode not in {"exclude", "include"}:
+            raise RuntimeError("Invalid filter mode used for read_name_records().")
+        tmp_name_records = backend.log_get_name_records(self.log)
+        # filter name records according to user-supplied patterns
+        if filter_patterns:
+            compiled_patterns = [re.compile(p) for p in filter_patterns]
+            tmp_name_records = {
+                rec_id: rec_name
+                for (rec_id, rec_name) in tmp_name_records.items()
+                if ((filter_mode == "exclude" and not any(p.search(rec_name) for p in compiled_patterns))
+                    or
+                    (filter_mode == "include" and any(p.search(rec_name) for p in compiled_patterns)))
+            }
+        self.data["name_records"] = tmp_name_records
+        self.name_records = self.data['name_records']
+        self.name_records_read = True
+
+
+    def read_all(self, dtype=None, filter_patterns=None, filter_mode="exclude"):
         """
         Read all available records from darshan log and return as dictionary.
 
         Args:
-            None
+            filter_patterns (list of strings): list of Python regex strings to match against
+            filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
 
         """
-        self.read_all_generic_records(dtype=dtype)
-        self.read_all_dxt_records(dtype=dtype)
+        self.read_all_generic_records(dtype=dtype, filter_patterns=filter_patterns, filter_mode=filter_mode)
+        self.read_all_dxt_records(dtype=dtype, filter_patterns=filter_patterns, filter_mode=filter_mode)
         if "LUSTRE" in self.data['modules']:
-            self.mod_read_all_lustre_records(dtype=dtype)
+            self.mod_read_all_lustre_records(dtype=dtype, filter_patterns=filter_patterns, filter_mode=filter_mode)
         if "APMPI" in self.data['modules']:
             self.mod_read_all_apmpi_records(dtype=dtype)
         if "APXC" in self.data['modules']:
@@ -552,12 +546,14 @@
         return
 
-    def read_all_generic_records(self, counters=True, fcounters=True, dtype=None):
+    def read_all_generic_records(self, counters=True, fcounters=True, dtype=None,
+                                 filter_patterns=None, filter_mode="exclude"):
         """
         Read all generic records from darshan log and return as dictionary.
 
         Args:
-            None
+            filter_patterns (list of strings): list of Python regex strings to match against
+            filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
@@ -566,16 +562,18 @@
         dtype = dtype if dtype else self.dtype
 
         for mod in self.data['modules']:
-            self.mod_read_all_records(mod, dtype=dtype, warnings=False)
-
+            self.mod_read_all_records(mod, dtype=dtype, warnings=False,
+                                      filter_patterns=filter_patterns, filter_mode=filter_mode)
 
-    def read_all_dxt_records(self, reads=True, writes=True, dtype=None):
+    def read_all_dxt_records(self, reads=True, writes=True, dtype=None,
+                             filter_patterns=None, filter_mode="exclude"):
         """
         Read all dxt records from darshan log and return as dictionary.
 
         Args:
-            None
+            filter_patterns (list of strings): list of Python regex strings to match against
+            filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
@@ -584,7 +582,8 @@
         dtype = dtype if dtype else self.dtype
 
         for mod in self.data['modules']:
-            self.mod_read_all_dxt_records(mod, warnings=False, reads=reads, writes=writes, dtype=dtype)
+            self.mod_read_all_dxt_records(mod, dtype=dtype, warnings=False, reads=reads, writes=writes,
+                                          filter_patterns=filter_patterns, filter_mode=filter_mode)
 
 
     def read_all_heatmap_records(self):
@@ -635,13 +634,17 @@ def heatmap_rec_to_module_name(rec, nrecs=None):
 
         self._heatmaps = heatmaps
 
-    def mod_read_all_records(self, mod, dtype=None, warnings=True):
+    def mod_read_all_records(self, mod, dtype=None, warnings=True,
+                             filter_patterns=None, filter_mode="exclude",
+                             refresh_names=False):
         """
         Reads all generic records for module
 
         Args:
            mod (str): Identifier of module to fetch all records
            dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary, 'pandas'
+           filter_patterns (list of strings): list of Python regex strings to match against
+           filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
@@ -655,11 +658,9 @@
             # skip mod
             return
 
-
         # handling options
         dtype = dtype if dtype else self.dtype
 
-
         self.records[mod] = DarshanRecordCollection(mod=mod, report=self)
         cn = backend.counter_names(mod)
         fcn = backend.fcounter_names(mod)
@@ -674,20 +675,22 @@
         self.counters[mod]['counters'] = cn
         self.counters[mod]['fcounters'] = fcn
 
+        # get name records if they have not been read yet
+        if not self.name_records_read or refresh_names:
+            self.read_name_records(filter_patterns=filter_patterns, filter_mode=filter_mode)
 
         # fetch records
         rec = backend.log_get_generic_record(self.log, mod, dtype=dtype)
         while rec != None:
-            self.records[mod].append(rec)
-            self._modules[mod]['num_records'] += 1
+            if rec['id'] in self.name_records:
+                # only keep records we have names for, otherwise the record
+                # likely has a name that was excluded
+                self.records[mod].append(rec)
+                self._modules[mod]['num_records'] += 1
             # fetch next
             rec = backend.log_get_generic_record(self.log, mod, dtype=dtype)
 
-
-        if self.lookup_name_records:
-            self.update_name_records(mod=mod)
-
         # process/combine records if the format dtype allows for this
         if dtype == 'pandas':
             combined_c = None
@@ -716,7 +719,8 @@
             }]
 
-    def mod_read_all_apmpi_records(self, mod="APMPI", dtype=None, warnings=True):
+    def mod_read_all_apmpi_records(self, mod="APMPI", dtype=None, warnings=True,
+                                   refresh_names=False):
         """
         Reads all APMPI records for provided module.
@@ -751,7 +755,10 @@
         if mod not in self.counters:
             self.counters[mod] = {}
 
-        # fetch records
+        # get name records if they have not been read yet
+        if not self.name_records_read or refresh_names:
+            self.read_name_records()
+        # fetch header record
         rec = backend.log_get_apmpi_record(self.log, mod, "HEADER", dtype=dtype)
         while rec != None:
@@ -762,11 +769,8 @@
             rec = backend.log_get_apmpi_record(self.log, mod, "PERF", dtype=dtype)
 
-        if self.lookup_name_records:
-            self.update_name_records(mod=mod)
-
-
-    def mod_read_all_apxc_records(self, mod="APXC", dtype=None, warnings=True):
+    def mod_read_all_apxc_records(self, mod="APXC", dtype=None, warnings=True,
+                                  refresh_names=False):
         """
         Reads all APXC records for provided module.
@@ -801,7 +805,10 @@
         if mod not in self.counters:
             self.counters[mod] = {}
 
-        # fetch records
+        # get name records if they have not been read yet
+        if not self.name_records_read or refresh_names:
+            self.read_name_records()
+        # fetch header record
         rec = backend.log_get_apxc_record(self.log, mod, "HEADER", dtype=dtype)
         while rec != None:
@@ -811,17 +818,18 @@
             # fetch next
             rec = backend.log_get_apxc_record(self.log, mod, "PERF", dtype=dtype)
 
-        if self.lookup_name_records:
-            self.update_name_records(mod=mod)
-
-    def mod_read_all_dxt_records(self, mod, dtype=None, warnings=True, reads=True, writes=True):
+    def mod_read_all_dxt_records(self, mod, dtype=None, warnings=True, reads=True, writes=True,
+                                 filter_patterns=None, filter_mode="exclude",
+                                 refresh_names=False):
         """
         Reads all dxt records for provided module.
 
         Args:
            mod (str): Identifier of module to fetch all records
            dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary
+           filter_patterns (list of strings): list of Python regex strings to match against
+           filter_mode (str): filter mode to use (either "exclude" or "include")
 
         Return:
            None
 
@@ -832,7 +840,6 @@
             logger.warning(f" Skipping. Log does not contain data for mod: {mod}")
             return
 
-
         supported = ['DXT_POSIX', 'DXT_MPIIO']
 
         if mod not in supported:
@@ -841,11 +848,9 @@
             # skip mod
             return
 
-
         # handling options
         dtype = dtype if dtype else self.dtype
 
-
         self.records[mod] = DarshanRecordCollection(mod=mod, report=self)
 
         # update module metadata
@@ -853,54 +858,47 @@
         if mod not in self.counters:
             self.counters[mod] = {}
 
+        # get name records if they have not been read yet
+        if not self.name_records_read or refresh_names:
+            self.read_name_records(filter_patterns=filter_patterns, filter_mode=filter_mode)
 
         # fetch records
-        rec = backend.log_get_dxt_record(self.log, mod, dtype=dtype)
+        rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
         while rec != None:
-            self.records[mod].append(rec)
-            self.data['modules'][mod]['num_records'] += 1
+            if rec['id'] in self.name_records:
+                # only keep records we have names for, otherwise the record
+                # likely has a name that was excluded
+                self.records[mod].append(rec)
+                self._modules[mod]['num_records'] += 1
             # fetch next
             rec = backend.log_get_dxt_record(self.log, mod, reads=reads, writes=writes, dtype=dtype)
 
-        if self.lookup_name_records:
-            self.update_name_records(mod=mod)
-
-
-
-
-    def mod_read_all_lustre_records(self, mod="LUSTRE", dtype=None, warnings=True):
+    def mod_read_all_lustre_records(self, dtype=None, warnings=True,
+                                    filter_patterns=None, filter_mode="exclude",
+                                    refresh_names=False):
         """
-        Reads all dxt records for provided module.
+        Reads all lustre records.
 
         Args:
-           mod (str): Identifier of module to fetch all records
           dtype (str): 'numpy' for ndarray (default), 'dict' for python dictionary
+           filter_patterns (list of strings): list of Python regex strings to match against
+           filter_mode (str): filter mode to use (either "exclude" or "include")
 
        Return:
           None
 
        """
+        mod = "LUSTRE"
        if mod not in self.modules:
            if warnings:
                logger.warning(f" Skipping. Log does not contain data for mod: {mod}")
            return
 
-
-        supported = ['LUSTRE']
-
-        if mod not in supported:
-            if warnings:
-                logger.warning(f" Skipping. Unsupported module: {mod} in in mod_read_all_dxt_records(). Supported: {supported}")
-            # skip mod
-            return
-
-        # handling options
         dtype = dtype if dtype else self.dtype
 
-
         self.records[mod] = DarshanRecordCollection(mod=mod, report=self)
 
         cn = backend.counter_names("LUSTRE_COMP")
@@ -910,20 +908,22 @@
             self.counters[mod] = {}
         self.counters[mod]['counters'] = cn
 
+        # get name records if they have not been read yet
+        if not self.name_records_read or refresh_names:
+            self.read_name_records(filter_patterns=filter_patterns, filter_mode=filter_mode)
 
         # fetch records
         rec = backend.log_get_record(self.log, mod, dtype=dtype)
         while rec != None:
-            self.records[mod].append(rec)
-            self.data['modules'][mod]['num_records'] += 1
+            if rec['id'] in self.name_records:
+                # only keep records we have names for, otherwise the record
+                # likely has a name that was excluded
+                self.records[mod].append(rec)
+                self._modules[mod]['num_records'] += 1
             # fetch next
             rec = backend.log_get_record(self.log, mod, dtype=dtype)
 
-
-        if self.lookup_name_records:
-            self.update_name_records(mod=mod)
-
         # process/combine records if the format dtype allows for this
         if dtype == 'pandas':
             combined_c = None
@@ -934,7 +934,6 @@
             else:
                 combined_c = pd.concat([combined_c, rec['components']])
 
-
             self.records[mod] = [{
                 'rank': -1,
                 'id': -1,
@@ -942,9 +941,6 @@
             }]
 
-
-
-
     def mod_records(self, mod, dtype='numpy', warnings=True):
         """
diff --git a/darshan-util/pydarshan/darshan/tests/test_data_access_by_filesystem.py b/darshan-util/pydarshan/darshan/tests/test_data_access_by_filesystem.py
index a4f0b0523..9fb19fe12 100644
--- a/darshan-util/pydarshan/darshan/tests/test_data_access_by_filesystem.py
+++ b/darshan-util/pydarshan/darshan/tests/test_data_access_by_filesystem.py
@@ -679,3 +679,32 @@ def test_stdio_basic_inclusion(logname,
     assert_series_equal(file_wr_series, expected_file_wr_series)
     assert_series_equal(bytes_rd_series, expected_bytes_rd_series)
     assert_series_equal(bytes_wr_series, expected_bytes_wr_series)
+
+def test_plot_with_empty_data():
+    # generate a report object that filters out all contained records
+    # to ensure the data access by category plot properly returns None instead of failing
+    logpath = get_log_path("ior_hdf5_example.darshan")
+    # use a bogus regex with the "include" filter mode to ensure no records are included
+    with darshan.DarshanReport(logpath, filter_patterns=["bogus-regex"], filter_mode="include") as report:
+        fig = data_access_by_filesystem.plot_with_report(report=report)
+        assert fig is None
+
+def test_with_filtered_data():
+    # ensure rec_to_rw_counter_dfs_with_cols doesn't include data for modules with no records
+    logpath = get_log_path("sample-badost.darshan")
+    # generate a report object with all STDIO module records filtered out
+    # POSIX records should still remain
+    with darshan.DarshanReport(logpath, filter_patterns=["ior-posix"], filter_mode="include") as report:
+        file_id_dict = report.data["name_records"]
+        actual_df_reads, actual_df_writes = data_access_by_filesystem.rec_to_rw_counter_dfs_with_cols(report=report,
+                                                                                                      file_id_dict=file_id_dict)
+        assert len(actual_df_reads) == 0
+        assert len(actual_df_writes) == 2048
+    # generate a report object with all POSIX module records filtered out
+    # STDIO records should still remain
+    with darshan.DarshanReport(logpath, filter_patterns=["ior-posix"], filter_mode="exclude") as report:
report.data["name_records"] + actual_df_reads, actual_df_writes = data_access_by_filesystem.rec_to_rw_counter_dfs_with_cols(report=report, + file_id_dict=file_id_dict) + assert len(actual_df_reads) == 1 + assert len(actual_df_writes) == 2 diff --git a/darshan-util/pydarshan/darshan/tests/test_file_stats.py b/darshan-util/pydarshan/darshan/tests/test_file_stats.py new file mode 100644 index 000000000..9c87fe657 --- /dev/null +++ b/darshan-util/pydarshan/darshan/tests/test_file_stats.py @@ -0,0 +1,136 @@ +import argparse +from unittest import mock +from darshan.log_utils import get_log_path +from darshan.cli import file_stats +from darshan.log_utils import _provide_logs_repo_filepaths +import pandas as pd +import io +import pytest + +@pytest.mark.parametrize( + "argv", [ + ["--csv", + "--module=POSIX", + "--order_by=bytes_written", + get_log_path("shane_macsio_id29959_5-22-32552-7035573431850780836_1590156158.darshan")], + ] +) +def test_file_stats(argv, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + file_stats.setup_parser(parser=parser) + # parse the input arguments + args = parser.parse_args(argv) + # run once with CSV output and spot check some of the output + file_stats.main(args=args) + captured = capsys.readouterr() + assert not captured.err + assert captured.out + df = pd.read_csv(io.StringIO(captured.out)) + assert len(df) == 3 + # check the first file (most bytes written) + expected_first = { + 'file': '/tmp/test/macsio_hdf5_000.h5', + 'bytes_read': 39816960, + 'bytes_written': 54579416, + 'reads': 6, + 'writes': 7699, + 'total_jobs': 1 + } + row = df.iloc[0] + for key, value in expected_first.items(): + assert row[key] == value + # check the last file (least bytes written) + expected_last = { + 'file': '/tmp/test/macsio-timings.log', + 'bytes_read': 0, + 'bytes_written': 12460, + 'reads': 0, + 'writes': 51, + 'total_jobs': 1 + } + row = df.iloc[-1] + for key, value in expected_last.items(): + assert row[key] == value + assert expected_first['bytes_written'] > expected_last['bytes_written'] + # run again to ensure default Rich print mode runs successfully + args.csv = False + file_stats.main(args=args) + assert not captured.err + +def _provide_logs_repo_filepaths_filtered(): + return [ + path for path in _provide_logs_repo_filepaths() + if 'dlio_logs' in path + ] +@pytest.mark.skipif(not pytest.has_log_repo, + reason="missing darshan_logs") +@pytest.mark.parametrize( + ("argv", "expected"), + [ + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + *_provide_logs_repo_filepaths_filtered()], + {'len': 194, + 'bytes_read': 129953991223, + 'bytes_written': 523946754, + 'reads': 35762, + 'writes': 168, + 'total_jobs': 670} + ), + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + "--limit=5", + *_provide_logs_repo_filepaths_filtered()], + {'len': 5, + 'bytes_read': 7214542900, + 'bytes_written': 0, + 'reads': 1830, + 'writes': 0, + 'total_jobs': 5} + ), + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + "--include_names=\\.npz$", + *_provide_logs_repo_filepaths_filtered()], + {'len': 168, + 'bytes_read': 129953701195, + 'bytes_written': 0, + 'reads': 34770, + 'writes': 0, + 'total_jobs': 172} + ) + ] +) +def test_file_stats_multi(argv, expected, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + file_stats.setup_parser(parser=parser) + # 
+        # parse the input arguments
+        args = parser.parse_args(argv)
+        # run once with CSV output and spot check some of the output
+        file_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
+        assert captured.out
+        df = pd.read_csv(io.StringIO(captured.out))
+        assert len(df) == expected['len']
+        assert df['bytes_read'].sum() == expected['bytes_read']
+        assert df['bytes_written'].sum() == expected['bytes_written']
+        assert df['reads'].sum() == expected['reads']
+        assert df['writes'].sum() == expected['writes']
+        assert df['total_jobs'].sum() == expected['total_jobs']
+        # run again to ensure default Rich print mode runs successfully
+        args.csv = False
+        file_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
diff --git a/darshan-util/pydarshan/darshan/tests/test_job_stats.py b/darshan-util/pydarshan/darshan/tests/test_job_stats.py
new file mode 100644
index 000000000..010647558
--- /dev/null
+++ b/darshan-util/pydarshan/darshan/tests/test_job_stats.py
@@ -0,0 +1,122 @@
+import argparse
+from unittest import mock
+from darshan.log_utils import get_log_path
+from darshan.cli import job_stats
+from darshan.log_utils import _provide_logs_repo_filepaths
+from numpy.testing import assert_allclose
+import pandas as pd
+import io
+import pytest
+
+@pytest.mark.parametrize(
+    "argv", [
+        ["--csv",
+         "--module=STDIO",
+         "--order_by=total_bytes",
+         get_log_path("sample-badost.darshan")],
+    ]
+)
+def test_job_stats(argv, capsys):
+    with mock.patch("sys.argv", argv):
+        # initialize the parser
+        parser = argparse.ArgumentParser(description="")
+        # run through setup_parser()
+        job_stats.setup_parser(parser=parser)
+        # parse the input arguments
+        args = parser.parse_args(argv)
+        # run once with CSV output and spot check some of the output
+        job_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
+        assert captured.out
+        df = pd.read_csv(io.StringIO(captured.out))
+        assert len(df) == 1
+        expected = {
+            'log_file': 'sample-badost.darshan',
+            'job_id': 6265799,
+            'nprocs': 2048,
+            'run_time': 780.0,
+            'perf_by_slowest': 8.249708e+06,
+            'time_by_slowest': 0.200828,
+            'total_bytes': 1656773,
+            'total_files': 3,
+            'partial_flag': False
+        }
+        row = df.iloc[0]
+        for key, value in expected.items():
+            if key == 'perf_by_slowest' or key == 'time_by_slowest':
+                assert_allclose(row[key], value, rtol=1e-5, atol=1e-8)
+            else:
+                assert row[key] == value
+        # run again to ensure default Rich print mode runs successfully
+        args.csv = False
+        job_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
+
+def _provide_logs_repo_filepaths_filtered():
+    return [
+        path for path in _provide_logs_repo_filepaths()
+        if 'dlio_logs' in path
+    ]
+
+@pytest.mark.skipif(not pytest.has_log_repo,
+                    reason="missing darshan_logs")
+@pytest.mark.parametrize(
+    ("argv", "expected"),
+    [
+        (
+            ["--csv",
+             "--module=POSIX",
+             "--order_by=perf_by_slowest",
+             *_provide_logs_repo_filepaths_filtered()],
+            {'perf_by_slowest': 1818543162.0558,
+             'time_by_slowest': 89.185973,
+             'total_bytes': 130477937977,
+             'total_files': 670}
+        ),
+        (
+            ["--csv",
+             "--module=POSIX",
+             "--order_by=perf_by_slowest",
+             "--limit=5",
+             *_provide_logs_repo_filepaths_filtered()],
+            {'perf_by_slowest': 1818543162.0558,
+             'time_by_slowest': 30.823626,
+             'total_bytes': 54299532365,
+             'total_files': 190}
+        )
+    ]
+)
+def test_job_stats_multi(argv, expected, capsys):
+    # this case tests job_stats with multiple input logs
+    # and ensures that aggregate statistics are as expected
+    with mock.patch("sys.argv", argv):
+        # initialize the parser
+        parser = argparse.ArgumentParser(description="")
+        # run through setup_parser()
+        job_stats.setup_parser(parser=parser)
+        # parse the input arguments
+        args = parser.parse_args(argv)
+        # run once with CSV output and spot check some of the output
+        job_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
+        assert captured.out
+        df = pd.read_csv(io.StringIO(captured.out))
+        # verify max perf is the first row and min perf is the last row
+        max_perf = df['perf_by_slowest'].max()
+        min_perf = df['perf_by_slowest'].min()
+        assert df.iloc[0]['perf_by_slowest'] == max_perf
+        assert df.iloc[-1]['perf_by_slowest'] == min_perf
+        # verify values against expected
+        assert_allclose(max_perf, expected['perf_by_slowest'], rtol=1e-5, atol=1e-8)
+        total_time = df['time_by_slowest'].sum()
+        assert_allclose(total_time, expected['time_by_slowest'], rtol=1e-5, atol=1e-8)
+        total_bytes = df['total_bytes'].sum()
+        assert total_bytes == expected['total_bytes']
+        total_files = df['total_files'].sum()
+        assert total_files == expected['total_files']
+        # run again to ensure default Rich print mode runs successfully
+        args.csv = False
+        job_stats.main(args=args)
+        captured = capsys.readouterr()
+        assert not captured.err
diff --git a/darshan-util/pydarshan/darshan/tests/test_plot_io_cost.py b/darshan-util/pydarshan/darshan/tests/test_plot_io_cost.py
index 280eff298..2a003d987 100644
--- a/darshan-util/pydarshan/darshan/tests/test_plot_io_cost.py
+++ b/darshan-util/pydarshan/darshan/tests/test_plot_io_cost.py
@@ -315,3 +315,22 @@ def test_plot_io_cost_x_ticks_and_labels(logname,
     expected_rotations = 90
     x_rotations = [tl.get_rotation() for tl in xticklabels]
     assert_allclose(x_rotations, expected_rotations)
+
+def test_plot_io_cost_empty_data():
+    # generate a report object that filters out all contained records
+    # to ensure plot_io_cost properly returns None instead of failing
+    logpath = get_log_path("ior_hdf5_example.darshan")
+    # use a bogus regex with the "include" filter mode to ensure no records are included
+    with darshan.DarshanReport(logpath, filter_patterns=["bogus-regex"], filter_mode="include") as report:
+        fig = plot_io_cost(report=report)
+        assert fig is None
+
+def test_plot_io_cost_filtered_data():
+    # ensure get_io_cost_df doesn't include data for modules with no records
+    logpath = get_log_path("sample-badost.darshan")
+    # generate a report object with all POSIX module records filtered out
+    # STDIO records should still remain
+    with darshan.DarshanReport(logpath, filter_patterns=["ior-posix"], filter_mode="exclude") as report:
+        io_cost_df = get_io_cost_df(report=report)
+        assert "POSIX" not in io_cost_df.index
+        assert "STDIO" in io_cost_df.index
diff --git a/darshan-util/pydarshan/darshan/tests/test_report.py b/darshan-util/pydarshan/darshan/tests/test_report.py
index c32e53421..e1a7eab70 100644
--- a/darshan-util/pydarshan/darshan/tests/test_report.py
+++ b/darshan-util/pydarshan/darshan/tests/test_report.py
@@ -75,6 +75,15 @@ def test_load_records():
     report.mod_read_all_records("POSIX")
     assert 1 == len(report.data['records']['POSIX'])
 
+def test_load_records_filtered():
+    """Sample for an expected number of records after filtering."""
+    logfile = get_log_path("shane_macsio_id29959_5-22-32552-7035573431850780836_1590156158.darshan")
+    with darshan.DarshanReport(logfile, filter_patterns=[r"\.h5$"], filter_mode="exclude") as report:
+        assert 2 == len(report.data['records']['POSIX'])
+        assert 0 == len(report.data['records']['MPI-IO'])
+    with darshan.DarshanReport(logfile, filter_patterns=[r"\.h5$"], filter_mode="include") as report:
+        assert 1 == len(report.data['records']['POSIX'])
+        assert 1 == len(report.data['records']['MPI-IO'])
 
 @pytest.mark.parametrize("unsupported_record",
     ["DXT_POSIX", "DXT_MPIIO", "LUSTRE", "APMPI", "APXC"]
diff --git a/darshan-util/pydarshan/darshan/tests/test_summary.py b/darshan-util/pydarshan/darshan/tests/test_summary.py
index c4e7ee113..f5eb71e8a 100644
--- a/darshan-util/pydarshan/darshan/tests/test_summary.py
+++ b/darshan-util/pydarshan/darshan/tests/test_summary.py
@@ -176,11 +176,16 @@ def test_main_without_args(tmpdir, argv, expected_img_count, expected_table_coun
         with pytest.raises(RuntimeError):
             summary.main()
 
-
+# just punt on this test for the dlio_logs given there are 26 of them
+def _provide_logs_repo_filepaths_filtered():
+    return [
+        path for path in _provide_logs_repo_filepaths()
+        if 'dlio_logs' not in path
+    ]
 @pytest.mark.skipif(not pytest.has_log_repo,
                     reason="missing darshan_logs")
 @pytest.mark.parametrize("log_filepath",
-    _provide_logs_repo_filepaths()
+    _provide_logs_repo_filepaths_filtered()
 )
 def test_main_all_logs_repo_files(tmpdir, log_filepath):
     # similar to `test_main_without_args` but focused
diff --git a/darshan-util/pydarshan/docs/usage.rst b/darshan-util/pydarshan/docs/usage.rst
index 3d788227b..3d0e8aa31 100644
--- a/darshan-util/pydarshan/docs/usage.rst
+++ b/darshan-util/pydarshan/docs/usage.rst
@@ -13,7 +13,9 @@
 example job summary report can be viewed `HERE