From 6684c4d7d1229a46bc1bb1f94e8dcf6d7db7c5ed Mon Sep 17 00:00:00 2001
From: Yanli
Date: Tue, 18 Jul 2023 08:25:27 -0600
Subject: [PATCH 1/4] Log-based sorting

---
 .../pydarshan/darshan/cli/job_stats.py        | 170 ++++++++++++++++++
 1 file changed, 170 insertions(+)
 create mode 100644 darshan-util/pydarshan/darshan/cli/job_stats.py

diff --git a/darshan-util/pydarshan/darshan/cli/job_stats.py b/darshan-util/pydarshan/darshan/cli/job_stats.py
new file mode 100644
index 000000000..e970d5e7e
--- /dev/null
+++ b/darshan-util/pydarshan/darshan/cli/job_stats.py
@@ -0,0 +1,170 @@
+import pandas as pd
+import argparse
+import darshan
+import darshan.cli
+from darshan.backend.cffi_backend import accumulate_records
+from typing import Any, Union
+import glob
+
+def df_IO_data(file_path, mod):
+    """
+    Collect the derived I/O metrics from a single log file into a DataFrame.
+
+    Parameters
+    ----------
+    file_path : a string, the path to a darshan log file.
+    mod : a string, the module name.
+
+    Returns
+    -------
+    a single-row DataFrame of derived metrics.
+
+    """
+    report = darshan.DarshanReport(file_path, read_all=True)
+    posix_recs = report.records[mod].to_df()
+    acc_recs = accumulate_records(posix_recs, mod, report.metadata['job']['nprocs'])
+    dict_recs = {}
+    dict_recs['agg_perf_by_slowest'] = acc_recs.derived_metrics.agg_perf_by_slowest
+    dict_recs['agg_time_by_slowest'] = acc_recs.derived_metrics.agg_time_by_slowest
+    dict_recs['category_counters'] = acc_recs.derived_metrics.category_counters
+    dict_recs['shared_io_total_time_by_slowest'] = acc_recs.derived_metrics.shared_io_total_time_by_slowest
+    dict_recs['total_bytes'] = acc_recs.derived_metrics.total_bytes
+    dict_recs['unique_io_slowest_rank'] = acc_recs.derived_metrics.unique_io_slowest_rank
+    dict_recs['unique_io_total_time_by_slowest'] = acc_recs.derived_metrics.unique_io_total_time_by_slowest
+    dict_recs['unique_md_only_time_by_slowest'] = acc_recs.derived_metrics.unique_md_only_time_by_slowest
+    dict_recs['unique_rw_only_time_by_slowest'] = acc_recs.derived_metrics.unique_rw_only_time_by_slowest
+    df = pd.DataFrame.from_dict([dict_recs])
+    return df
+
+def combined_dfs(list_dfs):
+    """
+    Combine the per-log DataFrames into one DataFrame.
+
+    Parameters
+    ----------
+    list_dfs : a list of DataFrames.
+
+    Returns
+    -------
+    a single DataFrame with the data from all of the input DataFrames.
+
+    """
+    combined_dfs = pd.concat(list_dfs, ignore_index=True)
+    return combined_dfs
+
+def sort_data_desc(combined_dfs, order_by_colname):
+    """
+    Sort the data in descending order by the column name the user specifies.
+
+    Parameters
+    ----------
+    combined_dfs : a DataFrame with data from multiple DataFrames.
+    order_by_colname : a string, the column name.
+
+    Returns
+    -------
+    a DataFrame sorted in descending order of one column.
+
+    """
+    combined_dfs_sort = combined_dfs.sort_values(by=[order_by_colname], ascending=False)
+    return combined_dfs_sort
+
+def first_n_recs(df, n):
+    """
+    Filter the data down to the first n records.
+
+    Parameters
+    ----------
+    df : a DataFrame.
+    n : an int, the number of rows.
+
+    Returns
+    -------
+    a DataFrame with the first n rows.
+
+    """
+    combined_dfs_first_n = df.head(n)
+    return combined_dfs_first_n
+
+def setup_parser(parser: argparse.ArgumentParser):
+    """
+    Configure the command line arguments.
+
+    Parameters
+    ----------
+    parser : command line argument parser.
+
+    """
+    parser.description = "Generates a DataFrame of I/O statistics for the top n jobs for a given module"
+
+    parser.add_argument(
+        "log_path",
+        type=str,
+        help="Specify the path to darshan log files."
+    )
+    parser.add_argument(
+        "module",
+        type=str,
+        help="Specify the module name."
+    )
+    parser.add_argument(
+        "order_by_colname",
+        type=str,
+        help="Specify the column name."
+    )
+    parser.add_argument(
+        "number_of_rows",
+        type=int,
+        help="The first n rows of the DataFrame"
+    )
+
+def discover_logpaths(user_path):
+    """
+    Generate a list with all of the log file paths.
+
+    Parameters
+    ----------
+    user_path : a string, user input for the file path
+
+    Returns
+    -------
+    a list with paths of log files.
+
+    """
+    paths = glob.glob(user_path + "worker*.darshan")
+    return paths
+
+def main(args: Union[Any, None] = None):
+    """
+    Generate a DataFrame based on the user's input.
+
+    Parameters
+    ----------
+    args: command line arguments.
+
+    """
+    if args is None:
+        parser = argparse.ArgumentParser(description="")
+        setup_parser(parser)
+        args = parser.parse_args()
+    mod = args.module
+    order_by_colname = args.order_by_colname
+    n = args.number_of_rows
+    colname_list = ['agg_perf_by_slowest', 'agg_time_by_slowest', 'total_bytes']
+    if order_by_colname in colname_list:
+        log_paths = discover_logpaths(args.log_path)
+        item_number = len(log_paths)
+        list_dfs = []
+        for i in range(item_number):
+            df_i = df_IO_data(log_paths[i], mod)
+            list_dfs.append(df_i)
+        com_dfs = combined_dfs(list_dfs)
+        combined_dfs_sort = sort_data_desc(com_dfs, order_by_colname)
+        combined_dfs_selected = first_n_recs(combined_dfs_sort, n)
+        print("Statistical data of jobs:", combined_dfs_selected)
+    else:
+        print("Column name should be 'agg_perf_by_slowest', 'agg_time_by_slowest', or 'total_bytes'")
+
+if __name__ == "__main__":
+    main()

From 06f231b0a9ef6ec26242ca388e299c49b11c9885 Mon Sep 17 00:00:00 2001
From: Yanli
Date: Fri, 21 Jul 2023 08:34:35 -0600
Subject: [PATCH 2/4] Use a path glob pattern from the user input

---
 darshan-util/pydarshan/darshan/cli/job_stats.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/darshan-util/pydarshan/darshan/cli/job_stats.py b/darshan-util/pydarshan/darshan/cli/job_stats.py
index e970d5e7e..8fcbadddf 100644
--- a/darshan-util/pydarshan/darshan/cli/job_stats.py
+++ b/darshan-util/pydarshan/darshan/cli/job_stats.py
@@ -119,20 +119,20 @@ def setup_parser(parser: argparse.ArgumentParser):
         help="The first n rows of the DataFrame"
     )
 
-def discover_logpaths(user_path):
+def discover_logpaths(user_path_glob):
     """
-    Generate a list with all of the log file paths.
+    Generate a list with log file paths.
 
     Parameters
     ----------
-    user_path : a string, user input for the file path
+    user_path_glob : a string, a path glob pattern from the user input
 
     Returns
     -------
     a list with paths of log files.
 
""" - paths = glob.glob(user_path + "worker*.darshan") + paths = glob.glob(user_path_glob) return paths def main(args: Union[Any, None] = None): From 4e7266f8575ccb59bb8c9bc46375dd98acfeebe2 Mon Sep 17 00:00:00 2001 From: Yanli Date: Fri, 4 Aug 2023 15:02:32 -0600 Subject: [PATCH 3/4] Test for job_stats.py --- .../pydarshan/darshan/tests/test_job_stats.py | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 darshan-util/pydarshan/darshan/tests/test_job_stats.py diff --git a/darshan-util/pydarshan/darshan/tests/test_job_stats.py b/darshan-util/pydarshan/darshan/tests/test_job_stats.py new file mode 100644 index 000000000..213b7fc0b --- /dev/null +++ b/darshan-util/pydarshan/darshan/tests/test_job_stats.py @@ -0,0 +1,24 @@ +import argparse +from unittest import mock +from darshan.log_utils import get_log_path +from darshan.cli import job_stats +import pytest +@pytest.mark.parametrize( + "argv", [ + [get_log_path("e3sm_io_heatmap_only.darshan"), + "-mSTDIO", + "-ototal_bytes", + "-n5"], + ] +) +def test_job_stats(argv, capsys): + with mock.patch("sys.argv", argv): + # initialize the parser + parser = argparse.ArgumentParser(description="") + # run through setup_parser() + job_stats.setup_parser(parser=parser) + # parse the input arguments + args = parser.parse_args(argv) + job_stats.main(args=args) + captured = capsys.readouterr() + assert "3.258853" in captured.out From f449f2270d2a6d3f4bca3c65861781f983909a53 Mon Sep 17 00:00:00 2001 From: Yanli Date: Fri, 4 Aug 2023 15:19:09 -0600 Subject: [PATCH 4/4] Improve codes for command line arguments --- .../pydarshan/darshan/cli/job_stats.py | 62 +++++++------------ 1 file changed, 23 insertions(+), 39 deletions(-) diff --git a/darshan-util/pydarshan/darshan/cli/job_stats.py b/darshan-util/pydarshan/darshan/cli/job_stats.py index 8fcbadddf..6dbebfe69 100644 --- a/darshan-util/pydarshan/darshan/cli/job_stats.py +++ b/darshan-util/pydarshan/darshan/cli/job_stats.py @@ -1,5 +1,6 @@ import pandas as pd import argparse +import sys import darshan import darshan.cli from darshan.backend.cffi_backend import accumulate_records @@ -26,13 +27,7 @@ def df_IO_data(file_path, mod): dict_recs = {} dict_recs['agg_perf_by_slowest'] = acc_recs.derived_metrics.agg_perf_by_slowest dict_recs['agg_time_by_slowest'] = acc_recs.derived_metrics.agg_time_by_slowest - dict_recs['category_counters'] = acc_recs.derived_metrics.category_counters - dict_recs['shared_io_total_time_by_slowest'] = acc_recs.derived_metrics.shared_io_total_time_by_slowest dict_recs['total_bytes'] = acc_recs.derived_metrics.total_bytes - dict_recs['unique_io_slowest_rank'] = acc_recs.derived_metrics.unique_io_slowest_rank - dict_recs['unique_io_total_time_by_slowest'] = acc_recs.derived_metrics.unique_io_total_time_by_slowest - dict_recs['unique_md_only_time_by_slowest'] = acc_recs.derived_metrics.unique_md_only_time_by_slowest - dict_recs['unique_rw_only_time_by_slowest'] = acc_recs.derived_metrics.unique_rw_only_time_by_slowest df = pd.DataFrame.from_dict([dict_recs]) return df @@ -101,40 +96,29 @@ def setup_parser(parser: argparse.ArgumentParser): parser.add_argument( "log_path", type=str, + nargs='+', help="Specify the path to darshan log files." ) parser.add_argument( - "module", + "-module","-m", type=str, + nargs='?', default='POSIX', help="Specify the module name." 
     )
     parser.add_argument(
-        "order_by_colname",
+        "-order_by_colname", "-o",
         type=str,
+        default='total_bytes',
+        choices=['agg_perf_by_slowest', 'agg_time_by_slowest', 'total_bytes'],
         help="Specify the column name."
     )
     parser.add_argument(
-        "number_of_rows",
+        "-number_of_rows", "-n",
         type=int,
+        default=10,
         help="The first n rows of the DataFrame"
     )
 
-def discover_logpaths(user_path_glob):
-    """
-    Generate a list with log file paths.
-
-    Parameters
-    ----------
-    user_path_glob : a string, a path glob pattern from the user input
-
-    Returns
-    -------
-    a list with paths of log files.
-
-    """
-    paths = glob.glob(user_path_glob)
-    return paths
-
 def main(args: Union[Any, None] = None):
     """
     Generate a DataFrame based on the user's input.
 
     Parameters
     ----------
     args: command line arguments.
 
     """
     if args is None:
@@ -149,22 +133,22 @@ def main(args: Union[Any, None] = None):
         setup_parser(parser)
         args = parser.parse_args()
     mod = args.module
+    list_modules = ["POSIX", "MPI-IO", "LUSTRE", "STDIO"]
+    if mod not in list_modules:
+        print(f"{mod} is not a supported module; choose one of {list_modules}")
+        sys.exit(1)
     order_by_colname = args.order_by_colname
     n = args.number_of_rows
-    colname_list = ['agg_perf_by_slowest', 'agg_time_by_slowest', 'total_bytes']
-    if order_by_colname in colname_list:
-        log_paths = discover_logpaths(args.log_path)
-        item_number = len(log_paths)
-        list_dfs = []
-        for i in range(item_number):
-            df_i = df_IO_data(log_paths[i], mod)
-            list_dfs.append(df_i)
-        com_dfs = combined_dfs(list_dfs)
-        combined_dfs_sort = sort_data_desc(com_dfs, order_by_colname)
-        combined_dfs_selected = first_n_recs(combined_dfs_sort, n)
-        print("Statistical data of jobs:", combined_dfs_selected)
-    else:
-        print("Column name should be 'agg_perf_by_slowest', 'agg_time_by_slowest', or 'total_bytes'")
+    log_paths = args.log_path
+    list_dfs = []
+    # build one single-row DataFrame of derived metrics per log file
+    for log_path in log_paths:
+        df_i = df_IO_data(log_path, mod)
+        list_dfs.append(df_i)
+    com_dfs = combined_dfs(list_dfs)
+    combined_dfs_sort = sort_data_desc(com_dfs, order_by_colname)
+    combined_dfs_selected = first_n_recs(combined_dfs_sort, n)
+    print("Statistical data of jobs:\n", combined_dfs_selected)
 
 if __name__ == "__main__":
     main()
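
For reference, a minimal usage sketch of the CLI as it stands after PATCH 4/4, driven the same way the new test drives it. The log file names below are hypothetical placeholders; the flags and defaults are the ones defined in setup_parser():

    import argparse
    from darshan.cli import job_stats

    # Build the parser the same way the CLI entry point does.
    parser = argparse.ArgumentParser(description="")
    job_stats.setup_parser(parser)

    # run1.darshan / run2.darshan are hypothetical log paths.
    args = parser.parse_args([
        "run1.darshan", "run2.darshan",
        "-m", "STDIO",                # module to accumulate
        "-o", "agg_time_by_slowest",  # column to sort by, descending
        "-n", "5",                    # keep only the first 5 rows
    ])
    job_stats.main(args=args)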