diff --git a/omc3/kmod_importer.py b/omc3/kmod_importer.py index 2414cd45..6499ab6e 100644 --- a/omc3/kmod_importer.py +++ b/omc3/kmod_importer.py @@ -188,6 +188,10 @@ def import_kmod_results(opt: DotDict) -> None: averaged_results[ip][opt.beam], # bpm results of the specific beam ) ] + + import_kmod_summary_table( + meas_paths=opt.meas_paths, averaged_meas_paths=averaged_results, output_dir=opt.output_dir) + import_kmod_data( model=df_model, measurements=results_list, @@ -301,6 +305,96 @@ def _get_betastar(df_model: tfs.TfsDataFrame, ip: str) -> list[float, float]: # return [round(bstar, 3) for bstar in df_model.loc[ip, [f"{BETA}X", f"{BETA}Y"]]] return df_model.loc[ip, [f"{BETA}X", f"{BETA}Y"]].tolist() +def import_kmod_summary_table( + meas_paths: Sequence[Path | str], + averaged_meas_paths: Sequence[tfs.TfsDataFrame], + output_dir: Path | str | None = None + ) -> None: + + """ + Write the KMOD summary table from each results.tfs file. + It writes down the following files for each beam: + {beam}_kmod_sum_X.tfs: BETSTARX, ERRBETSTARX, BETWAISTX, ERRBETWAISTX, WAISTX, ERRWAISTX. + {beam}_kmod_sum_X.tfs: BETSTARY, ERRBETSTARY, BETWAISTY, ERRBETWAISTY, WAISTY, ERRWAISTY. + {beam}_tables.txt to be quickly copy/pasted in the logbook. + """ + + cols_x = ["BETSTARX", "ERRBETSTARX", "BETWAISTX", "ERRBETWAISTX", "WAISTX", "ERRWAISTX"] + cols_y = ["BETSTARY", "ERRBETSTARY", "BETWAISTY", "ERRBETWAISTY", "WAISTY", "ERRWAISTY"] + beams = [f"{BEAM_DIR}{1}", f"{BEAM_DIR}{2}"] + + def output_table_single_beam( + data_sngl_file_raw: Path | str = None, + av_res_flag: bool = False, + ) -> tuple[tfs.TfsDataFrame, tfs.TfsDataFrame]: + + """ + It writes the single row for each kmod/results.tfs file. + """ + + if not av_res_flag: + file_name = data_sngl_file_raw.parent.parent.name + data_sngl_file = tfs.read(data_sngl_file_raw) + try: + label = data_sngl_file["LABEL"].iloc[0] + second_magnet = label.split("-")[1] + relevant = second_magnet.split(".")[-1] + ip_number = relevant[1:] + ip_name = f"IP{ip_number}" # --- it extracts the IP from the magnet label, check for typos + except KeyError as e: + LOG.debug("Could not parse IP from LABEL. Skipping entry.", exc_info=e) + return None + + df = data_sngl_file[cols_x + cols_y].iloc[[0]] + df.insert(0, "NAME", file_name) + df.insert(0, "IP", ip_name) + return df + + df_x = data_sngl_file_raw[cols_x].iloc[[0]] + df_y = data_sngl_file_raw[cols_y].iloc[[0]] + df_x.insert(0, "IP", data_sngl_file_raw['NAME']) + df_y.insert(0, "IP", data_sngl_file_raw['NAME']) + return df_x, df_y + + grouped = {beam: [] for beam in beams} + for elem in meas_paths: + LOG.debug(f"Reading results: {elem}.") + for beam in beams: + file_path = elem / beam / "results.tfs" + df = output_table_single_beam(file_path) + grouped[beam].append(df) + + all_av_x, all_av_y = [], [] + for key, value in averaged_meas_paths.items(): + LOG.debug(f"Reading averaged results: {key}") + df_av_x, df_av_y = output_table_single_beam(value[0], av_res_flag=True) + all_av_x.append(df_av_x) + all_av_y.append(df_av_y) + + averaged_tables = {"X": (tfs.concat(all_av_x, ignore_index=True)).to_string(), + "Y": (tfs.concat(all_av_y, ignore_index=True)).to_string()} + + for beam in beams: + LOG.debug(f"Processing result for: {beam}") + if grouped[beam]: + big_df = tfs.concat(grouped[beam], ignore_index=True) + big_df_x = big_df[["IP", "NAME"] + cols_x] + big_df_y = big_df[["IP", "NAME"] + cols_y] + txt_output = [] + for name, df in [("X", big_df_x), ("Y", big_df_y)]: + table_str = df.to_string() + txt_output.append(f"\n============================================ {beam} Results ({name}-plane) ====================================================\n\n{table_str}\n") + txt_output.append(f"\n=========================== {beam} Averaged Results ({name}-plane) ============================================================\n\n{averaged_tables[name]}\n") + + LOG.debug(f"Saving results for: {beam} ") + summary_path = output_dir / f"{beam}_summary_logbook.txt" + with summary_path.open("w") as f: + f.write("\n".join(txt_output)) + tfs.write(output_dir/f"{beam}_kmod_summary.tfs", big_df) + + LOG.info('KMOD summary outputs correctly saved.') + + return # Script Mode ------------------------------------------------------------------ diff --git a/tests/unit/test_kmod_importer.py b/tests/unit/test_kmod_importer.py index c65b6261..4cb884d2 100644 --- a/tests/unit/test_kmod_importer.py +++ b/tests/unit/test_kmod_importer.py @@ -1,5 +1,4 @@ from pathlib import Path - import pytest import tfs @@ -51,11 +50,12 @@ def test_full_kmod_import(tmp_path: Path, beam: int, ips: str): # OUTPUT CHECKS -------------------------------------------- # Check the basics, if anything looks weird --- - assert len(list(tmp_path.glob(f"*{EXT}"))) == 4 # beta_kmod x/y, betastar x/y + assert len(list(tmp_path.glob(f"*{EXT}"))) == 6 # beta_kmod x/y, betastar x/y B1/B2_kmod_sum + assert len(list(tmp_path.glob("*.txt"))) == 2 # B1/B2_tables.txt average_dir = tmp_path / AVERAGE_DIR assert average_dir.is_dir() - assert len(list(average_dir.glob("*.pdf"))) == 3 * len(ips) # beta, beat and waist per IP + assert len(list(average_dir.glob("*.pdf"))) == 3 * len(ips) # beta, beat and waist per IP assert len(list(average_dir.glob(f"*{EXT}"))) == 3 * len(ips) + N_EFFECTIVE_FILES[len(ips)] # AV_BPM: N_BEAM*N_IP, AV_BETASTAR: N_IPs, Effective: see map # Check the content --- @@ -92,3 +92,4 @@ def test_full_kmod_import(tmp_path: Path, beam: int, ips: str): lumi_filename = _get_lumi_filename(betas, ip_a=ips[0], ip_b=ips[1]) assert (average_dir / lumi_filename).exists() +