|
| 1 | +"""Capacitor selection.""" |
| 2 | + |
| 3 | +# python libraries |
| 4 | +import logging |
| 5 | +import os |
| 6 | +from multiprocessing import Pool, cpu_count |
| 7 | +import copy |
| 8 | +import pickle |
| 9 | + |
| 10 | +# 3rd party libraries |
| 11 | +import numpy as np |
| 12 | +import pandas as pd |
| 13 | +import tqdm |
| 14 | + |
| 15 | +# own libraries |
| 16 | +import pecst |
| 17 | +from dct.capacitor_optimization_dtos import CapacitorOptimizationDto |
| 18 | +from dct.toml_checker import TomlCapacitorSelection, Debug |
| 19 | +from dct.datasets import HandleDabDto |
| 20 | +from dct.datasets_dtos import StudyData, FilterData |
| 21 | +from dct.server_ctl_dtos import ProgressData, ProgressStatus |
| 22 | +from dct.functions_waveforms import full_current_waveform_from_currents, full_angle_waveform_from_angles |
| 23 | +from dct.datasets_dtos import CapacitorResults |
| 24 | + |
| 25 | +logger = logging.getLogger(__name__) |
| 26 | + |
| 27 | +class CapacitorSelection: |
| 28 | + """Select suitable capacitors.""" |
| 29 | + |
| 30 | + c_df: pd.DataFrame |
| 31 | + _optimization_config_list: list[CapacitorOptimizationDto] |
| 32 | + |
| 33 | + def __init__(self): |
| 34 | + self._optimization_config_list = [] |
| 35 | + |
| 36 | + @staticmethod |
| 37 | + def verify_optimization_parameter(toml_capacitor: TomlCapacitorSelection) -> tuple[bool, str]: |
| 38 | + """ |
| 39 | + Verify the parameters from toml file for the capacitor optimization. |
| 40 | +
|
| 41 | + Dummy method so far. |
| 42 | +
|
| 43 | + :param toml_capacitor: capacitor toml file to check |
| 44 | + :type toml_capacitor: TomlCapacitorSelection |
| 45 | + :return: is_consistent, issue_report |
| 46 | + :rtype: tuple[bool, str] |
| 47 | + """ |
| 48 | + if toml_capacitor: |
| 49 | + pass |
| 50 | + return True, "" |
| 51 | + |
| 52 | + def initialize_capacitor_selection(self, toml_capacitor: TomlCapacitorSelection, study_data: StudyData, circuit_filter_data: FilterData) -> None: |
| 53 | + """ |
| 54 | + Initialize the capacitor selection. |
| 55 | +
|
| 56 | + :param toml_capacitor: capacitor data |
| 57 | + :type toml_capacitor: TomlCapacitorSelection |
| 58 | + :param study_data: capacitor study data |
| 59 | + :type study_data: StudyData |
| 60 | + :param circuit_filter_data: filtered circuit data |
| 61 | + :type circuit_filter_data: FilterData |
| 62 | + """ |
| 63 | + pecst.download_esr_csv_files() |
| 64 | + |
| 65 | + # Create the io_config_list for all trials |
| 66 | + for circuit_trial_file in circuit_filter_data.filtered_list_files: |
| 67 | + circuit_filepath = os.path.join(circuit_filter_data.filtered_list_pathname, f"{circuit_trial_file}.pkl") |
| 68 | + # Check filename |
| 69 | + if os.path.isfile(circuit_filepath): |
| 70 | + # Read results from circuit optimization |
| 71 | + circuit_dto = HandleDabDto.load_from_file(circuit_filepath) |
| 72 | + |
| 73 | + optimization_directory = os.path.join(study_data.optimization_directory, circuit_trial_file, study_data.study_name) |
| 74 | + |
| 75 | + # figure out worst case working point for the capacitor per circuit design |
| 76 | + sorted_max_rms_angles, i_c1_max_rms_current_waveform = HandleDabDto.get_max_rms_waveform_capacitor(circuit_dto, plot=False) |
| 77 | + time = sorted_max_rms_angles / (2 * np.pi * circuit_dto.input_config.fs) |
| 78 | + v_max = np.max(circuit_dto.input_config.mesh_v1) |
| 79 | + |
| 80 | + # generate capacitor requirements from circuit simulation data |
| 81 | + capacitor_requirements_dto = pecst.CapacitorRequirements( |
| 82 | + maximum_peak_to_peak_voltage_ripple=toml_capacitor.maximum_peak_to_peak_voltage_ripple, |
| 83 | + current_waveform_for_op_max_current=np.array([time, i_c1_max_rms_current_waveform]), |
| 84 | + v_dc_for_op_max_voltage=v_max, |
| 85 | + temperature_ambient=toml_capacitor.temperature_ambient, |
| 86 | + voltage_safety_margin_percentage=toml_capacitor.voltage_safety_margin_percentage, |
| 87 | + capacitor_type_list=[pecst.CapacitorType.FilmCapacitor], |
| 88 | + maximum_number_series_capacitors=toml_capacitor.maximum_number_series_capacitors, |
| 89 | + capacitor_tolerance_percent=pecst.CapacitanceTolerance.TenPercent, |
| 90 | + lifetime_h=toml_capacitor.lifetime_h, |
| 91 | + results_directory=optimization_directory |
| 92 | + ) |
| 93 | + |
| 94 | + # Initialize the statistical data |
| 95 | + stat_data_init: ProgressData = ProgressData(run_time=0, number_of_filtered_points=0, |
| 96 | + progress_status=ProgressStatus.Idle) |
| 97 | + |
| 98 | + capacitor_dto = CapacitorOptimizationDto( |
| 99 | + circuit_filtered_point_filename=circuit_trial_file, |
| 100 | + progress_data=copy.deepcopy(stat_data_init), |
| 101 | + capacitor_optimization_dto=capacitor_requirements_dto) |
| 102 | + |
| 103 | + self._optimization_config_list.append(capacitor_dto) |
| 104 | + else: |
| 105 | + logger.info(f"Wrong path or file {circuit_filepath} does not exists!") |
| 106 | + |
| 107 | + @staticmethod |
| 108 | + def _start_optimization(circuit_filtered_point_file: str, act_cst_config: pecst.CapacitorRequirements, filter_data: FilterData, |
| 109 | + debug: Debug) -> int: |
| 110 | + # capacitor requirements |
| 111 | + _, c_db_df_list = pecst.select_capacitors(act_cst_config) |
| 112 | + |
| 113 | + c_db_df = pd.concat(c_db_df_list) |
| 114 | + |
| 115 | + if not os.path.exists(act_cst_config.results_directory): |
| 116 | + os.makedirs(act_cst_config.results_directory) |
| 117 | + c_db_df.to_csv(f"{act_cst_config.results_directory}/results.csv") |
| 118 | + |
| 119 | + df_filtered = pecst.filter_df(c_db_df) |
| 120 | + if debug.general.is_debug: |
| 121 | + # reduce dataset to the given number from the debug configuration |
| 122 | + df_filtered = df_filtered.iloc[:debug.capacitor_1.number_working_point_max] |
| 123 | + # save Pareto front designs (reduced in case of active debugging) |
| 124 | + df_filtered.to_csv(f"{act_cst_config.results_directory}/results_filtered.csv") |
| 125 | + |
| 126 | + # Load configuration |
| 127 | + circuit_dto = HandleDabDto.load_from_file(os.path.join(filter_data.filtered_list_pathname, f"{circuit_filtered_point_file}.pkl")) |
| 128 | + |
| 129 | + # sweep through all current waveforms |
| 130 | + i_l1_sorted = np.transpose(circuit_dto.calc_currents.i_l_1_sorted, (1, 2, 3, 0)) |
| 131 | + angles_rad_sorted = np.transpose(circuit_dto.calc_currents.angles_rad_sorted, (1, 2, 3, 0)) |
| 132 | + |
| 133 | + all_operation_point_ordering_codes_list = df_filtered["ordering code"].to_numpy() |
| 134 | + all_operation_point_volume_list = df_filtered["volume_total"].to_numpy() |
| 135 | + all_operation_point_area_list = df_filtered["area_total"].to_numpy() |
| 136 | + all_operation_point_n_series_list = df_filtered["in_series_needed"].to_numpy() |
| 137 | + all_operation_point_n_parallel_list = df_filtered["in_parallel_needed"].to_numpy() |
| 138 | + |
| 139 | + # Overtake the filtered operation points |
| 140 | + number_of_filtered_points = len(all_operation_point_ordering_codes_list) |
| 141 | + |
| 142 | + logger.info(f"Full-operating point simulation list: {all_operation_point_ordering_codes_list}") |
| 143 | + |
| 144 | + # simulate all operating points |
| 145 | + for count, ordering_code in enumerate(tqdm.tqdm(all_operation_point_ordering_codes_list)): |
| 146 | + |
| 147 | + volume_total = all_operation_point_volume_list[count] |
| 148 | + area_total = all_operation_point_area_list[count] |
| 149 | + df_geometry_re_simulation_number = df_filtered[df_filtered["ordering code"] == ordering_code] |
| 150 | + n_series = all_operation_point_n_series_list[count] |
| 151 | + n_parallel = all_operation_point_n_parallel_list[count] |
| 152 | + |
| 153 | + logger.debug(f"ordering_code: \n" |
| 154 | + f" {df_geometry_re_simulation_number.head()}") |
| 155 | + |
| 156 | + loss_total_array = np.full_like(circuit_dto.calc_modulation.phi, np.nan) |
| 157 | + |
| 158 | + new_circuit_dto_directory = os.path.join(act_cst_config.results_directory, "01_circuit_dtos_incl_capacitor_1_loss") |
| 159 | + if not os.path.exists(new_circuit_dto_directory): |
| 160 | + os.makedirs(new_circuit_dto_directory) |
| 161 | + |
| 162 | + if os.path.exists(os.path.join(new_circuit_dto_directory, f"{ordering_code}.pkl")): |
| 163 | + logger.info(f"Re-simulation of {circuit_dto.name} already exists. Skip.") |
| 164 | + else: |
| 165 | + for vec_vvp in np.ndindex(circuit_dto.calc_modulation.phi.shape): |
| 166 | + time, unique_indices = np.unique(full_angle_waveform_from_angles( |
| 167 | + angles_rad_sorted[vec_vvp]) / 2 / np.pi / circuit_dto.input_config.fs, return_index=True) |
| 168 | + current = full_current_waveform_from_currents(i_l1_sorted[vec_vvp])[unique_indices] |
| 169 | + |
| 170 | + current_waveform = np.array([time, current]) |
| 171 | + logger.debug(f"{current_waveform=}") |
| 172 | + logger.debug("All operating point simulation of:") |
| 173 | + logger.debug(f" * Circuit study: {filter_data.circuit_study_name}") |
| 174 | + logger.debug(f" * Circuit trial: {circuit_filtered_point_file}") |
| 175 | + logger.debug(f" * Inductor re-simulation trial: {ordering_code}") |
| 176 | + |
| 177 | + [frequency_list, current_amplitude_list, _] = pecst.fft(current_waveform, plot='no', mode='time', title='fft input current') |
| 178 | + |
| 179 | + loss_per_capacitor = pecst.power_loss_film_capacitor(ordering_code, frequency_list, current_amplitude_list, |
| 180 | + number_parallel_capacitors=n_parallel) |
| 181 | + |
| 182 | + loss_total_array[vec_vvp] = loss_per_capacitor * n_series * n_parallel |
| 183 | + |
| 184 | + capacitor_losses = CapacitorResults( |
| 185 | + p_combined_losses=loss_total_array, |
| 186 | + volume=volume_total, |
| 187 | + pcb_area=area_total, |
| 188 | + circuit_trial_file=circuit_filtered_point_file, |
| 189 | + capacitor_order_number=df_geometry_re_simulation_number, |
| 190 | + ) |
| 191 | + |
| 192 | + pickle_file = os.path.join(new_circuit_dto_directory, f"{ordering_code}.pkl") |
| 193 | + with open(pickle_file, 'wb') as output: |
| 194 | + pickle.dump(capacitor_losses, output, pickle.HIGHEST_PROTOCOL) |
| 195 | + |
| 196 | + # returns the number of filtered results |
| 197 | + return number_of_filtered_points |
| 198 | + |
| 199 | + def optimization_handler(self, filter_data: FilterData, debug: Debug) -> None: |
| 200 | + """ |
| 201 | + Control the multi simulation processes. |
| 202 | +
|
| 203 | + :param filter_data: Information about the filtered designs |
| 204 | + :type filter_data: dct.FilterData |
| 205 | + :param debug: True to use debug mode which stops earlier |
| 206 | + :type debug: bool |
| 207 | + """ |
| 208 | + number_cpus = cpu_count() |
| 209 | + |
| 210 | + with Pool(processes=number_cpus) as pool: |
| 211 | + parameters = [] |
| 212 | + for count, act_optimization_configuration in enumerate(self._optimization_config_list): |
| 213 | + if debug.general.is_debug: |
| 214 | + # in debug mode, stop when number of configuration parameters has reached the same as parallel cores are used |
| 215 | + if count == number_cpus: |
| 216 | + break |
| 217 | + |
| 218 | + parameters.append(( |
| 219 | + act_optimization_configuration.circuit_filtered_point_filename, |
| 220 | + act_optimization_configuration.capacitor_optimization_dto, |
| 221 | + filter_data, |
| 222 | + debug |
| 223 | + )) |
| 224 | + |
| 225 | + pool.starmap(func=CapacitorSelection._start_optimization, iterable=parameters) |
0 commit comments