|
41 | 41 | #@HEADER |
42 | 42 | # |
43 | 43 | """ |
44 | | -Utility to generate a yaml containing lists of tasks associated to their respective ranks, |
| 44 | +Utility to generate a YAML file containing lists of tasks associated with their respective ranks, |
45 | 45 | from the last phase and last sub-iteration of input JSON files. |
46 | | -
|
47 | 46 | """ |
48 | 47 |
|
49 | 48 | import os |
|
53 | 52 |
|
54 | 53 | from lbaf.IO.lbsVTDataReader import LoadReader |
55 | 54 | from lbaf.Utils.lbsLogging import get_logger, Logger |
| 55 | +from typing import Optional |
56 | 56 |
|
57 | 57 | class JSONTaskLister: |
| 58 | + """ |
| 59 | + A utility class to process JSON files, extract tasks for each rank, and save the results in a YAML file. |
| 60 | + """ |
| 61 | + |
| 62 | + def __init__(self, logger: Optional[Logger] = None): |
| 63 | + """ |
| 64 | + Initializes an instance of the JSONTaskLister class. |
| 65 | +
|
| 66 | + Args: |
| 67 | + logger (Optional[Logger]): A logger instance for logging messages. If not provided, a default logger is used. |
| 68 | + """ |
| 69 | + self.__logger = logger if logger is not None else get_logger() |
| 70 | + self.__directory = "" # Directory containing the input JSON files |
| 71 | + self.__file_stem = "data" # Default file stem for JSON files |
| 72 | + self.__file_suffix = "json" # Default file suffix for JSON files |
| 73 | + self.__output_file = "tasks.yaml" # Default name of the output YAML file |
| 74 | + |
58 | 75 | def __process_files(self): |
| 76 | + """ |
| 77 | + Processes the JSON files in the specified directory to extract tasks for each rank. |
| 78 | +
|
| 79 | + Returns: |
| 80 | + dict: A dictionary where keys are ranks and values are lists of tasks. |
| 81 | + """ |
| 82 | + # Initialize the JSON data reader |
59 | 83 | reader = LoadReader( |
60 | | - file_prefix = self.__directory + self.__file_stem, |
61 | | - logger = self.__logger, |
62 | | - file_suffix = self.__file_suffix |
| 84 | + file_prefix=self.__directory + self.__file_stem, |
| 85 | + logger=self.__logger, |
| 86 | + file_suffix=self.__file_suffix |
63 | 87 | ) |
64 | 88 |
|
65 | | - tasks = {} |
66 | | - n_ranks = reader.n_ranks |
| 89 | + tasks = {} # Dictionary to store tasks by rank |
| 90 | + n_ranks = reader.n_ranks # Get the total number of ranks |
67 | 91 |
|
68 | 92 | try: |
| 93 | + # Iterate over each rank |
69 | 94 | for rank in range(n_ranks): |
70 | | - _, data = reader._load_vt_file(rank) |
71 | | - phases = data.get("phases", []) |
| 95 | + _, data = reader._load_vt_file(rank) # Load JSON data for the current rank |
| 96 | + phases = data.get("phases", []) # Extract phases from the data |
| 97 | + |
72 | 98 | if not phases: |
73 | 99 | self.__logger.warning(f"No phases found for rank {rank}") |
74 | 100 | continue |
75 | 101 |
|
76 | | - last_phase = phases[-1] |
| 102 | + last_phase = phases[-1] # Get the last phase |
77 | 103 |
|
| 104 | + # Check if there are load balancing iterations in the last phase |
78 | 105 | if "lb_iterations" in last_phase: |
79 | 106 | lb_iterations = last_phase["lb_iterations"] |
| 107 | + |
80 | 108 | if lb_iterations: |
| 109 | + # Extract tasks from the last load balancing iteration |
81 | 110 | last_lb_iteration = lb_iterations[-1] |
82 | | - iteration_tasks = [task["entity"].get("seq_id", task["entity"].get("id")) for task in last_lb_iteration.get("tasks", [])] |
| 111 | + iteration_tasks = [ |
| 112 | + task["entity"].get("seq_id", task["entity"].get("id")) |
| 113 | + for task in last_lb_iteration.get("tasks", []) |
| 114 | + ] |
83 | 115 | tasks[rank] = iteration_tasks |
84 | 116 | else: |
85 | 117 | self.__logger.warning(f"No lb_iterations found in the last phase of rank {rank}") |
86 | 118 | else: |
87 | | - phase_tasks = [task["entity"].get("seq_id", task["entity"].get("id")) for task in last_phase.get("tasks", [])] |
| 119 | + # Extract tasks directly from the last phase if no lb_iterations exist |
| 120 | + phase_tasks = [ |
| 121 | + task["entity"].get("seq_id", task["entity"].get("id")) |
| 122 | + for task in last_phase.get("tasks", []) |
| 123 | + ] |
88 | 124 | tasks[rank] = phase_tasks |
| 125 | + |
89 | 126 | except (json.JSONDecodeError, KeyError, ValueError, IndexError) as e: |
90 | 127 | self.__logger.error(f"Error processing rank {rank}: {e}") |
91 | 128 | return |
92 | 129 |
|
93 | 130 | return tasks |
94 | 131 |
|
95 | 132 | def run(self): |
| 133 | + """ |
| 134 | + Main entry point for the JSONTaskLister utility. Parses command-line arguments, |
| 135 | + processes JSON files, and writes the extracted tasks to a YAML file. |
| 136 | + """ |
| 137 | + # Parse command-line arguments |
96 | 138 | parser = argparse.ArgumentParser(description="Extract tasks from JSON files.") |
97 | 139 | parser.add_argument("directory", type=str, help="Directory containing JSON files.") |
98 | 140 | parser.add_argument("--file-stem", type=str, default="data", help="File stem for JSON files (default: 'data').") |
99 | 141 | parser.add_argument("--file-suffix", type=str, default="json", help="File suffix for JSON files (default: 'json').") |
100 | | - parser.add_argument("--output", type=str, default="tasks.yml", help="Output YAML file (default: 'tasks.yml').") |
| 142 | + parser.add_argument("--output", type=str, default="tasks.yaml", help="Output YAML file (default: 'tasks.yml').") |
101 | 143 |
|
102 | 144 | args = parser.parse_args() |
103 | 145 |
|
| 146 | + # Set instance variables based on parsed arguments |
104 | 147 | self.__directory = args.directory |
105 | 148 | self.__file_stem = args.file_stem |
106 | 149 | self.__file_suffix = args.file_suffix |
107 | 150 | self.__output_file = args.output |
108 | 151 |
|
109 | | - self.__logger = get_logger() |
110 | | - |
| 152 | + # Validate the directory |
111 | 153 | if not os.path.isdir(self.__directory): |
112 | 154 | self.__logger.error(f"Directory not found: {self.__directory}") |
113 | 155 | return |
114 | 156 |
|
| 157 | + # Process files and extract tasks |
115 | 158 | tasks = self.__process_files() |
116 | 159 |
|
| 160 | + # Write the extracted tasks to the output YAML file |
117 | 161 | try: |
118 | 162 | with open(self.__output_file, 'w') as file: |
119 | 163 | yaml.safe_dump(tasks, file) |
|
0 commit comments