diff --git a/darshan-util/pydarshan/darshan/cli/file_stats.py b/darshan-util/pydarshan/darshan/cli/file_stats.py
new file mode 100644
index 000000000..421035ad5
--- /dev/null
+++ b/darshan-util/pydarshan/darshan/cli/file_stats.py
@@ -0,0 +1,293 @@
+import sys
+import pandas as pd
+import argparse
+from pathlib import Path
+import darshan
+import darshan.cli
+from darshan.backend.cffi_backend import accumulate_records
+from typing import Any, Union, Callable
+from humanize import naturalsize
+import concurrent.futures
+from functools import partial
+
+from rich.console import Console
+from rich.table import Table
+
+def process_logfile(log_path, mod, filter_patterns, filter_mode):
+    """
+    Collect relevant file statistics from a single Darshan log file into a DataFrame.
+
+    Parameters
+    ----------
+    log_path : a string, the path to a Darshan log file.
+    mod : a string, the module name
+    filter_patterns: regex patterns for names to exclude/include
+    filter_mode: whether to "exclude" or "include" the filter patterns
+
+    Returns
+    -------
+    a single DataFrame.
+
+    """
+    try:
+        extra_options = {}
+        if filter_patterns:
+            extra_options["filter_patterns"] = filter_patterns
+            extra_options["filter_mode"] = filter_mode
+        report = darshan.DarshanReport(log_path, read_all=False)
+        if mod not in report.modules:
+            return pd.DataFrame()
+        report.mod_read_all_records(mod, **extra_options)
+        if len(report.records[mod]) == 0:
+            return pd.DataFrame()
+        recs = report.records[mod].to_df()
+        if mod != 'MPI-IO':
+            rec_cols = ['id', f'{mod}_BYTES_READ', f'{mod}_BYTES_WRITTEN', f'{mod}_READS', f'{mod}_WRITES']
+        else:
+            rec_cols = ['id', 'MPIIO_BYTES_READ', 'MPIIO_BYTES_WRITTEN', 'MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES']
+        df = recs['counters'][rec_cols].copy()
+        if mod == 'MPI-IO':
+            df['MPIIO_READS'] = df['MPIIO_INDEP_READS'] + df['MPIIO_COLL_READS']
+            df['MPIIO_WRITES'] = df['MPIIO_INDEP_WRITES'] + df['MPIIO_COLL_WRITES']
+            df.drop(columns=['MPIIO_INDEP_READS', 'MPIIO_COLL_READS', 'MPIIO_INDEP_WRITES', 'MPIIO_COLL_WRITES'], inplace=True)
+        # try to make column names more uniform
+        new_cols = []
+        for col in df.columns:
+            ndx = col.find('_')
+            if ndx > 0:
+                new_cols.append(col[ndx+1:].lower())
+            else:
+                new_cols.append(col)
+        df.columns = new_cols
+        df.insert(0, 'file', df['id'].map(report.name_records))
+        df.insert(1, 'log_file', log_path)
+        return df.drop('id', axis=1)  # id not needed anymore
+    except Exception as e:
+        print(f"Error processing {log_path}: {e}", file=sys.stderr)
+        return pd.DataFrame()
+
+def combine_dfs(list_dfs):
+    """
+    Combine per-job DataFrames of each Darshan log into one DataFrame.
+
+    Parameters
+    ----------
+    list_dfs : a list of DataFrames.
+
+    Returns
+    -------
+    a single DataFrame with data from multiple Darshan logs.
+
+    """
+    combined_dfs = pd.concat(list_dfs, ignore_index=True)
+    return combined_dfs
+
+def group_by_file(combined_dfs):
+    """
+    Group data using the 'file' column. Additionally, calculate the
+    total number of unique jobs accessing each file.
+
+    Parameters
+    ----------
+    combined_dfs : a DataFrame with data from multiple Darshan logs.
+
+    Returns
+    -------
+    a DataFrame with the sum of each group.
+
+    """
+    sum_cols = combined_dfs.select_dtypes('number').columns
+    # group data by file name, counting number of unique jobs (i.e., log files)
+    # that access each file, as well as sum total of numerical columns
+    df_groupby_file = combined_dfs.groupby('file', as_index=False).agg(
+        **{col: (col, 'sum') for col in sum_cols},
+        total_jobs=('log_file', 'nunique')
+    )
+    return df_groupby_file
+
+def sort_dfs_desc(combined_dfs, order_by):
+    """
+    Sort data in descending order by the user-specified column name.
+
+    Parameters
+    ----------
+    combined_dfs : a DataFrame with data from multiple Darshan logs.
+    order_by : a string, the column name
+
+    Returns
+    -------
+    a DataFrame sorted in descending order by the given column.
+
+    """
+    combined_dfs_sort = combined_dfs.sort_values(by=[order_by], ascending=False)
+    return combined_dfs_sort
+
+def first_n_recs(df, n):
+    """
+    Filter the data to return only the first n records.
+
+    Parameters
+    ----------
+    df : a dataframe
+    n : an int, number of rows.
+
+    Returns
+    -------
+    a DataFrame with n rows.
+
+    """
+    if n >= 0:
+        return df.head(n)
+    else:
+        return df
+
+def rich_print(df, mod, order_by):
+    """
+    Pretty print the DataFrame using rich tables.
+
+    Parameters
+    ----------
+    df : a dataframe
+    mod : a string, the module name
+    order_by : a string, the column name of the statistical metric to sort by
+
+    """
+    # calculate totals to plug in to table footer
+    all_bytes_read = df['bytes_read'].sum()
+    all_bytes_written = df['bytes_written'].sum()
+    all_reads = df['reads'].sum()
+    all_writes = df['writes'].sum()
+    all_total_jobs = df['total_jobs'].sum()
+
+    console = Console()
+    table = Table(title=f"Darshan {mod} File Stats", show_lines=True, show_footer=True)
+    table.add_column("file", f"[u i]TOTAL ({len(df)} files)", justify="center", ratio=5)
+    default_kwargs = {"justify": "center", "no_wrap": True, "ratio": 1}
+    table.add_column("bytes_read", f"[u i]{naturalsize(all_bytes_read, binary=True, format='%.2f')}", **default_kwargs)
+    table.add_column("bytes_written", f"[u i]{naturalsize(all_bytes_written, binary=True, format='%.2f')}", **default_kwargs)
+    table.add_column("reads", f"[u i]{all_reads}", **default_kwargs)
+    table.add_column("writes", f"[u i]{all_writes}", **default_kwargs)
+    table.add_column("total_jobs", f"[u i]{all_total_jobs}", **default_kwargs)
+    for column in table.columns:
+        if column.header == order_by:
+            column.style = column.header_style = column.footer_style = "bold cyan"
+    for _, row in df.iterrows():
+        table.add_row(row["file"],
+                      f"{naturalsize(row['bytes_read'], binary=True, format='%.2f')}",
+                      f"{naturalsize(row['bytes_written'], binary=True, format='%.2f')}",
+                      f"{row['reads']}",
+                      f"{row['writes']}",
+                      f"{row['total_jobs']}")
+    console.print(table)
+
+def setup_parser(parser: argparse.ArgumentParser):
+    """
+    Configures the command line argument parser.
+
+    Parameters
+    ----------
+    parser : command line argument parser.
+
+    """
+    parser.description = "Print statistics describing key metadata and I/O performance metrics for files accessed by a given list of jobs."
+    parser.add_argument(
+        "log_paths",
+        nargs='*',
+        help="specify the paths to Darshan log files"
+    )
+    parser.add_argument(
+        "--log_paths_file",
+        type=str,
+        help="specify the path to a manifest file listing Darshan log files"
+    )
+    parser.add_argument(
+        "--module", "-m",
+        nargs='?', default='POSIX',
+        choices=['POSIX', 'MPI-IO', 'STDIO'],
+        help="specify the Darshan module to generate file stats for (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--order_by", "-o",
+        nargs='?', default='bytes_read',
+        choices=['bytes_read', 'bytes_written', 'reads', 'writes', 'total_jobs'],
+        help="specify the I/O metric to order files by (default: %(default)s)"
+    )
+    parser.add_argument(
+        "--limit", "-l",
+        type=int,
+        nargs='?', default=-1,
+        help="limit output to the top LIMIT number of files according to selected metric"
+    )
+    parser.add_argument(
+        "--csv", "-c",
+        action='store_true',
+        help="output file stats in CSV format"
+    )
+    parser.add_argument(
+        "--exclude_names", "-e",
+        action='append',
+        help="regex patterns for file record names to exclude in stats"
+    )
+    parser.add_argument(
+        "--include_names", "-i",
+        action='append',
+        help="regex patterns for file record names to include in stats"
+    )
+
+def get_input_logs(args):
+    if args.log_paths_file:
+        manifest_path = Path(args.log_paths_file)
+        if not manifest_path.is_file():
+            raise ValueError(f"Input manifest file {manifest_path} not found.")
+        with open(manifest_path) as f:
+            return [line.strip() for line in f if line.strip()]
+    elif args.log_paths:
+        return args.log_paths
+    else:
+        raise ValueError("No input Darshan logs provided.")
+
+def main(args: Union[Any, None] = None):
+    """
+    Prints file statistics on a set of input Darshan logs.
+
+    Parameters
+    ----------
+    args: command line arguments.
+
+    """
+    if args is None:
+        parser = argparse.ArgumentParser(description="")
+        setup_parser(parser)
+        args = parser.parse_args()
+    mod = args.module
+    order_by = args.order_by
+    limit = args.limit
+    log_paths = get_input_logs(args)
+    filter_patterns = None
+    filter_mode = None
+    if args.exclude_names and args.include_names:
+        raise ValueError('Only one of --exclude_names and --include_names may be used.')
+    elif args.exclude_names:
+        filter_patterns = args.exclude_names
+        filter_mode = "exclude"
+    elif args.include_names:
+        filter_patterns = args.include_names
+        filter_mode = "include"
+    process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode)
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32))
+    list_dfs = [df for df in results if not df.empty]
+    if len(list_dfs) == 0:
+        sys.exit()
+    combined_dfs = combine_dfs(list_dfs)
+    combined_dfs_grouped = group_by_file(combined_dfs)
+    combined_dfs_sorted = sort_dfs_desc(combined_dfs_grouped, order_by)
+    df = first_n_recs(combined_dfs_sorted, limit)
+    if args.csv:
+        print(df.to_csv(index=False), end="")
+    else:
+        rich_print(df, mod, order_by)
+
+if __name__ == "__main__":
+    main()
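[Editor's note: not part of the patch] A minimal sketch of driving the helpers above programmatically rather than through the CLI. The log paths are hypothetical, and the import path assumes the file lands at darshan/cli/file_stats.py as in this diff:

    from darshan.cli import file_stats

    # hypothetical Darshan log paths
    logs = ["app_run1.darshan", "app_run2.darshan"]
    dfs = [file_stats.process_logfile(p, mod="POSIX",
                                      filter_patterns=None, filter_mode=None)
           for p in logs]
    # drop logs with no POSIX records, mirroring main()
    dfs = [df for df in dfs if not df.empty]
    # aggregate per file across jobs, then take the top 10 by bytes read
    per_file = file_stats.group_by_file(file_stats.combine_dfs(dfs))
    top = file_stats.first_n_recs(
        file_stats.sort_dfs_desc(per_file, "bytes_read"), 10)
    print(top.to_csv(index=False))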
+ + """ + if args is None: + parser = argparse.ArgumentParser(description="") + setup_parser(parser) + args = parser.parse_args() + mod = args.module + order_by = args.order_by + limit = args.limit + log_paths = get_input_logs(args) + filter_patterns=None + filter_mode=None + if args.exclude_names and args.include_names: + raise ValueError('Only one of --exclude_names and --include_names may be used.') + elif args.exclude_names: + filter_patterns = args.exclude_names + filter_mode = "exclude" + elif args.include_names: + filter_patterns = args.include_names + filter_mode = "include" + process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode) + with concurrent.futures.ProcessPoolExecutor() as executor: + results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32)) + list_dfs = [df for df in results if not df.empty] + if len(list_dfs) == 0: + sys.exit() + combined_dfs = combine_dfs(list_dfs) + combined_dfs_grouped = group_by_file(combined_dfs) + combined_dfs_sorted = sort_dfs_desc(combined_dfs_grouped, order_by) + df = first_n_recs(combined_dfs_sorted, limit) + if args.csv: + print(df.to_csv(index=False), end="") + else: + rich_print(df, mod, order_by) + +if __name__ == "__main__": + main() diff --git a/darshan-util/pydarshan/darshan/cli/job_stats.py b/darshan-util/pydarshan/darshan/cli/job_stats.py new file mode 100644 index 000000000..1c27c6886 --- /dev/null +++ b/darshan-util/pydarshan/darshan/cli/job_stats.py @@ -0,0 +1,273 @@ +import sys +import pandas as pd +import argparse +from pathlib import Path +import darshan +import darshan.cli +from darshan.backend.cffi_backend import accumulate_records +from typing import Any, Union, Callable +from datetime import datetime +from humanize import naturalsize +import concurrent.futures +from functools import partial + +from rich.console import Console +from rich.table import Table + +def process_logfile(log_path, mod, filter_patterns, filter_mode): + """ + Save the statistical data from a single Darshan log file to a DataFrame. + + Parameters + ---------- + log_path : a string, the path to a Darshan log file. + mod : a string, the Darshan module name + filter_patterns: regex patterns for names to exclude/include + filter_mode: whether to "exclude" or "include" the filter patterns + + Returns + ------- + a single DataFrame of job statistics. 
+ + """ + try: + extra_options = {} + if filter_patterns: + extra_options["filter_patterns"] = filter_patterns + extra_options["filter_mode"] = filter_mode + report = darshan.DarshanReport(log_path, read_all=False) + if mod not in report.modules: + return pd.DataFrame() + report.mod_read_all_records(mod, **extra_options) + if len(report.records[mod]) == 0: + return pd.DataFrame() + recs = report.records[mod].to_df() + acc_rec = accumulate_records(recs, mod, report.metadata['job']['nprocs']) + dict_acc_rec = {} + dict_acc_rec['log_file'] = log_path.split('/')[-1] + dict_acc_rec['exe'] = report.metadata['exe'] + dict_acc_rec['job_id'] = report.metadata['job']['jobid'] + dict_acc_rec['uid'] = report.metadata['job']['uid'] + dict_acc_rec['nprocs'] = report.metadata['job']['nprocs'] + dict_acc_rec['start_time'] = report.metadata['job']['start_time_sec'] + dict_acc_rec['end_time'] = report.metadata['job']['end_time_sec'] + dict_acc_rec['run_time'] = report.metadata['job']['run_time'] + dict_acc_rec['perf_by_slowest'] = acc_rec.derived_metrics.agg_perf_by_slowest * 1024**2 + dict_acc_rec['time_by_slowest'] = acc_rec.derived_metrics.agg_time_by_slowest + dict_acc_rec['total_bytes'] = acc_rec.derived_metrics.total_bytes + dict_acc_rec['total_files'] = acc_rec.derived_metrics.category_counters[0].count + dict_acc_rec['partial_flag'] = report.modules[mod]['partial_flag'] + df = pd.DataFrame.from_dict([dict_acc_rec]) + return df + except Exception as e: + print(f"Error processing {log_path}: {e}", file=sys.stderr) + return pd.DataFrame() + +def combine_dfs(list_dfs): + """ + Combine per-job DataFrames of each Darshan log into one DataFrame. + + Parameters + ---------- + list_dfs : a list of DataFrames. + + Returns + ------- + a single DataFrame with data from multiple Darshan logs. + + """ + combined_dfs = pd.concat(list_dfs, ignore_index=True) + return combined_dfs + +def sort_dfs_desc(combined_dfs, order_by): + """ + Sort data by the column name the user inputs in a descending order. + + Parameters + ---------- + combined_dfs : a DataFrame with data from multiple Darshan logs. + order_by : a string, the column name of the statistical metric to sort by. + + Returns + ------- + a DataFrame sorted in descending order by a given column. + + """ + combined_dfs_sorted = combined_dfs.sort_values(by=[order_by], ascending=False) + return combined_dfs_sorted + +def first_n_recs(df, n): + """ + Filter the data to return only the first n records. + + Parameters + ---------- + df : a dataframe + n : an int, number of rows. + + Returns + ------- + a DataFrame with n rows. + + """ + if n >= 0: + return df.head(n) + else: + return df + +def rich_print(df, mod, order_by): + """ + Pretty print the DataFrame using rich tables. 
+ + Parameters + ---------- + df : a dataframe + mod : a string, the Darshan module name + order_by : a string, the column name of the statistical metric to sort by + + """ + # calculate totals to plug in to table footer + all_time_by_slowest = df['time_by_slowest'].sum() + all_total_bytes = df['total_bytes'].sum() + all_total_files = df['total_files'].sum() + all_perf_by_slowest = all_total_bytes / all_time_by_slowest + + # instantiate a rich table and pretty print the dataframe + console = Console() + table = Table(title=f"Darshan {mod} Job Stats", show_lines=True, show_footer=True) + table.add_column("job", f"[u i]TOTAL ({len(df)} jobs)", justify="center", ratio=4) + default_kwargs = {"justify": "center", "no_wrap": True, "ratio": 1} + table.add_column("perf_by_slowest", f"[u i]{naturalsize(all_perf_by_slowest, binary=True, format='%.2f')}/s", **default_kwargs) + table.add_column("time_by_slowest", f"[u i]{all_time_by_slowest:.2f} s", **default_kwargs) + table.add_column("total_bytes", f"[u i]{naturalsize(all_total_bytes, binary=True, format='%.2f')}", **default_kwargs) + table.add_column("total_files", f"[u i]{all_total_files}", **default_kwargs) + for column in table.columns: + if column.header == order_by: + column.style = column.header_style = column.footer_style = "bold cyan" + for _, row in df.iterrows(): + job_str = f"[bold]job id[/bold]: {row['job_id']}\n" + job_str += f"[bold]uid[/bold]: {row['uid']}\n" + job_str += f"[bold]nprocs[/bold]: {row['nprocs']}\n" + job_str += f"[bold]start time[/bold]: {datetime.fromtimestamp(row['start_time']).strftime('%m/%d/%Y %H:%M:%S')}\n" + job_str += f"[bold]end time[/bold]: {datetime.fromtimestamp(row['end_time']).strftime('%m/%d/%Y %H:%M:%S')}\n" + job_str += f"[bold]runtime[/bold]: {row['run_time']:.2f} s\n" + job_str += f"[bold]exe[/bold]: {row['exe']}\n" + job_str += f"[bold]log file[/bold]: {row['log_file']}" + table.add_row(job_str, + f"{naturalsize(row['perf_by_slowest'], binary=True, format='%.2f')}/s", + f"{row['time_by_slowest']:.2f} s", + f"{naturalsize(row['total_bytes'], binary=True, format='%.2f')}", + f"{row['total_files']}") + console.print(table) + +def setup_parser(parser: argparse.ArgumentParser): + """ + Parses the command line arguments. + + Parameters + ---------- + parser : command line argument parser. + + """ + parser.description = "Print statistics describing key metadata and I/O performance metrics for a given list of jobs." 
+ + parser.add_argument( + "log_paths", + nargs='*', + help="specify the paths to Darshan log files" + ) + parser.add_argument( + "--log_paths_file", + type=str, + help="specify the path to a manifest file listing Darshan log files" + ) + parser.add_argument( + "--module", "-m", + nargs='?', default='POSIX', + choices=['POSIX', 'MPI-IO', 'STDIO'], + help="specify the Darshan module to generate job stats for (default: %(default)s)" + ) + parser.add_argument( + "--order_by", "-o", + nargs='?', default='total_bytes', + choices=['perf_by_slowest', 'time_by_slowest', 'total_bytes', 'total_files'], + help="specify the I/O metric to order jobs by (default: %(default)s)" + ) + parser.add_argument( + "--limit", "-l", + type=int, + nargs='?', default='-1', + help="limit output to the top LIMIT number of jobs according to selected metric" + ) + parser.add_argument( + "--csv", "-c", + action='store_true', + help="output job stats in CSV format" + ) + parser.add_argument( + "--exclude_names", "-e", + action='append', + help="regex patterns for file record names to exclude in stats" + ) + parser.add_argument( + "--include_names", "-i", + action='append', + help="regex patterns for file record names to include in stats" + ) + +def get_input_logs(args): + if args.log_paths_file: + manifest_path = Path(args.log_paths_file) + if not manifest_path.is_file(): + raise ValueError(f"Input manifest file {manifest_path} not found.") + with open(manifest_path) as f: + return [line.strip() for line in f if line.strip()] + elif args.log_paths: + return args.log_paths + else: + raise ValueError("No input Darshan logs provided.") + +def main(args: Union[Any, None] = None): + """ + Prints job statistics on a set of input Darshan logs. + + Parameters + ---------- + args: command line arguments. 
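[Editor's note: not part of the patch] A sketch of inspecting a single job's accumulated stats with the helper above; the log path is hypothetical:

    from darshan.cli import job_stats

    df = job_stats.process_logfile("app_run1.darshan",  # hypothetical path
                                   mod="POSIX",
                                   filter_patterns=None, filter_mode=None)
    if not df.empty:
        # one row per job; perf_by_slowest is in bytes/s
        # (agg_perf_by_slowest is MiB/s, scaled by 1024**2 above)
        print(df[["job_id", "nprocs", "perf_by_slowest",
                  "time_by_slowest", "total_bytes", "total_files"]])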
+ + """ + if args is None: + parser = argparse.ArgumentParser(description="") + setup_parser(parser) + args = parser.parse_args() + mod = args.module + order_by = args.order_by + limit = args.limit + log_paths = get_input_logs(args) + filter_patterns=None + filter_mode=None + if args.exclude_names and args.include_names: + raise ValueError('Only one of --exclude_names and --include_names may be used.') + elif args.exclude_names: + filter_patterns = args.exclude_names + filter_mode = "exclude" + elif args.include_names: + filter_patterns = args.include_names + filter_mode = "include" + process_logfile_with_args = partial(process_logfile, mod=mod, filter_patterns=filter_patterns, filter_mode=filter_mode) + with concurrent.futures.ProcessPoolExecutor() as executor: + results = list(executor.map(process_logfile_with_args, log_paths, chunksize=32)) + list_dfs = [df for df in results if not df.empty] + if len(list_dfs) == 0: + sys.exit() + combined_dfs = combine_dfs(list_dfs) + combined_dfs_sorted = sort_dfs_desc(combined_dfs, order_by) + df = first_n_recs(combined_dfs_sorted, limit) + if args.csv: + df = df.drop("exe", axis=1) + print(df.to_csv(index=False), end="") + else: + rich_print(df, mod, order_by) + +if __name__ == "__main__": + main() diff --git a/darshan-util/pydarshan/darshan/tests/test_file_stats.py b/darshan-util/pydarshan/darshan/tests/test_file_stats.py new file mode 100644 index 000000000..9c87fe657 --- /dev/null +++ b/darshan-util/pydarshan/darshan/tests/test_file_stats.py @@ -0,0 +1,136 @@ +import argparse +from unittest import mock +from darshan.log_utils import get_log_path +from darshan.cli import file_stats +from darshan.log_utils import _provide_logs_repo_filepaths +import pandas as pd +import io +import pytest + +@pytest.mark.parametrize( + "argv", [ + ["--csv", + "--module=POSIX", + "--order_by=bytes_written", + get_log_path("shane_macsio_id29959_5-22-32552-7035573431850780836_1590156158.darshan")], + ] +) +def test_file_stats(argv, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + file_stats.setup_parser(parser=parser) + # parse the input arguments + args = parser.parse_args(argv) + # run once with CSV output and spot check some of the output + file_stats.main(args=args) + captured = capsys.readouterr() + assert not captured.err + assert captured.out + df = pd.read_csv(io.StringIO(captured.out)) + assert len(df) == 3 + # check the first file (most bytes written) + expected_first = { + 'file': '/tmp/test/macsio_hdf5_000.h5', + 'bytes_read': 39816960, + 'bytes_written': 54579416, + 'reads': 6, + 'writes': 7699, + 'total_jobs': 1 + } + row = df.iloc[0] + for key, value in expected_first.items(): + assert row[key] == value + # check the last file (least bytes written) + expected_last = { + 'file': '/tmp/test/macsio-timings.log', + 'bytes_read': 0, + 'bytes_written': 12460, + 'reads': 0, + 'writes': 51, + 'total_jobs': 1 + } + row = df.iloc[-1] + for key, value in expected_last.items(): + assert row[key] == value + assert expected_first['bytes_written'] > expected_last['bytes_written'] + # run again to ensure default Rich print mode runs successfully + args.csv = False + file_stats.main(args=args) + assert not captured.err + +def _provide_logs_repo_filepaths_filtered(): + return [ + path for path in _provide_logs_repo_filepaths() + if 'dlio_logs' in path + ] +@pytest.mark.skipif(not pytest.has_log_repo, + reason="missing darshan_logs") +@pytest.mark.parametrize( + 
("argv", "expected"), + [ + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + *_provide_logs_repo_filepaths_filtered()], + {'len': 194, + 'bytes_read': 129953991223, + 'bytes_written': 523946754, + 'reads': 35762, + 'writes': 168, + 'total_jobs': 670} + ), + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + "--limit=5", + *_provide_logs_repo_filepaths_filtered()], + {'len': 5, + 'bytes_read': 7214542900, + 'bytes_written': 0, + 'reads': 1830, + 'writes': 0, + 'total_jobs': 5} + ), + ( + ["--csv", + "--module=POSIX", + "--order_by=bytes_read", + "--include_names=\\.npz$", + *_provide_logs_repo_filepaths_filtered()], + {'len': 168, + 'bytes_read': 129953701195, + 'bytes_written': 0, + 'reads': 34770, + 'writes': 0, + 'total_jobs': 172} + ) + ] +) +def test_file_stats_multi(argv, expected, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + file_stats.setup_parser(parser=parser) + # parse the input arguments + args = parser.parse_args(argv) + # run once with CSV output and spot check some of the output + file_stats.main(args=args) + captured = capsys.readouterr() + assert not captured.err + assert captured.out + df = pd.read_csv(io.StringIO(captured.out)) + assert len(df) == expected['len'] + assert df['bytes_read'].sum() == expected['bytes_read'] + assert df['bytes_written'].sum() == expected['bytes_written'] + assert df['reads'].sum() == expected['reads'] + assert df['writes'].sum() == expected['writes'] + assert df['total_jobs'].sum() == expected['total_jobs'] + # run again to ensure default Rich print mode runs successfully + args.csv = False + file_stats.main(args=args) + assert not captured.err diff --git a/darshan-util/pydarshan/darshan/tests/test_job_stats.py b/darshan-util/pydarshan/darshan/tests/test_job_stats.py new file mode 100644 index 000000000..010647558 --- /dev/null +++ b/darshan-util/pydarshan/darshan/tests/test_job_stats.py @@ -0,0 +1,122 @@ +import argparse +from unittest import mock +from darshan.log_utils import get_log_path +from darshan.cli import job_stats +from darshan.log_utils import _provide_logs_repo_filepaths +from numpy.testing import assert_allclose +import pandas as pd +import io +import pytest + +@pytest.mark.parametrize( + "argv", [ + ["--csv", + "--module=STDIO", + "--order_by=total_bytes", + get_log_path("sample-badost.darshan")], + ] +) +def test_job_stats(argv, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + job_stats.setup_parser(parser=parser) + # parse the input arguments + args = parser.parse_args(argv) + # run once with CSV output and spot check some of the output + job_stats.main(args=args) + captured = capsys.readouterr() + assert not captured.err + assert captured.out + df = pd.read_csv(io.StringIO(captured.out)) + assert len(df) == 1 + expected = { + 'log_file': 'sample-badost.darshan', + 'job_id': 6265799, + 'nprocs': 2048, + 'run_time': 780.0, + 'perf_by_slowest': 8.249708e+06, + 'time_by_slowest': 0.200828, + 'total_bytes': 1656773, + 'total_files': 3, + 'partial_flag': False + } + row = df.iloc[0] + for key, value in expected.items(): + if key == 'perf_by_slowest' or key == 'time_by_slowest': + assert_allclose(row[key], value, rtol=1e-5, atol=1e-8) + else: + assert row[key] == value + # run again to ensure default Rich print mode runs successfully + args.csv = False + job_stats.main(args=args) + assert not 
diff --git a/darshan-util/pydarshan/darshan/tests/test_report.py b/darshan-util/pydarshan/darshan/tests/test_report.py
index a5d18df09..e1a7eab70 100644
--- a/darshan-util/pydarshan/darshan/tests/test_report.py
+++ b/darshan-util/pydarshan/darshan/tests/test_report.py
@@ -78,10 +78,10 @@ def test_load_records():

 def test_load_records_filtered():
     """Sample for an expected number of records after filtering."""
     logfile = get_log_path("shane_macsio_id29959_5-22-32552-7035573431850780836_1590156158.darshan")
-    with darshan.DarshanReport(logfile, filter_patterns=["\.h5$"], filter_mode="exclude") as report:
+    with darshan.DarshanReport(logfile, filter_patterns=[r"\.h5$"], filter_mode="exclude") as report:
         assert 2 == len(report.data['records']['POSIX'])
         assert 0 == len(report.data['records']['MPI-IO'])
-    with darshan.DarshanReport(logfile, filter_patterns=["\.h5$"], filter_mode="include") as report:
+    with darshan.DarshanReport(logfile, filter_patterns=[r"\.h5$"], filter_mode="include") as report:
         assert 1 == len(report.data['records']['POSIX'])
         assert 1 == len(report.data['records']['MPI-IO'])

diff --git a/darshan-util/pydarshan/darshan/tests/test_summary.py b/darshan-util/pydarshan/darshan/tests/test_summary.py
index c4e7ee113..f5eb71e8a 100644
--- a/darshan-util/pydarshan/darshan/tests/test_summary.py
+++ b/darshan-util/pydarshan/darshan/tests/test_summary.py
@@ -176,11 +176,16 @@ def test_main_without_args(tmpdir, argv, expected_img_count, expected_table_coun
     with pytest.raises(RuntimeError):
         summary.main()

-
+# just punt on this test for the dlio_logs given there's 26 of them
+def _provide_logs_repo_filepaths_filtered():
+    return [
+        path for path in _provide_logs_repo_filepaths()
+        if 'dlio_logs' not in path
+    ]
 @pytest.mark.skipif(not pytest.has_log_repo,
                     reason="missing darshan_logs")
 @pytest.mark.parametrize("log_filepath",
-                         _provide_logs_repo_filepaths()
+                         _provide_logs_repo_filepaths_filtered()
                          )
 def test_main_all_logs_repo_files(tmpdir, log_filepath):
     # similar to `test_main_without_args` but focused
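[Editor's note: not part of the patch] The raw-string change in test_report.py above matters because "\." in a plain string literal is an invalid escape sequence (a DeprecationWarning on older Pythons, a SyntaxWarning on 3.12+). Usage mirrors the test, with a hypothetical log path:

    import darshan

    # keep only .h5 records; filter_mode="exclude" inverts the selection
    with darshan.DarshanReport("app_run1.darshan",
                               filter_patterns=[r"\.h5$"],
                               filter_mode="include") as report:
        posix_recs = report.data["records"]["POSIX"]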
diff --git a/darshan-util/pydarshan/docs/usage.rst b/darshan-util/pydarshan/docs/usage.rst
index 3d788227b..3d0e8aa31 100644
--- a/darshan-util/pydarshan/docs/usage.rst
+++ b/darshan-util/pydarshan/docs/usage.rst
@@ -13,7 +13,9 @@
 example job summary report can be viewed `HERE